1use crate::channel::bounded as raw_bounded;
2use crate::piper::event::Event;
3use crate::swap_slot::SwapSlot;
4use std::sync::Arc;
6
7pub mod publisher;
8pub mod subscriber;
9
10pub use publisher::Publisher;
11pub use subscriber::Subscriber;
12
13#[allow(dead_code)]
14pub fn bounded<T, S: SwapSlot<T>>(size: usize) -> (Publisher<T, S>, Subscriber<T, S>) {
15 let (sender, receiver) = raw_bounded(size);
16 let event = Arc::new(Event::new());
17 (
18 Publisher::from((sender, event.clone())),
19 Subscriber::from((receiver, event)),
20 )
21}
22#[cfg(test)]
23mod test {
24 use crate::bounded as default_bounded;
25 use futures::{pin_mut, task::Poll, FutureExt, SinkExt};
26 use futures_test::task::noop_context;
27 use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending};
28 use std::sync::Arc;
29
30 #[test]
31 fn subscriber_is_in_pending_state_before_first_data_is_published() {
32 let (_publisher, subscriber) = default_bounded::<usize>(1);
33 pin_mut!(subscriber);
34
35 assert_stream_pending!(subscriber);
37 }
38
39 #[test]
40 fn subscriber_receives_an_item_after_it_is_published() {
41 let mut cx = noop_context();
42 let (publisher, subscriber) = default_bounded::<usize>(1);
43 pin_mut!(subscriber);
44 pin_mut!(publisher);
45
46 assert_eq!(publisher.send(1).poll_unpin(&mut cx), Poll::Ready(Ok(())));
48
49 assert_stream_next!(subscriber, Arc::new(1));
51 }
52
53 #[test]
54 fn subscriber_recieves_an_item_after_publisher_overflowed() {
55 let mut cx = noop_context();
56 let (publisher, subscriber) = default_bounded::<usize>(1);
57 pin_mut!(subscriber);
58 pin_mut!(publisher);
59
60 assert_eq!(publisher.send(1).poll_unpin(&mut cx), Poll::Ready(Ok(())));
62
63 assert_eq!(publisher.send(2).poll_unpin(&mut cx), Poll::Ready(Ok(())));
66
67 assert_stream_next!(subscriber, Arc::new(2));
70 }
71 #[test]
72 fn subscriber_is_done_after_publisher_closes() {
73 let mut cx = noop_context();
74 let (publisher, subscriber) = default_bounded::<usize>(1);
75 pin_mut!(subscriber);
76 pin_mut!(publisher);
77
78 assert_eq!(publisher.close().poll_unpin(&mut cx), Poll::Ready(Ok(())));
80
81 assert_stream_done!(subscriber);
83 }
84
85 #[test]
86 fn subscriber_is_done_after_publisher_drop() {
87 let (publisher, subscriber) = default_bounded::<usize>(1);
88 pin_mut!(subscriber);
89
90 drop(publisher);
92
93 assert_stream_done!(subscriber);
95 }
96
97 #[test]
98 fn notify() {
99 let (publisher, subscriber) = default_bounded::<usize>(1);
100 pin_mut!(subscriber);
101 pin_mut!(publisher);
102
103 assert_stream_pending!(subscriber);
105
106 let mut cx = noop_context();
108 assert_eq!(publisher.send(1).poll_unpin(&mut cx), Poll::Ready(Ok(())));
109
110 assert_stream_next!(subscriber, Arc::new(1));
112
113 assert_eq!(publisher.send(2).poll_unpin(&mut cx), Poll::Ready(Ok(())));
115
116 assert_stream_next!(subscriber, Arc::new(2));
118
119 assert_stream_pending!(subscriber);
121
122 assert_eq!(publisher.close().poll_unpin(&mut cx), Poll::Ready(Ok(())));
124
125 assert_stream_done!(subscriber);
127 }
128 #[test]
129 fn test_set_skip_items() {
130 let (publisher, subscriber1) = default_bounded(3);
131 let mut subscriber2 = subscriber1.clone();
132 let mut subscriber3 = subscriber1.clone();
133 let mut subscriber4 = subscriber1.clone();
134 subscriber2.set_skip_items(1);
135 subscriber3.set_skip_items(2);
136 subscriber4.set_skip_items(3);
137
138 pin_mut!(publisher);
139 pin_mut!(subscriber1);
140 pin_mut!(subscriber2);
141 pin_mut!(subscriber3);
142 pin_mut!(subscriber4);
143
144 let mut cx = noop_context();
145 for i in 0..6 {
146 assert_eq!(publisher.send(i).poll_unpin(&mut cx), Poll::Ready(Ok(())));
147 }
148 assert_stream_next!(subscriber1, Arc::new(3));
149 assert_stream_next!(subscriber2, Arc::new(4));
150 assert_stream_next!(subscriber3, Arc::new(5));
151 assert_stream_next!(subscriber4, Arc::new(5));
152 }
153
154 #[test]
155 fn test_publisher_eq() {
156 let (publisher1, _) = default_bounded::<i32>(1);
157 let (publisher2, _) = default_bounded::<i32>(1);
158 assert!(!publisher1.eq(&publisher2));
159 assert!(publisher1.eq(&publisher1));
160 assert!(publisher2.eq(&publisher2));
161 }
162
163 #[test]
164 fn test_subscriber_eq() {
165 let (_, subscriber1) = default_bounded::<i32>(1);
166 let subscriber2 = subscriber1.clone();
167 let (_, subscriber3) = default_bounded::<i32>(1);
168 assert_eq!(subscriber1, subscriber2);
169 assert_ne!(subscriber2, subscriber3);
170 assert_ne!(subscriber1, subscriber3);
171 }
172}