// mpmcpq/lib.rs

1#![doc = include_str!("../README.md")]
2#![warn(missing_docs)]
3#![warn(rustdoc::missing_crate_level_docs)]
4
5mod mpmcpq;
6pub use self::mpmcpq::PriorityQueue;
7
8mod message;
9pub use message::Message;
10
11mod guard;
12pub use guard::ReceiveGuard;
13
14mod stash;
15pub use stash::Stash;
16
#[cfg(test)]
mod tests {
    use std::io::Write;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::{Arc, Once};
    use std::{thread, time};

    use env_logger;
    // Public re-export of the logging macros; not every macro is used by the tests
    // visible in this module, hence the lint exemption.
    #[allow(unused_imports)]
    pub use log::{debug, error, info, trace, warn};

    use super::{Message, PriorityQueue, Stash};

    /// Installs the `env_logger` backend once for the whole test binary.
    ///
    /// Every test calls this first; the `Once` turns all calls after the first
    /// into no-ops. Each record is rendered as
    /// `seq: secs.micros: LEVEL: file:line: thread-name: message`, where `seq`
    /// counts formatted records and the timestamp is measured from logger setup.
    ///
    /// # Panics
    ///
    /// Panics if a global logger has already been installed by other code.
    pub fn init_env_logging() {
        static LOGGER: Once = Once::new();
        LOGGER.call_once(|| {
            // Both values are moved into the format closure and live as long as the logger.
            let counter = AtomicU64::new(0);
            let start = time::Instant::now();

            env_logger::Builder::from_default_env()
                // Let the test harness capture log output instead of writing
                // straight to the terminal.
                .is_test(true)
                .format(move |buf, record| {
                    // Whole seconds and the sub-second remainder are read separately,
                    // which avoids a lossy `u128 as u64` cast on the microsecond total.
                    let elapsed = start.elapsed();
                    writeln!(
                        buf,
                        "{:0>12}: {:0>8}.{:0>6}: {:>5}: {}:{}: {}: {}",
                        counter.fetch_add(1, Ordering::Relaxed),
                        elapsed.as_secs(),
                        elapsed.subsec_micros(),
                        record.level().as_str(),
                        record.file().unwrap_or(""),
                        record.line().unwrap_or(0),
                        thread::current().name().unwrap_or("UNKNOWN"),
                        record.args()
                    )
                })
                .try_init()
                .expect("the test logger must be the only global logger installed");
        });
    }

    /// Messages sent out of order come back in ascending priority order,
    /// followed by a single `Drained` notification and then nothing.
    #[test]
    fn smoke() {
        init_env_logging();
        let queue: PriorityQueue<String, u64> = PriorityQueue::new();
        let mut stash = Stash::<String, u64>::new(&queue);

        // Deliberately sent as 1, 3, 2 so the receive order proves prioritisation.
        queue.send("test 1".to_string(), 1, &mut stash);
        queue.send("test 3".to_string(), 3, &mut stash);
        queue.send("test 2".to_string(), 2, &mut stash);

        assert_eq!(
            queue.recv_guard().message(),
            &Message::Msg("test 1".to_string(), 1)
        );
        assert_eq!(
            queue.recv_guard().message(),
            &Message::Msg("test 2".to_string(), 2)
        );
        assert_eq!(
            queue.recv_guard().message(),
            &Message::Msg("test 3".to_string(), 3)
        );
        assert_eq!(queue.recv_guard().message(), &Message::Drained);
        assert!(queue.try_recv_guard().is_none());
    }

    /// The non-blocking receive yields four guards for three sent messages and
    /// then reports an empty queue.
    #[test]
    fn try_recv() {
        init_env_logging();
        let queue: PriorityQueue<String, u64> = PriorityQueue::new();
        let mut stash = Stash::<String, u64>::new(&queue);

        queue.send("test 1".to_string(), 1, &mut stash);
        queue.send("test 3".to_string(), 3, &mut stash);
        queue.send("test 2".to_string(), 2, &mut stash);

        // Three guards for the sent messages ...
        assert!(queue.try_recv_guard().is_some());
        assert!(queue.try_recv_guard().is_some());
        assert!(queue.try_recv_guard().is_some());
        // ... and a fourth one, presumably the `Drained` notification seen in `smoke`.
        assert!(queue.try_recv_guard().is_some());

        // After that the queue stays empty.
        assert!(queue.try_recv_guard().is_none());
        assert!(queue.try_recv_guard().is_none());
    }

    /// Same ordering contract as `smoke`, but with the producer and the consumer
    /// running on separate threads that share the queue through an `Arc`.
    #[test]
    fn threads() {
        init_env_logging();
        let queue: Arc<PriorityQueue<String, u64>> = Arc::new(PriorityQueue::new());

        let thread1_queue = Arc::clone(&queue);
        let thread1 = thread::spawn(move || {
            let mut stash1 = Stash::<String, u64>::new(&thread1_queue);
            thread1_queue.send("test 1".to_string(), 1, &mut stash1);
            thread1_queue.send("test 3".to_string(), 3, &mut stash1);
            thread1_queue.send("test 2".to_string(), 2, &mut stash1);
        });
        // The producer is joined before the consumer is spawned, so all three
        // messages are already queued when receiving starts.
        thread1.join().unwrap();

        let thread2_queue = Arc::clone(&queue);
        let thread2 = thread::spawn(move || {
            let mut stash2 = Stash::<String, u64>::new(&thread2_queue);
            assert_eq!(
                thread2_queue.recv_guard().message(),
                &Message::Msg("test 1".to_string(), 1)
            );
            assert_eq!(
                thread2_queue.recv_guard().message(),
                &Message::Msg("test 2".to_string(), 2)
            );
            assert_eq!(
                thread2_queue.recv_guard().message(),
                &Message::Msg("test 3".to_string(), 3)
            );
            assert!(thread2_queue.recv_guard().message().is_drained());
            assert!(thread2_queue.try_recv_guard().is_none());

            // A drained queue accepts new messages and reports `Drained` again
            // once they have been consumed.
            thread2_queue.send("test 4".to_string(), 4, &mut stash2);
            assert_eq!(
                thread2_queue.recv_guard().message(),
                &Message::Msg("test 4".to_string(), 4)
            );
            assert!(thread2_queue.recv_guard().message().is_drained());
            assert!(thread2_queue.try_recv_guard().is_none());
        });

        thread2.join().unwrap();
    }
}