1#![doc = include_str!("../README.md")]
2#![warn(missing_docs)]
3#![warn(rustdoc::missing_crate_level_docs)]
4
5mod mpmcpq;
6pub use self::mpmcpq::PriorityQueue;
7
8mod message;
9pub use message::Message;
10
11mod guard;
12pub use guard::ReceiveGuard;
13
14mod stash;
15pub use stash::Stash;
16
17#[cfg(test)]
18mod tests {
19 use std::{thread, time};
20 use std::sync::Arc;
21 use std::sync::atomic::{AtomicU64, Ordering};
22 use std::io::Write;
23 use std::sync::Once;
24
25 #[allow(unused_imports)]
26 pub use log::{debug, error, info, trace, warn};
27 use env_logger;
28
29 use super::{Message, PriorityQueue, Stash};
30
31 pub fn init_env_logging() {
32 static LOGGER: Once = Once::new();
33 LOGGER.call_once(|| {
34 let counter: AtomicU64 = AtomicU64::new(0);
35 let seq_num = move || counter.fetch_add(1, Ordering::Relaxed);
36
37 let start = time::Instant::now();
38
39 env_logger::Builder::from_default_env()
40 .format(move |buf, record| {
41 let micros = start.elapsed().as_micros() as u64;
42 writeln!(
43 buf,
44 "{:0>12}: {:0>8}.{:0>6}: {:>5}: {}:{}: {}: {}",
45 seq_num(),
46 micros / 1000000,
47 micros % 1000000,
48 record.level().as_str(),
49 record.file().unwrap_or(""),
50 record.line().unwrap_or(0),
51 thread::current().name().unwrap_or("UNKNOWN"),
52 record.args()
53 )
54 })
55 .try_init()
56 .unwrap();
57 });
58 }
59
60 #[test]
61 fn smoke() {
62 init_env_logging();
63 let queue: PriorityQueue<String, u64> = PriorityQueue::new();
64 let mut stash = Stash::<String, u64>::new(&queue);
65 queue.send("test 1".to_string(), 1, &mut stash);
66 queue.send("test 3".to_string(), 3, &mut stash);
67 queue.send("test 2".to_string(), 2, &mut stash);
68 assert_eq!(
69 queue.recv_guard().message(),
70 &Message::Msg("test 1".to_string(), 1)
71 );
72 assert_eq!(
73 queue.recv_guard().message(),
74 &Message::Msg("test 2".to_string(), 2)
75 );
76 assert_eq!(
77 queue.recv_guard().message(),
78 &Message::Msg("test 3".to_string(), 3)
79 );
80 assert_eq!(queue.recv_guard().message(), &Message::Drained);
81 assert!(queue.try_recv_guard().is_none());
82 }
83
84 #[test]
85 fn try_recv() {
86 init_env_logging();
87 let queue: PriorityQueue<String, u64> = PriorityQueue::new();
88 let mut stash = Stash::<String, u64>::new(&queue);
89 queue.send("test 1".to_string(), 1, &mut stash);
90 queue.send("test 3".to_string(), 3, &mut stash);
91 queue.send("test 2".to_string(), 2, &mut stash);
92 assert!(queue.try_recv_guard().is_some());
93 assert!(queue.try_recv_guard().is_some());
94 assert!(queue.try_recv_guard().is_some());
95 assert!(queue.try_recv_guard().is_some());
96 assert!(queue.try_recv_guard().is_none());
97 assert!(queue.try_recv_guard().is_none());
98 }
99
100 #[test]
101 fn threads() {
102 init_env_logging();
103 let queue: Arc<PriorityQueue<String, u64>> = Arc::new(PriorityQueue::new());
104
105 let thread1_queue = queue.clone();
106 let thread1 = thread::spawn(move || {
107 let mut stash1 = Stash::<String, u64>::new(&thread1_queue);
108 thread1_queue.send("test 1".to_string(), 1, &mut stash1);
109 thread1_queue.send("test 3".to_string(), 3, &mut stash1);
110 thread1_queue.send("test 2".to_string(), 2, &mut stash1);
111 });
112 thread1.join().unwrap();
113
114 let thread2_queue = queue.clone();
115 let thread2 = thread::spawn(move || {
116 let mut stash2 = Stash::<String, u64>::new(&thread2_queue);
117 assert_eq!(
118 thread2_queue.recv_guard().message(),
119 &Message::Msg("test 1".to_string(), 1)
120 );
121 assert_eq!(
122 thread2_queue.recv_guard().message(),
123 &Message::Msg("test 2".to_string(), 2)
124 );
125 assert_eq!(
126 thread2_queue.recv_guard().message(),
127 &Message::Msg("test 3".to_string(), 3)
128 );
129 assert!(thread2_queue.recv_guard().message().is_drained());
130 assert!(thread2_queue.try_recv_guard().is_none());
131 thread2_queue.send("test 4".to_string(), 4, &mut stash2);
132 assert_eq!(
133 thread2_queue.recv_guard().message(),
134 &Message::Msg("test 4".to_string(), 4)
135 );
136 assert!(thread2_queue.recv_guard().message().is_drained());
137 assert!(thread2_queue.try_recv_guard().is_none());
138 });
139
140 thread2.join().unwrap();
141 }
142}