spmc/
lib.rs

1#![deny(warnings)]
2#![deny(missing_docs)]
3//! # SPMC
4//!
5//! A single producer, multiple consumers. Commonly used to implement
6//! work-stealing.
7//!
8//! ## Example
9//!
10//! ```
11//! # use std::thread;
12//! let (mut tx, rx) = spmc::channel();
13//!
14//! let mut handles = Vec::new();
15//! for n in 0..5 {
16//!     let rx = rx.clone();
17//!     handles.push(thread::spawn(move || {
18//!         let msg = rx.recv().unwrap();
19//!         println!("worker {} recvd: {}", n, msg);
20//!     }));
21//! }
22//!
23//! for i in 0..5 {
24//!     tx.send(i * 2).unwrap();
25//! }
26//!
27//! for handle in handles {
28//!   handle.join().unwrap();
29//! }
30//! ```
31
32mod channel;
33mod loom;
34
35pub use self::channel::{channel, Sender, Receiver};
36pub use std::sync::mpsc::{SendError, RecvError, TryRecvError};
37
38
39#[cfg(test)]
40mod tests {
41    use super::*;
42
43    #[test]
44    fn test_sanity() {
45        let (mut tx, rx) = channel();
46        tx.send(5).unwrap();
47        tx.send(12).unwrap();
48        tx.send(1).unwrap();
49
50        assert_eq!(rx.try_recv(), Ok(5));
51        assert_eq!(rx.try_recv(), Ok(12));
52        assert_eq!(rx.try_recv(), Ok(1));
53        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
54    }
55
56    #[test]
57    fn test_multiple_consumers() {
58        let (mut tx, rx) = channel();
59        let rx2 = rx.clone();
60        tx.send(5).unwrap();
61        tx.send(12).unwrap();
62        tx.send(1).unwrap();
63
64        assert_eq!(rx.try_recv(), Ok(5));
65        assert_eq!(rx2.try_recv(), Ok(12));
66        assert_eq!(rx2.try_recv(), Ok(1));
67        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
68        assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
69    }
70
71    #[test]
72    fn test_send_on_dropped_chan() {
73        let (mut tx, rx) = channel();
74        drop(rx);
75        assert_eq!(tx.send(5), Err(SendError(5)));
76    }
77
78    #[test]
79    fn test_try_recv_on_dropped_chan() {
80        let (mut tx, rx) = channel();
81        tx.send(2).unwrap();
82        drop(tx);
83
84        assert_eq!(rx.try_recv(), Ok(2));
85        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
86        assert_eq!(rx.recv(), Err(RecvError));
87    }
88
89    #[test]
90    fn test_recv_blocks() {
91        use std::thread;
92        use std::sync::Arc;
93        use std::sync::atomic::{AtomicBool, Ordering};
94
95        let (mut tx, rx) = channel();
96        let toggle = Arc::new(AtomicBool::new(false));
97        let toggle_clone = toggle.clone();
98        thread::spawn(move || {
99            toggle_clone.store(true, Ordering::Relaxed);
100            tx.send(11).unwrap();
101        });
102
103        assert_eq!(rx.recv(), Ok(11));
104        assert!(toggle.load(Ordering::Relaxed))
105    }
106
107    #[test]
108    fn test_recv_unblocks_on_dropped_chan() {
109        use std::thread;
110
111        let (tx, rx) = channel::<i32>();
112        thread::spawn(move || {
113            let _tx = tx;
114        });
115
116        assert_eq!(rx.recv(), Err(RecvError));
117    }
118
119    #[test]
120    fn test_send_sleep() {
121        use std::thread;
122        use std::time::Duration;
123
124        let (mut tx, rx) = channel();
125
126        let mut handles = Vec::new();
127        for _ in 0..5 {
128            let rx = rx.clone();
129            handles.push(thread::spawn(move || {
130                rx.recv().unwrap();
131            }));
132        }
133
134        for i in 0..5 {
135            tx.send(i * 2).unwrap();
136            thread::sleep(Duration::from_millis(100));
137        }
138
139        for handle in handles {
140            handle.join().unwrap();
141        }
142    }
143
144    #[test]
145    fn test_tx_dropped_rxs_drain() {
146        for l in 0..10 {
147            println!("loop {}", l);
148
149            let (mut tx, rx) = channel();
150
151            let mut handles = Vec::new();
152            for _ in 0..5 {
153                let rx = rx.clone();
154                handles.push(::std::thread::spawn(move || {
155                    loop {
156                        match rx.recv() {
157                            Ok(_) => continue,
158                            Err(_) => break,
159                        }
160                    }
161                }));
162            }
163
164            for i in 0..10 {
165                tx.send(format!("Sending value {} {}", l, i)).unwrap();
166            }
167            drop(tx);
168
169            for handle in handles {
170                handle.join().unwrap();
171            }
172        }
173    }
174
175    #[test]
176    fn msg_dropped() {
177        use std::sync::Arc;
178        use std::sync::atomic::{AtomicBool, Ordering};
179        struct Dropped(Arc<AtomicBool>);
180
181        impl Drop for Dropped {
182            fn drop(&mut self) {
183                self.0.store(true, Ordering::Relaxed);
184            }
185        }
186
187        let sentinel = Arc::new(AtomicBool::new(false));
188        assert!(!sentinel.load(Ordering::Relaxed));
189
190
191        let (mut tx, rx) = channel();
192
193        tx.send(Dropped(sentinel.clone())).unwrap();
194        assert!(!sentinel.load(Ordering::Relaxed));
195
196        rx.recv().unwrap();
197        assert!(sentinel.load(Ordering::Relaxed));
198    }
199
200
201    #[test]
202    fn msgs_dropped() {
203        use std::sync::Arc;
204        use std::sync::atomic::{AtomicUsize, Ordering};
205        struct Dropped(Arc<AtomicUsize>);
206
207        impl Drop for Dropped {
208            fn drop(&mut self) {
209                self.0.fetch_add(1, Ordering::Relaxed);
210            }
211        }
212
213        let sentinel = Arc::new(AtomicUsize::new(0));
214        assert_eq!(0, sentinel.load(Ordering::Relaxed));
215
216
217        let (mut tx, rx) = channel();
218
219        tx.send(Dropped(sentinel.clone())).unwrap();
220        tx.send(Dropped(sentinel.clone())).unwrap();
221        tx.send(Dropped(sentinel.clone())).unwrap();
222        tx.send(Dropped(sentinel.clone())).unwrap();
223        assert_eq!(0, sentinel.load(Ordering::Relaxed));
224
225        rx.recv().unwrap();
226        assert_eq!(1, sentinel.load(Ordering::Relaxed));
227        rx.recv().unwrap();
228        rx.recv().unwrap();
229        rx.recv().unwrap();
230        assert_eq!(4, sentinel.load(Ordering::Relaxed));
231    }
232}