1#![deny(warnings)]
2#![deny(missing_docs)]
3mod channel;
33mod loom;
34
35pub use self::channel::{channel, Sender, Receiver};
36pub use std::sync::mpsc::{SendError, RecvError, TryRecvError};
37
38
39#[cfg(test)]
40mod tests {
41 use super::*;
42
43 #[test]
44 fn test_sanity() {
45 let (mut tx, rx) = channel();
46 tx.send(5).unwrap();
47 tx.send(12).unwrap();
48 tx.send(1).unwrap();
49
50 assert_eq!(rx.try_recv(), Ok(5));
51 assert_eq!(rx.try_recv(), Ok(12));
52 assert_eq!(rx.try_recv(), Ok(1));
53 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
54 }
55
56 #[test]
57 fn test_multiple_consumers() {
58 let (mut tx, rx) = channel();
59 let rx2 = rx.clone();
60 tx.send(5).unwrap();
61 tx.send(12).unwrap();
62 tx.send(1).unwrap();
63
64 assert_eq!(rx.try_recv(), Ok(5));
65 assert_eq!(rx2.try_recv(), Ok(12));
66 assert_eq!(rx2.try_recv(), Ok(1));
67 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
68 assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
69 }
70
71 #[test]
72 fn test_send_on_dropped_chan() {
73 let (mut tx, rx) = channel();
74 drop(rx);
75 assert_eq!(tx.send(5), Err(SendError(5)));
76 }
77
78 #[test]
79 fn test_try_recv_on_dropped_chan() {
80 let (mut tx, rx) = channel();
81 tx.send(2).unwrap();
82 drop(tx);
83
84 assert_eq!(rx.try_recv(), Ok(2));
85 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
86 assert_eq!(rx.recv(), Err(RecvError));
87 }
88
89 #[test]
90 fn test_recv_blocks() {
91 use std::thread;
92 use std::sync::Arc;
93 use std::sync::atomic::{AtomicBool, Ordering};
94
95 let (mut tx, rx) = channel();
96 let toggle = Arc::new(AtomicBool::new(false));
97 let toggle_clone = toggle.clone();
98 thread::spawn(move || {
99 toggle_clone.store(true, Ordering::Relaxed);
100 tx.send(11).unwrap();
101 });
102
103 assert_eq!(rx.recv(), Ok(11));
104 assert!(toggle.load(Ordering::Relaxed))
105 }
106
107 #[test]
108 fn test_recv_unblocks_on_dropped_chan() {
109 use std::thread;
110
111 let (tx, rx) = channel::<i32>();
112 thread::spawn(move || {
113 let _tx = tx;
114 });
115
116 assert_eq!(rx.recv(), Err(RecvError));
117 }
118
119 #[test]
120 fn test_send_sleep() {
121 use std::thread;
122 use std::time::Duration;
123
124 let (mut tx, rx) = channel();
125
126 let mut handles = Vec::new();
127 for _ in 0..5 {
128 let rx = rx.clone();
129 handles.push(thread::spawn(move || {
130 rx.recv().unwrap();
131 }));
132 }
133
134 for i in 0..5 {
135 tx.send(i * 2).unwrap();
136 thread::sleep(Duration::from_millis(100));
137 }
138
139 for handle in handles {
140 handle.join().unwrap();
141 }
142 }
143
144 #[test]
145 fn test_tx_dropped_rxs_drain() {
146 for l in 0..10 {
147 println!("loop {}", l);
148
149 let (mut tx, rx) = channel();
150
151 let mut handles = Vec::new();
152 for _ in 0..5 {
153 let rx = rx.clone();
154 handles.push(::std::thread::spawn(move || {
155 loop {
156 match rx.recv() {
157 Ok(_) => continue,
158 Err(_) => break,
159 }
160 }
161 }));
162 }
163
164 for i in 0..10 {
165 tx.send(format!("Sending value {} {}", l, i)).unwrap();
166 }
167 drop(tx);
168
169 for handle in handles {
170 handle.join().unwrap();
171 }
172 }
173 }
174
175 #[test]
176 fn msg_dropped() {
177 use std::sync::Arc;
178 use std::sync::atomic::{AtomicBool, Ordering};
179 struct Dropped(Arc<AtomicBool>);
180
181 impl Drop for Dropped {
182 fn drop(&mut self) {
183 self.0.store(true, Ordering::Relaxed);
184 }
185 }
186
187 let sentinel = Arc::new(AtomicBool::new(false));
188 assert!(!sentinel.load(Ordering::Relaxed));
189
190
191 let (mut tx, rx) = channel();
192
193 tx.send(Dropped(sentinel.clone())).unwrap();
194 assert!(!sentinel.load(Ordering::Relaxed));
195
196 rx.recv().unwrap();
197 assert!(sentinel.load(Ordering::Relaxed));
198 }
199
200
201 #[test]
202 fn msgs_dropped() {
203 use std::sync::Arc;
204 use std::sync::atomic::{AtomicUsize, Ordering};
205 struct Dropped(Arc<AtomicUsize>);
206
207 impl Drop for Dropped {
208 fn drop(&mut self) {
209 self.0.fetch_add(1, Ordering::Relaxed);
210 }
211 }
212
213 let sentinel = Arc::new(AtomicUsize::new(0));
214 assert_eq!(0, sentinel.load(Ordering::Relaxed));
215
216
217 let (mut tx, rx) = channel();
218
219 tx.send(Dropped(sentinel.clone())).unwrap();
220 tx.send(Dropped(sentinel.clone())).unwrap();
221 tx.send(Dropped(sentinel.clone())).unwrap();
222 tx.send(Dropped(sentinel.clone())).unwrap();
223 assert_eq!(0, sentinel.load(Ordering::Relaxed));
224
225 rx.recv().unwrap();
226 assert_eq!(1, sentinel.load(Ordering::Relaxed));
227 rx.recv().unwrap();
228 rx.recv().unwrap();
229 rx.recv().unwrap();
230 assert_eq!(4, sentinel.load(Ordering::Relaxed));
231 }
232}