1use core::num::NonZeroUsize;
23
24pub(crate) use self::queue::QueuePtr;
25pub(crate) mod shards;
26pub use self::{receiver::Receiver, sender::Sender};
27
28mod queue;
29mod receiver;
30mod sender;
31
32pub fn channel<T>(capacity: NonZeroUsize) -> (Sender<T>, Receiver<T>) {
53 let queue = queue::QueuePtr::with_size(capacity);
54 (Sender::new(queue.clone()), Receiver::new(queue))
55}
56
57#[cfg(all(test, not(feature = "loom")))]
58mod test {
59 use std::num::NonZeroUsize;
60
61 use super::*;
62 use crate::thread;
63
64 #[test]
65 fn test_valid_sends() {
66 const COUNTS: NonZeroUsize = NonZeroUsize::new(4096).unwrap();
67 let (mut tx, mut rx) = channel::<usize>(COUNTS);
68
69 thread::spawn(move || {
70 for i in 0..COUNTS.get() << 3 {
71 tx.send(i as usize);
72 }
73 });
74
75 for i in 0..COUNTS.get() << 3 {
76 let r = rx.recv();
77 assert_eq!(r, i as usize);
78 }
79 }
80
81 #[test]
82 fn test_valid_try_sends() {
83 let (mut tx, mut rx) = channel::<usize>(NonZeroUsize::new(4).unwrap());
84 for _ in 0..4 {
85 assert!(rx.try_recv().is_none());
86 }
87 for i in 0..4 {
88 tx.try_send(i).unwrap();
89 }
90 assert!(tx.try_send(5).is_err());
91
92 for i in 0..4 {
93 assert_eq!(rx.try_recv(), Some(i));
94 }
95 assert!(rx.try_recv().is_none());
96 for i in 0..4 {
97 tx.try_send(i).unwrap();
98 }
99 }
100
101 #[cfg(feature = "async")]
102 #[test]
103 fn test_async_send() {
104 futures::executor::block_on(async {
105 const COUNTS: NonZeroUsize = NonZeroUsize::new(4096).unwrap();
106
107 let (mut tx, mut rx) = channel::<usize>(COUNTS);
108
109 thread::spawn(move || {
110 for i in 0..COUNTS.get() << 1 {
111 futures::executor::block_on(tx.send_async(i));
112 }
113 drop(tx);
114 });
115 for i in 0..COUNTS.get() << 1 {
116 assert_eq!(rx.recv_async().await, i);
117 }
118 });
119 }
120
121 #[test]
122 fn test_batched_send_recv() {
123 const CAPACITY: NonZeroUsize = NonZeroUsize::new(1024).unwrap();
124 const TOTAL_ITEMS: usize = 1024 << 4;
125 let (mut tx, mut rx) = channel::<usize>(CAPACITY);
126
127 thread::spawn(move || {
128 let mut sent = 0;
129 while sent < TOTAL_ITEMS {
130 let buffer = tx.write_buffer();
131 let batch_size = buffer.len().min(TOTAL_ITEMS - sent);
132 for i in 0..batch_size {
133 buffer[i].write(sent + i);
134 }
135 unsafe { tx.commit(batch_size) };
136 sent += batch_size;
137 }
138 });
139
140 let mut received = 0;
141 let mut expected = 0;
142
143 while received < TOTAL_ITEMS {
144 let buffer = rx.read_buffer();
145 if buffer.is_empty() {
146 continue;
147 }
148 for &value in buffer.iter() {
149 assert_eq!(value, expected);
150 expected += 1;
151 }
152 let count = buffer.len();
153 unsafe { rx.advance(count) };
154 received += count;
155 }
156
157 assert_eq!(received, TOTAL_ITEMS);
158 }
159
160 #[test]
161 fn test_drop_remaining_elements() {
162 use std::sync::atomic::{AtomicUsize, Ordering};
163
164 static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
165
166 #[derive(Clone)]
167 struct DropCounter;
168
169 impl Drop for DropCounter {
170 fn drop(&mut self) {
171 DROP_COUNT.fetch_add(1, Ordering::SeqCst);
172 }
173 }
174
175 DROP_COUNT.store(0, Ordering::SeqCst);
176
177 {
178 let (mut tx, rx) = channel::<DropCounter>(NonZeroUsize::new(16).unwrap());
179
180 for _ in 0..5 {
182 tx.send(DropCounter);
183 }
184
185 drop(tx);
187 drop(rx);
188 }
189
190 assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 5);
192 }
193}
194
195#[cfg(all(test, feature = "loom"))]
196mod loom_test {
197 use core::num::NonZeroUsize;
198
199 use super::*;
200 use crate::thread;
201
202 #[test]
203 fn basic_loom() {
204 loom::model(|| {
205 let (mut tx, mut rx) = channel::<usize>(NonZeroUsize::new(2).unwrap());
206 let counts = 3;
207
208 thread::spawn(move || {
209 for i in 0..counts {
210 tx.send(i);
211 }
212 });
213
214 for i in 0..counts {
215 let r = rx.recv();
216 assert_eq!(r, i);
217 }
218 })
219 }
220
221 #[test]
222 fn try_ops_loom() {
223 loom::model(|| {
224 let (mut tx, mut rx) = channel::<usize>(NonZeroUsize::new(2).unwrap());
225
226 thread::spawn(move || {
227 let mut i = 0;
228 while i < 3 {
229 if tx.try_send(i).is_ok() {
230 i += 1;
231 }
232 loom::thread::yield_now();
233 }
234 });
235
236 let mut i = 0;
237 while i < 3 {
238 if let Some(val) = rx.try_recv() {
239 assert_eq!(val, i);
240 i += 1;
241 }
242 loom::thread::yield_now();
243 }
244 })
245 }
246
247 #[test]
248 fn batched_ops_loom() {
249 loom::model(|| {
250 let (mut tx, mut rx) = channel::<usize>(NonZeroUsize::new(2).unwrap());
251 let total = 3;
252
253 thread::spawn(move || {
254 let mut sent = 0;
255 while sent < total {
256 let buf = tx.write_buffer();
257 if !buf.is_empty() {
258 let count = buf.len().min(total - sent);
259 for (i, item) in buf.iter_mut().take(count).enumerate() {
260 item.write(sent + i);
261 }
262 unsafe { tx.commit(count) };
263 sent += count;
264 }
265 loom::thread::yield_now();
266 }
267 });
268
269 let mut received = 0;
270 while received < total {
271 let buf = rx.read_buffer();
272 if !buf.is_empty() {
273 let count = buf.len();
274 for (i, item) in buf.iter().take(count).enumerate() {
275 assert_eq!(*item, received + i);
276 }
277 unsafe { rx.advance(count) };
278 received += count;
279 }
280 loom::thread::yield_now();
281 }
282 })
283 }
284}