1use std::collections::VecDeque;
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::{Arc, Mutex};
11use std::task::{Context, Poll, Waker};
12
/// Shared channel state; always accessed behind a `Mutex`.
struct Inner<T> {
    // Buffered messages, FIFO order.
    queue: VecDeque<T>,
    // `Some(n)` for a bounded channel, `None` for unbounded.
    capacity: Option<usize>,
    // Number of live sender handles; starts at 1, bumped on clone.
    sender_count: usize,
    // Set once the single `Receiver` has been dropped.
    receiver_dropped: bool,
    // Waker of a receiver parked in `RecvFuture::poll`, if any.
    recv_waker: Option<Waker>,
    // Wakers of senders blocked on a full bounded channel, oldest first.
    send_wakers: VecDeque<Waker>,
}
29
30impl<T> Inner<T> {
31 fn new(capacity: Option<usize>) -> Self {
32 Self {
33 queue: VecDeque::new(),
34 capacity,
35 sender_count: 1,
36 receiver_dropped: false,
37 recv_waker: None,
38 send_wakers: VecDeque::new(),
39 }
40 }
41
42 fn has_capacity(&self) -> bool {
44 match self.capacity {
45 None => true,
46 Some(cap) => self.queue.len() < cap,
47 }
48 }
49
50 fn senders_closed(&self) -> bool {
52 self.sender_count == 0
53 }
54
55 fn is_closed(&self) -> bool {
57 self.sender_count == 0 || self.receiver_dropped
58 }
59}
60
61pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
68 let inner = Arc::new(Mutex::new(Inner::new(Some(capacity.max(1)))));
69 (
70 Sender {
71 inner: inner.clone(),
72 },
73 Receiver { inner },
74 )
75}
76
/// Sending half of a bounded channel; clonable for multiple producers.
pub struct Sender<T> {
    inner: Arc<Mutex<Inner<T>>>,
}
83
84impl<T> Clone for Sender<T> {
85 fn clone(&self) -> Self {
86 self.inner.lock().unwrap().sender_count += 1;
87 Self {
88 inner: self.inner.clone(),
89 }
90 }
91}
92
93impl<T> Drop for Sender<T> {
94 fn drop(&mut self) {
95 let mut g = self.inner.lock().unwrap();
96 g.sender_count -= 1;
97 if g.sender_count == 0 {
98 if let Some(w) = g.recv_waker.take() {
100 drop(g);
101 w.wake();
102 }
103 }
104 }
105}
106
107impl<T> Drop for Receiver<T> {
108 fn drop(&mut self) {
109 let mut g = self.inner.lock().unwrap();
110 g.receiver_dropped = true;
111 let wakers: Vec<Waker> = g.send_wakers.drain(..).collect();
113 drop(g);
114 for w in wakers {
115 w.wake();
116 }
117 }
118}
119
impl<T> Sender<T> {
    /// Returns a future that resolves to `Ok(())` once `value` is enqueued,
    /// or `Err(value)` if the receiver has been dropped. Lazy: the send only
    /// makes progress while the returned future is polled.
    pub fn send(&self, value: T) -> SendFuture<'_, T> {
        SendFuture {
            inner: &self.inner,
            value: Some(value),
            registered_waker: None,
        }
    }
}
132
/// Future returned by [`Sender::send`]; completes when the value is queued
/// or the receiver is gone.
pub struct SendFuture<'a, T> {
    inner: &'a Arc<Mutex<Inner<T>>>,
    // Value to deliver; taken out when the future resolves.
    value: Option<T>,
    // Copy of the waker we queued in `send_wakers`, so the entry can be
    // located (via `will_wake`) and removed or replaced later.
    registered_waker: Option<Waker>,
}
141
142impl<T> Future for SendFuture<'_, T> {
143 type Output = Result<(), T>;
144
145 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146 let this = unsafe { self.get_unchecked_mut() };
149 let mut g = this.inner.lock().unwrap();
150 if g.is_closed() {
151 this.registered_waker = None;
152 return Poll::Ready(Err(this.value.take().unwrap()));
153 }
154 if g.has_capacity() {
155 this.registered_waker = None;
156 let val = this.value.take().unwrap();
157 g.queue.push_back(val);
158 if let Some(w) = g.recv_waker.take() {
159 drop(g);
160 w.wake();
161 }
162 Poll::Ready(Ok(()))
163 } else {
164 let new_waker = cx.waker().clone();
165 if let Some(ref existing) = this.registered_waker {
166 if !existing.will_wake(&new_waker) {
167 for w in &mut g.send_wakers {
169 if w.will_wake(existing) {
170 *w = new_waker.clone();
171 break;
172 }
173 }
174 this.registered_waker = Some(new_waker);
175 }
176 } else {
177 g.send_wakers.push_back(new_waker.clone());
178 this.registered_waker = Some(new_waker);
179 }
180 Poll::Pending
181 }
182 }
183}
184
185impl<T> Drop for SendFuture<'_, T> {
186 fn drop(&mut self) {
187 if let Some(ref waker) = self.registered_waker {
188 if let Ok(mut g) = self.inner.lock() {
190 if let Some(pos) = g.send_wakers.iter().position(|w| w.will_wake(waker)) {
191 g.send_wakers.remove(pos);
192 }
193 }
194 }
195 }
196}
197
198pub fn unbounded<T>() -> (UnboundedSender<T>, Receiver<T>) {
204 let inner = Arc::new(Mutex::new(Inner::new(None)));
205 (
206 UnboundedSender {
207 inner: inner.clone(),
208 },
209 Receiver { inner },
210 )
211}
212
/// Sending half of an unbounded channel; sends are synchronous and never block.
pub struct UnboundedSender<T> {
    inner: Arc<Mutex<Inner<T>>>,
}
217
218impl<T> Clone for UnboundedSender<T> {
219 fn clone(&self) -> Self {
220 self.inner.lock().unwrap().sender_count += 1;
221 Self {
222 inner: self.inner.clone(),
223 }
224 }
225}
226
227impl<T> Drop for UnboundedSender<T> {
228 fn drop(&mut self) {
229 let mut g = self.inner.lock().unwrap();
230 g.sender_count -= 1;
231 if g.sender_count == 0 {
232 if let Some(w) = g.recv_waker.take() {
233 drop(g);
234 w.wake();
235 }
236 }
237 }
238}
239
240
241impl<T> UnboundedSender<T> {
242 pub fn send(&self, value: T) -> Result<(), T> {
246 let mut g = self.inner.lock().unwrap();
247 if g.is_closed() {
248 return Err(value);
249 }
250 g.queue.push_back(value);
251 if let Some(w) = g.recv_waker.take() {
252 drop(g);
253 w.wake();
254 }
255 Ok(())
256 }
257}
258
/// Receiving half of a channel; single consumer, shared by both flavors.
pub struct Receiver<T> {
    inner: Arc<Mutex<Inner<T>>>,
}
265
impl<T> Receiver<T> {
    /// Returns a future resolving to the next message, or `None` once every
    /// sender has been dropped and the buffer is drained.
    pub fn recv(&mut self) -> RecvFuture<'_, T> {
        RecvFuture { inner: &self.inner }
    }
}
275
/// Future returned by [`Receiver::recv`].
pub struct RecvFuture<'a, T> {
    inner: &'a Arc<Mutex<Inner<T>>>,
}
280
281impl<T> Future for RecvFuture<'_, T> {
282 type Output = Option<T>;
283
284 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
285 let mut g = self.inner.lock().unwrap();
286 if let Some(val) = g.queue.pop_front() {
287 if let Some(w) = g.send_wakers.pop_front() {
289 drop(g);
290 w.wake();
291 }
292 Poll::Ready(Some(val))
293 } else if g.senders_closed() {
294 Poll::Ready(None)
295 } else {
296 g.recv_waker = Some(cx.waker().clone());
297 Poll::Pending
298 }
299 }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    // `block_on` drives a single future; `block_on_with_spawn` + `spawn`
    // come from the crate-local executor and allow concurrent tasks.
    use crate::executor::{block_on, block_on_with_spawn, spawn};

    #[test]
    fn bounded_send_recv_basic() {
        block_on(async {
            let (tx, mut rx) = channel::<u32>(4);
            tx.send(1).await.unwrap();
            tx.send(2).await.unwrap();
            assert_eq!(rx.recv().await, Some(1));
            assert_eq!(rx.recv().await, Some(2));
        });
    }

    #[test]
    fn bounded_channel_close_on_sender_drop() {
        block_on(async {
            let (tx, mut rx) = channel::<u32>(4);
            tx.send(42).await.unwrap();
            drop(tx);
            // Buffered items must still drain before `None` is reported.
            assert_eq!(rx.recv().await, Some(42));
            assert_eq!(rx.recv().await, None);
        });
    }

    #[test]
    fn unbounded_multi_producer() {
        block_on_with_spawn(async {
            let (tx1, mut rx) = unbounded::<u32>();
            let tx2 = tx1.clone();
            let jh1 = spawn(async move {
                tx1.send(10).unwrap();
            });
            let jh2 = spawn(async move {
                tx2.send(20).unwrap();
            });
            jh1.await.unwrap();
            jh2.await.unwrap();
            // Arrival order across tasks is unspecified; compare as a set.
            let mut vals = vec![rx.recv().await.unwrap(), rx.recv().await.unwrap()];
            vals.sort();
            assert_eq!(vals, vec![10, 20]);
        });
    }

    #[test]
    fn bounded_backpressure_unblocks_when_consumed() {
        block_on_with_spawn(async {
            let (tx, mut rx) = channel::<u32>(1);
            tx.send(1).await.unwrap();
            // This send blocks until the receiver frees the single slot.
            let jh = spawn(async move {
                tx.send(2).await.unwrap();
            });
            assert_eq!(rx.recv().await, Some(1));
            jh.await.unwrap();
            assert_eq!(rx.recv().await, Some(2));
        });
    }

    #[test]
    fn unbounded_close_returns_none() {
        block_on(async {
            let (tx, mut rx) = unbounded::<i32>();
            drop(tx);
            assert_eq!(rx.recv().await, None);
        });
    }

    #[test]
    fn bounded_send_to_closed_receiver_returns_err() {
        block_on(async {
            let (tx, rx) = channel::<u32>(4);
            drop(rx);
            // The unsent value must be handed back inside the error.
            let result = tx.send(99).await;
            assert!(result.is_err());
            assert_eq!(result.unwrap_err(), 99);
        });
    }

    #[test]
    fn unbounded_send_to_closed_receiver_returns_err() {
        let (tx, rx) = unbounded::<u32>();
        drop(rx);
        assert_eq!(tx.send(42), Err(42));
    }

    #[test]
    fn bounded_blocked_sender_woken_on_receiver_drop() {
        block_on_with_spawn(async {
            let (tx, rx) = channel::<u32>(1);
            tx.send(1).await.unwrap();
            let tx2 = tx.clone();
            // Blocked on a full channel; dropping `rx` must wake it with Err.
            let jh = spawn(async move {
                tx2.send(2).await
            });
            drop(tx);
            drop(rx);
            let result = jh.await.unwrap();
            assert!(result.is_err());
        });
    }

    #[test]
    fn bounded_capacity_1_sequential_sends() {
        block_on_with_spawn(async {
            let (tx, mut rx) = channel::<u32>(1);
            for i in 0..5u32 {
                tx.send(i).await.unwrap();
                assert_eq!(rx.recv().await, Some(i));
            }
        });
    }

    #[test]
    fn bounded_clone_increments_sender_count() {
        block_on(async {
            let (tx, mut rx) = channel::<u32>(4);
            let tx2 = tx.clone();
            tx.send(1).await.unwrap();
            tx2.send(2).await.unwrap();
            drop(tx);
            assert_eq!(rx.recv().await, Some(1));
            assert_eq!(rx.recv().await, Some(2));
            drop(tx2);
            // Channel only closes once the last clone is gone.
            assert_eq!(rx.recv().await, None); });
    }

    #[test]
    fn unbounded_stress_100_msgs() {
        block_on_with_spawn(async {
            let (tx, mut rx) = unbounded::<u32>();
            let jh = spawn(async move {
                for i in 0..100u32 {
                    tx.send(i).unwrap();
                }
            });
            jh.await.unwrap();
            let mut count = 0u32;
            while let Some(v) = rx.recv().await {
                assert_eq!(v, count);
                count += 1;
            }
            assert_eq!(count, 100);
        });
    }

    #[test]
    fn bounded_send_future_drop_cleans_waker() {
        block_on(async {
            let (tx, rx) = channel::<u32>(1);
            // `fut` is never polled; dropping it must not leave a stale waker.
            tx.send(1).await.unwrap(); let fut = tx.send(2);
            drop(fut); drop(rx);
        });
    }

    #[test]
    fn bounded_multiple_senders_all_items_received() {
        block_on_with_spawn(async {
            let (tx1, mut rx) = channel::<u32>(16);
            let tx2 = tx1.clone();
            let tx3 = tx2.clone();
            let jh1 = spawn(async move {
                for i in 0..3u32 {
                    tx1.send(i).await.unwrap();
                }
            });
            let jh2 = spawn(async move {
                for i in 10..13u32 {
                    tx2.send(i).await.unwrap();
                }
            });
            let jh3 = spawn(async move {
                for i in 20..23u32 {
                    tx3.send(i).await.unwrap();
                }
            });
            jh1.await.unwrap();
            jh2.await.unwrap();
            jh3.await.unwrap();
            let mut vals: Vec<u32> = Vec::new();
            for _ in 0..9 {
                if let Some(v) = rx.recv().await {
                    vals.push(v);
                }
            }
            vals.sort();
            assert_eq!(vals, vec![0, 1, 2, 10, 11, 12, 20, 21, 22]);
        });
    }

    #[test]
    fn unbounded_capacity_is_unlimited() {
        block_on(async {
            let (tx, mut rx) = unbounded::<u32>();
            for i in 0..500u32 {
                tx.send(i).unwrap();
            }
            for i in 0..500u32 {
                assert_eq!(rx.recv().await, Some(i));
            }
        });
    }

    #[test]
    fn bounded_receiver_drop_mid_queue() {
        block_on(async {
            let (tx, rx) = channel::<u32>(4);
            tx.send(1).await.unwrap();
            tx.send(2).await.unwrap();
            drop(rx); let result = tx.send(3).await;
            assert!(result.is_err());
        });
    }

    #[test]
    fn bounded_channel_capacity_max_1_enforced() {
        block_on_with_spawn(async {
            let (tx, mut rx) = channel::<u32>(1);
            tx.send(10).await.unwrap();
            let tx2 = tx.clone();
            let jh = spawn(async move {
                tx2.send(20).await.unwrap();
            });
            let v = rx.recv().await.unwrap();
            assert_eq!(v, 10);
            jh.await.unwrap();
            let v2 = rx.recv().await.unwrap();
            assert_eq!(v2, 20);
        });
    }

    #[test]
    fn bounded_channel_string_type() {
        block_on(async {
            let (tx, mut rx) = channel::<String>(4);
            tx.send("hello".to_string()).await.unwrap();
            tx.send("world".to_string()).await.unwrap();
            drop(tx);
            assert_eq!(rx.recv().await, Some("hello".to_string()));
            assert_eq!(rx.recv().await, Some("world".to_string()));
            assert_eq!(rx.recv().await, None);
        });
    }

    #[test]
    fn unbounded_clone_sender_count() {
        block_on(async {
            let (tx, mut rx) = unbounded::<u32>();
            let tx2 = tx.clone();
            let tx3 = tx2.clone();
            tx.send(1).unwrap();
            tx2.send(2).unwrap();
            tx3.send(3).unwrap();
            drop(tx);
            drop(tx2);
            assert_eq!(rx.recv().await, Some(1));
            assert_eq!(rx.recv().await, Some(2));
            assert_eq!(rx.recv().await, Some(3));
            drop(tx3);
            assert_eq!(rx.recv().await, None);
        });
    }

    #[test]
    fn bounded_capacity_2_allows_2_sends_before_blocking() {
        block_on_with_spawn(async {
            let (tx, mut rx) = channel::<u32>(2);
            tx.send(1).await.unwrap();
            tx.send(2).await.unwrap();
            assert_eq!(rx.recv().await, Some(1));
            assert_eq!(rx.recv().await, Some(2));
        });
    }

    #[test]
    fn unbounded_receiver_close_mid_batch() {
        block_on(async {
            let (tx, rx) = unbounded::<u32>();
            for i in 0..5 {
                tx.send(i).unwrap();
            }
            drop(rx);
            assert!(tx.send(99).is_err());
        });
    }

    #[test]
    fn bounded_channel_capacity_10_fills_before_block() {
        block_on_with_spawn(async {
            let (tx, mut rx) = channel::<u32>(10);
            for i in 0..10u32 {
                tx.send(i).await.unwrap();
            }
            for i in 0..10u32 {
                assert_eq!(rx.recv().await, Some(i));
            }
        });
    }

    #[test]
    fn bounded_single_item_channel_send_recv_alternating() {
        block_on_with_spawn(async {
            let (tx, mut rx) = channel::<u32>(1);
            for i in 0..10u32 {
                tx.send(i * 2).await.unwrap();
                let v = rx.recv().await.unwrap();
                assert_eq!(v, i * 2);
            }
        });
    }

    #[test]
    fn unbounded_send_err_value_preserves_original() {
        let (tx, rx) = unbounded::<String>();
        drop(rx);
        let original = "test_value".to_string();
        let result = tx.send(original.clone());
        assert_eq!(result, Err(original));
    }

    #[test]
    fn bounded_send_err_value_preserves_original() {
        block_on(async {
            let (tx, rx) = channel::<String>(4);
            drop(rx);
            let original = "test".to_string();
            let result = tx.send(original.clone()).await;
            assert_eq!(result, Err(original));
        });
    }

    #[test]
    fn bounded_three_senders_one_receiver_pipelining() {
        block_on_with_spawn(async {
            let (tx, mut rx) = channel::<u32>(3);
            let tx2 = tx.clone();
            let tx3 = tx.clone();
            tx.send(100).await.unwrap();
            tx2.send(200).await.unwrap();
            tx3.send(300).await.unwrap();
            let mut results = vec![
                rx.recv().await.unwrap(),
                rx.recv().await.unwrap(),
                rx.recv().await.unwrap(),
            ];
            results.sort();
            assert_eq!(results, vec![100, 200, 300]);
        });
    }

    #[test]
    fn bounded_channel_preserves_ordering() {
        block_on(async {
            let (tx, mut rx) = channel::<u32>(5);
            for i in 0..5u32 {
                tx.send(i * 10).await.unwrap();
            }
            for i in 0..5u32 {
                assert_eq!(rx.recv().await, Some(i * 10));
            }
        });
    }

    #[test]
    fn unbounded_immediately_closed_channel() {
        block_on(async {
            let (tx, rx) = unbounded::<u32>();
            drop(tx);
            drop(rx);
        });
    }

    #[test]
    fn bounded_immediately_closed_channel() {
        block_on(async {
            let (tx, rx) = channel::<u32>(1);
            drop(tx);
            drop(rx);
        });
    }

    #[test]
    fn unbounded_send_option_type() {
        block_on(async {
            // `Option<T>` payloads must be distinguishable from channel closure.
            let (tx, mut rx) = unbounded::<Option<u32>>();
            tx.send(Some(42)).unwrap();
            tx.send(None).unwrap();
            drop(tx);
            assert_eq!(rx.recv().await, Some(Some(42)));
            assert_eq!(rx.recv().await, Some(None));
            assert_eq!(rx.recv().await, None);
        });
    }
}