Skip to main content

bus_queue/bus/
mod.rs

1use crate::channel::bounded as raw_bounded;
2use crate::piper::event::Event;
3use crate::swap_slot::SwapSlot;
4// use piper::Event;
5use std::sync::Arc;
6
7pub mod publisher;
8pub mod subscriber;
9
10pub use publisher::Publisher;
11pub use subscriber::Subscriber;
12
13#[allow(dead_code)]
14pub fn bounded<T, S: SwapSlot<T>>(size: usize) -> (Publisher<T, S>, Subscriber<T, S>) {
15    let (sender, receiver) = raw_bounded(size);
16    let event = Arc::new(Event::new());
17    (
18        Publisher::from((sender, event.clone())),
19        Subscriber::from((receiver, event)),
20    )
21}
#[cfg(test)]
mod test {
    // These tests go through the crate-level `bounded` constructor (imported
    // as `default_bounded`), which is called with a single type parameter.
    use crate::bounded as default_bounded;
    use futures::{pin_mut, task::Poll, FutureExt, SinkExt};
    use futures_test::task::noop_context;
    use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending};
    use std::sync::Arc;

    /// Before anything has been published the subscriber stream is pending.
    #[test]
    fn subscriber_is_in_pending_state_before_first_data_is_published() {
        // The publisher half is bound to a name so that it lives until the end
        // of the test instead of being dropped right away.
        let (_tx, rx) = default_bounded::<usize>(1);
        pin_mut!(rx);

        assert_stream_pending!(rx);
    }

    /// An item that was published is yielded by the subscriber.
    #[test]
    fn subscriber_receives_an_item_after_it_is_published() {
        let mut cx = noop_context();
        let (tx, rx) = default_bounded::<usize>(1);
        pin_mut!(rx, tx);

        // Publishing item (1) completes on the first poll.
        let sent = tx.send(1).poll_unpin(&mut cx);
        assert_eq!(sent, Poll::Ready(Ok(())));

        // The subscriber yields item (1).
        assert_stream_next!(rx, Arc::new(1));
    }

    /// Publishing more items than the queue size does not block the publisher;
    /// the subscriber then sees the most recent item.
    #[test]
    fn subscriber_recieves_an_item_after_publisher_overflowed() {
        let mut cx = noop_context();
        let (tx, rx) = default_bounded::<usize>(1);
        pin_mut!(rx, tx);

        // Publish item (1).
        let first_send = tx.send(1).poll_unpin(&mut cx);
        assert_eq!(first_send, Poll::Ready(Ok(())));

        // With a queue size of 1 this second publish overflows the queue, yet
        // it must still complete on the first poll.
        let second_send = tx.send(2).poll_unpin(&mut cx);
        assert_eq!(second_send, Poll::Ready(Ok(())));

        // Item (1) was dropped by the overflow, so item (2) is what arrives.
        assert_stream_next!(rx, Arc::new(2));
    }

    /// Closing the publisher terminates the subscriber stream.
    #[test]
    fn subscriber_is_done_after_publisher_closes() {
        let mut cx = noop_context();
        let (tx, rx) = default_bounded::<usize>(1);
        pin_mut!(rx, tx);

        // Close the publisher.
        let closed = tx.close().poll_unpin(&mut cx);
        assert_eq!(closed, Poll::Ready(Ok(())));

        // The subscriber stream is finished.
        assert_stream_done!(rx);
    }

    /// Dropping the publisher terminates the subscriber stream.
    #[test]
    fn subscriber_is_done_after_publisher_drop() {
        let (tx, rx) = default_bounded::<usize>(1);
        pin_mut!(rx);

        // Drop the publisher.
        drop(tx);

        // The subscriber stream is finished.
        assert_stream_done!(rx);
    }

    /// Walks one subscriber through a full sequence:
    /// pending -> item (1) -> item (2) -> pending -> done.
    #[test]
    fn notify() {
        let (tx, rx) = default_bounded::<usize>(1);
        pin_mut!(rx, tx);

        // Nothing has been published yet, so the stream is pending.
        assert_stream_pending!(rx);

        // Publish item (1) and receive it.
        let mut cx = noop_context();
        let first_send = tx.send(1).poll_unpin(&mut cx);
        assert_eq!(first_send, Poll::Ready(Ok(())));
        assert_stream_next!(rx, Arc::new(1));

        // Publish item (2) and receive it.
        let second_send = tx.send(2).poll_unpin(&mut cx);
        assert_eq!(second_send, Poll::Ready(Ok(())));
        assert_stream_next!(rx, Arc::new(2));

        // Everything published so far was consumed: pending again.
        assert_stream_pending!(rx);

        // Close the publisher.
        let closed = tx.close().poll_unpin(&mut cx);
        assert_eq!(closed, Poll::Ready(Ok(())));

        // The subscriber stream is finished.
        assert_stream_done!(rx);
    }

    /// Checks the first item seen by subscribers configured with different
    /// `set_skip_items` values after items 0 through 5 were published into a
    /// queue of size 3.
    #[test]
    fn test_set_skip_items() {
        let (tx, rx_default) = default_bounded(3);

        // One subscriber keeps its default configuration; the clones are
        // configured with skip values 1, 2 and 3.
        let mut rx_skip1 = rx_default.clone();
        let mut rx_skip2 = rx_default.clone();
        let mut rx_skip3 = rx_default.clone();
        rx_skip1.set_skip_items(1);
        rx_skip2.set_skip_items(2);
        rx_skip3.set_skip_items(3);

        pin_mut!(tx, rx_default, rx_skip1, rx_skip2, rx_skip3);

        let mut cx = noop_context();
        for item in 0..6 {
            let sent = tx.send(item).poll_unpin(&mut cx);
            assert_eq!(sent, Poll::Ready(Ok(())));
        }

        assert_stream_next!(rx_default, Arc::new(3));
        assert_stream_next!(rx_skip1, Arc::new(4));
        assert_stream_next!(rx_skip2, Arc::new(5));
        // NOTE(review): skip values 2 and 3 both yield the newest item (5);
        // presumably the skip is capped so one item is always delivered —
        // confirm against the subscriber implementation.
        assert_stream_next!(rx_skip3, Arc::new(5));
    }

    /// Publishers from separate `bounded` calls compare unequal; each
    /// publisher compares equal to itself.
    #[test]
    fn test_publisher_eq() {
        let (first, _) = default_bounded::<i32>(1);
        let (second, _) = default_bounded::<i32>(1);

        assert!(!first.eq(&second));
        assert!(first.eq(&first));
        assert!(second.eq(&second));
    }

    /// A cloned subscriber compares equal to its source; subscribers from
    /// separate `bounded` calls compare unequal.
    #[test]
    fn test_subscriber_eq() {
        let (_, original) = default_bounded::<i32>(1);
        let cloned = original.clone();
        let (_, unrelated) = default_bounded::<i32>(1);

        assert_eq!(original, cloned);
        assert_ne!(cloned, unrelated);
        assert_ne!(original, unrelated);
    }
}