// moduvex_runtime/sync/mpsc.rs

//! Multi-producer single-consumer (MPSC) channel.
//!
//! Provides both bounded and unbounded variants. Senders are `Clone`; the
//! `Receiver` is unique. Dropping every sender closes the channel: `recv`
//! still yields any values left in the buffer and then returns `None`.
6
7use std::collections::VecDeque;
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::{Arc, Mutex};
11use std::task::{Context, Poll, Waker};
12
13// ── Shared inner state ────────────────────────────────────────────────────────
14
/// Channel state shared by every handle; the handles keep it behind a mutex.
struct Inner<T> {
    /// Values that have been sent but not yet received, oldest first.
    queue: VecDeque<T>,
    /// Upper bound on `queue.len()` for a bounded channel; `None` means no limit.
    capacity: Option<usize>,
    /// How many sender handles (`Sender` or `UnboundedSender`) are still alive.
    sender_count: usize,
    /// Becomes `true` once the `Receiver` has been dropped.
    receiver_dropped: bool,
    /// Waker of the receiver parked on an empty queue, if there is one.
    recv_waker: Option<Waker>,
    /// Wakers of senders parked on a full bounded queue, in arrival order.
    send_wakers: VecDeque<Waker>,
}

impl<T> Inner<T> {
    /// Builds the state of a fresh channel: empty buffer, exactly one sender,
    /// receiver alive, nobody parked.
    fn new(capacity: Option<usize>) -> Self {
        let queue = VecDeque::new();
        let send_wakers = VecDeque::new();
        Self {
            capacity,
            queue,
            send_wakers,
            recv_waker: None,
            receiver_dropped: false,
            sender_count: 1,
        }
    }

    /// Whether one more value may be buffered right now (always for unbounded).
    fn has_capacity(&self) -> bool {
        if let Some(limit) = self.capacity {
            self.queue.len() < limit
        } else {
            true
        }
    }

    /// Whether no sender handle is left.
    fn senders_closed(&self) -> bool {
        self.sender_count == 0
    }

    /// Whether either side of the channel is gone.
    fn is_closed(&self) -> bool {
        self.receiver_dropped || self.senders_closed()
    }
}
60
61// ── Bounded channel ───────────────────────────────────────────────────────────
62
63/// Create a bounded MPSC channel with the given `capacity`.
64///
65/// `Sender::send` will suspend if the buffer is full; it resumes once the
66/// receiver has consumed an item.
67pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
68    let inner = Arc::new(Mutex::new(Inner::new(Some(capacity.max(1)))));
69    (
70        Sender {
71            inner: inner.clone(),
72        },
73        Receiver { inner },
74    )
75}
76
77/// Sending half of a bounded MPSC channel.
78///
79/// Cheap to clone; each clone shares the same channel.
80pub struct Sender<T> {
81    inner: Arc<Mutex<Inner<T>>>,
82}
83
84impl<T> Clone for Sender<T> {
85    fn clone(&self) -> Self {
86        self.inner.lock().unwrap().sender_count += 1;
87        Self {
88            inner: self.inner.clone(),
89        }
90    }
91}
92
93impl<T> Drop for Sender<T> {
94    fn drop(&mut self) {
95        let mut g = self.inner.lock().unwrap();
96        g.sender_count -= 1;
97        if g.sender_count == 0 {
98            // Channel closed — wake the receiver so it can return `None`.
99            if let Some(w) = g.recv_waker.take() {
100                drop(g);
101                w.wake();
102            }
103        }
104    }
105}
106
107impl<T> Drop for Receiver<T> {
108    fn drop(&mut self) {
109        let mut g = self.inner.lock().unwrap();
110        g.receiver_dropped = true;
111        // Wake all blocked senders so they can observe the closed channel.
112        let wakers: Vec<Waker> = g.send_wakers.drain(..).collect();
113        drop(g);
114        for w in wakers {
115            w.wake();
116        }
117    }
118}
119
120impl<T> Sender<T> {
121    /// Send `value` through the channel, waiting if the buffer is full.
122    ///
123    /// Returns `Err(value)` if the receiver has been dropped.
124    pub fn send(&self, value: T) -> SendFuture<'_, T> {
125        SendFuture {
126            inner: &self.inner,
127            value: Some(value),
128            registered_waker: None,
129        }
130    }
131}
132
133/// Future returned by [`Sender::send`].
134pub struct SendFuture<'a, T> {
135    inner: &'a Arc<Mutex<Inner<T>>>,
136    /// `None` after the value has been deposited.
137    value: Option<T>,
138    /// Waker we registered in `send_wakers`, stored for Drop cleanup.
139    registered_waker: Option<Waker>,
140}
141
142impl<T> Future for SendFuture<'_, T> {
143    type Output = Result<(), T>;
144
145    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146        // SAFETY: `SendFuture` contains only a shared reference, `Option<T>`,
147        // and `Option<Waker>`. No structural pinning on T; safe to get &mut Self.
148        let this = unsafe { self.get_unchecked_mut() };
149        let mut g = this.inner.lock().unwrap();
150        if g.is_closed() {
151            this.registered_waker = None;
152            return Poll::Ready(Err(this.value.take().unwrap()));
153        }
154        if g.has_capacity() {
155            this.registered_waker = None;
156            let val = this.value.take().unwrap();
157            g.queue.push_back(val);
158            if let Some(w) = g.recv_waker.take() {
159                drop(g);
160                w.wake();
161            }
162            Poll::Ready(Ok(()))
163        } else {
164            let new_waker = cx.waker().clone();
165            if let Some(ref existing) = this.registered_waker {
166                if !existing.will_wake(&new_waker) {
167                    // Replace our stale waker in send_wakers.
168                    for w in &mut g.send_wakers {
169                        if w.will_wake(existing) {
170                            *w = new_waker.clone();
171                            break;
172                        }
173                    }
174                    this.registered_waker = Some(new_waker);
175                }
176            } else {
177                g.send_wakers.push_back(new_waker.clone());
178                this.registered_waker = Some(new_waker);
179            }
180            Poll::Pending
181        }
182    }
183}
184
185impl<T> Drop for SendFuture<'_, T> {
186    fn drop(&mut self) {
187        if let Some(ref waker) = self.registered_waker {
188            // Remove our waker from send_wakers to prevent orphaned wake-ups.
189            if let Ok(mut g) = self.inner.lock() {
190                if let Some(pos) = g.send_wakers.iter().position(|w| w.will_wake(waker)) {
191                    g.send_wakers.remove(pos);
192                }
193            }
194        }
195    }
196}
197
198// ── Unbounded channel ─────────────────────────────────────────────────────────
199
200/// Create an unbounded MPSC channel.
201///
202/// Sends never block; the buffer grows as needed.
203pub fn unbounded<T>() -> (UnboundedSender<T>, Receiver<T>) {
204    let inner = Arc::new(Mutex::new(Inner::new(None)));
205    (
206        UnboundedSender {
207            inner: inner.clone(),
208        },
209        Receiver { inner },
210    )
211}
212
213/// Sending half of an unbounded MPSC channel.
214pub struct UnboundedSender<T> {
215    inner: Arc<Mutex<Inner<T>>>,
216}
217
218impl<T> Clone for UnboundedSender<T> {
219    fn clone(&self) -> Self {
220        self.inner.lock().unwrap().sender_count += 1;
221        Self {
222            inner: self.inner.clone(),
223        }
224    }
225}
226
227impl<T> Drop for UnboundedSender<T> {
228    fn drop(&mut self) {
229        let mut g = self.inner.lock().unwrap();
230        g.sender_count -= 1;
231        if g.sender_count == 0 {
232            if let Some(w) = g.recv_waker.take() {
233                drop(g);
234                w.wake();
235            }
236        }
237    }
238}
239
240
241impl<T> UnboundedSender<T> {
242    /// Send `value` immediately (never suspends).
243    ///
244    /// Returns `Err(value)` if the receiver has been dropped.
245    pub fn send(&self, value: T) -> Result<(), T> {
246        let mut g = self.inner.lock().unwrap();
247        if g.is_closed() {
248            return Err(value);
249        }
250        g.queue.push_back(value);
251        if let Some(w) = g.recv_waker.take() {
252            drop(g);
253            w.wake();
254        }
255        Ok(())
256    }
257}
258
259// ── Receiver ──────────────────────────────────────────────────────────────────
260
261/// Receiving half of either channel variant. Not `Clone`.
262pub struct Receiver<T> {
263    inner: Arc<Mutex<Inner<T>>>,
264}
265
266impl<T> Receiver<T> {
267    /// Receive the next value, waiting if the buffer is empty.
268    ///
269    /// Returns `None` when the channel is empty and all senders have been
270    /// dropped.
271    pub fn recv(&mut self) -> RecvFuture<'_, T> {
272        RecvFuture { inner: &self.inner }
273    }
274}
275
276/// Future returned by [`Receiver::recv`].
277pub struct RecvFuture<'a, T> {
278    inner: &'a Arc<Mutex<Inner<T>>>,
279}
280
281impl<T> Future for RecvFuture<'_, T> {
282    type Output = Option<T>;
283
284    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
285        let mut g = self.inner.lock().unwrap();
286        if let Some(val) = g.queue.pop_front() {
287            // Wake one blocked sender (bounded channel backpressure).
288            if let Some(w) = g.send_wakers.pop_front() {
289                drop(g);
290                w.wake();
291            }
292            Poll::Ready(Some(val))
293        } else if g.senders_closed() {
294            Poll::Ready(None)
295        } else {
296            g.recv_waker = Some(cx.waker().clone());
297            Poll::Pending
298        }
299    }
300}
301
302// ── Tests ─────────────────────────────────────────────────────────────────────
303
#[cfg(test)]
mod tests {
    //! Tests for the bounded and unbounded channel variants. Async tests run
    //! on the crate's own executor; `block_on_with_spawn` is used where a test
    //! spawns additional tasks.

    use super::*;
    use crate::executor::{block_on, block_on_with_spawn, spawn};

    /// Waker whose `wake` does nothing; enough to poll a future by hand.
    struct NoopWake;

    impl std::task::Wake for NoopWake {
        fn wake(self: Arc<Self>) {}
    }

    #[test]
    fn bounded_send_recv_basic() {
        block_on(async {
            let (tx, mut rx) = channel::<u32>(4);
            tx.send(1).await.unwrap();
            tx.send(2).await.unwrap();
            assert_eq!(rx.recv().await, Some(1));
            assert_eq!(rx.recv().await, Some(2));
        });
    }

    #[test]
    fn bounded_channel_close_on_sender_drop() {
        block_on(async {
            let (tx, mut rx) = channel::<u32>(4);
            tx.send(42).await.unwrap();
            drop(tx);
            assert_eq!(rx.recv().await, Some(42));
            assert_eq!(rx.recv().await, None);
        });
    }

    #[test]
    fn unbounded_multi_producer() {
        block_on_with_spawn(async {
            let (tx1, mut rx) = unbounded::<u32>();
            let tx2 = tx1.clone();
            let jh1 = spawn(async move {
                tx1.send(10).unwrap();
            });
            let jh2 = spawn(async move {
                tx2.send(20).unwrap();
            });
            jh1.await.unwrap();
            jh2.await.unwrap();
            let mut vals = vec![rx.recv().await.unwrap(), rx.recv().await.unwrap()];
            vals.sort();
            assert_eq!(vals, vec![10, 20]);
        });
    }

    #[test]
    fn bounded_backpressure_unblocks_when_consumed() {
        block_on_with_spawn(async {
            let (tx, mut rx) = channel::<u32>(1);
            // Fill the channel
            tx.send(1).await.unwrap();
            // Spawn a producer that will block until we consume
            let jh = spawn(async move {
                tx.send(2).await.unwrap();
            });
            assert_eq!(rx.recv().await, Some(1));
            jh.await.unwrap();
            assert_eq!(rx.recv().await, Some(2));
        });
    }

    #[test]
    fn unbounded_close_returns_none() {
        block_on(async {
            let (tx, mut rx) = unbounded::<i32>();
            drop(tx);
            assert_eq!(rx.recv().await, None);
        });
    }

    #[test]
    fn bounded_send_to_closed_receiver_returns_err() {
        block_on(async {
            let (tx, rx) = channel::<u32>(4);
            drop(rx);
            // Receiver dropped — sender must get Err immediately.
            let result = tx.send(99).await;
            assert!(result.is_err());
            assert_eq!(result.unwrap_err(), 99);
        });
    }

    #[test]
    fn unbounded_send_to_closed_receiver_returns_err() {
        let (tx, rx) = unbounded::<u32>();
        drop(rx);
        assert_eq!(tx.send(42), Err(42));
    }

    #[test]
    fn bounded_blocked_sender_woken_on_receiver_drop() {
        block_on_with_spawn(async {
            // Channel capacity = 1, fill it, spawn a sender that will block
            let (tx, rx) = channel::<u32>(1);
            tx.send(1).await.unwrap();
            let tx2 = tx.clone();
            let jh = spawn(async move {
                // This send blocks because buffer is full; receiver drop should wake it.
                tx2.send(2).await
            });
            // Drop receiver — should unblock the sender with Err
            drop(tx);
            drop(rx);
            let result = jh.await.unwrap();
            assert!(result.is_err());
        });
    }

    // ── Additional MPSC tests ──────────────────────────────────────────────

    #[test]
    fn bounded_capacity_1_sequential_sends() {
        block_on_with_spawn(async {
            let (tx, mut rx) = channel::<u32>(1);
            for i in 0..5u32 {
                tx.send(i).await.unwrap();
                assert_eq!(rx.recv().await, Some(i));
            }
        });
    }

    #[test]
    fn bounded_clone_increments_sender_count() {
        block_on(async {
            let (tx, mut rx) = channel::<u32>(4);
            let tx2 = tx.clone();
            tx.send(1).await.unwrap();
            tx2.send(2).await.unwrap();
            drop(tx);
            assert_eq!(rx.recv().await, Some(1));
            assert_eq!(rx.recv().await, Some(2));
            // tx2 still alive — channel not yet closed
            drop(tx2);
            assert_eq!(rx.recv().await, None); // now closed
        });
    }

    #[test]
    fn unbounded_stress_100_msgs() {
        block_on_with_spawn(async {
            let (tx, mut rx) = unbounded::<u32>();
            let jh = spawn(async move {
                for i in 0..100u32 {
                    tx.send(i).unwrap();
                }
            });
            jh.await.unwrap();
            let mut count = 0u32;
            while let Some(v) = rx.recv().await {
                assert_eq!(v, count);
                count += 1;
            }
            assert_eq!(count, 100);
        });
    }

    /// Polls send futures by hand so that a waker really is registered, then
    /// checks that dropping the pending future removes it again. (An unpolled
    /// future never registers anything, so dropping it alone proves little.)
    #[test]
    fn bounded_send_future_drop_cleans_waker() {
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        let (tx, rx) = channel::<u32>(1);

        // Fill the single slot; with free capacity the first poll completes.
        let mut fill = tx.send(1);
        assert_eq!(Pin::new(&mut fill).poll(&mut cx), Poll::Ready(Ok(())));
        drop(fill);

        // Create send future but drop it without polling: must not panic and
        // must leave the waker list untouched.
        drop(tx.send(2));
        assert!(tx.inner.lock().unwrap().send_wakers.is_empty());

        // A polled future parks itself on the full buffer ...
        let mut blocked = tx.send(3);
        assert!(Pin::new(&mut blocked).poll(&mut cx).is_pending());
        assert_eq!(tx.inner.lock().unwrap().send_wakers.len(), 1);

        // ... and dropping it takes its waker back out of the list.
        drop(blocked);
        assert!(tx.inner.lock().unwrap().send_wakers.is_empty());
        drop(rx);
    }

    #[test]
    fn bounded_multiple_senders_all_items_received() {
        block_on_with_spawn(async {
            let (tx1, mut rx) = channel::<u32>(16);
            let tx2 = tx1.clone();
            let tx3 = tx2.clone();
            let jh1 = spawn(async move {
                for i in 0..3u32 {
                    tx1.send(i).await.unwrap();
                }
            });
            let jh2 = spawn(async move {
                for i in 10..13u32 {
                    tx2.send(i).await.unwrap();
                }
            });
            let jh3 = spawn(async move {
                for i in 20..23u32 {
                    tx3.send(i).await.unwrap();
                }
            });
            jh1.await.unwrap();
            jh2.await.unwrap();
            jh3.await.unwrap();
            // Collect exactly 9 items (3 per sender × 3 senders)
            let mut vals: Vec<u32> = Vec::new();
            for _ in 0..9 {
                if let Some(v) = rx.recv().await {
                    vals.push(v);
                }
            }
            vals.sort();
            assert_eq!(vals, vec![0, 1, 2, 10, 11, 12, 20, 21, 22]);
        });
    }

    #[test]
    fn unbounded_capacity_is_unlimited() {
        block_on(async {
            let (tx, mut rx) = unbounded::<u32>();
            // Send 500 items without consuming — should never block
            for i in 0..500u32 {
                tx.send(i).unwrap();
            }
            for i in 0..500u32 {
                assert_eq!(rx.recv().await, Some(i));
            }
        });
    }

    #[test]
    fn bounded_receiver_drop_mid_queue() {
        block_on(async {
            let (tx, rx) = channel::<u32>(4);
            tx.send(1).await.unwrap();
            tx.send(2).await.unwrap();
            drop(rx); // items queued but receiver dropped
            // Subsequent send must return Err immediately
            let result = tx.send(3).await;
            assert!(result.is_err());
        });
    }

    #[test]
    fn bounded_channel_capacity_max_1_enforced() {
        block_on_with_spawn(async {
            let (tx, mut rx) = channel::<u32>(1);
            tx.send(10).await.unwrap();
            let tx2 = tx.clone();
            let jh = spawn(async move {
                // This should block until rx consumes the first item
                tx2.send(20).await.unwrap();
            });
            // Consume first
            let v = rx.recv().await.unwrap();
            assert_eq!(v, 10);
            jh.await.unwrap();
            let v2 = rx.recv().await.unwrap();
            assert_eq!(v2, 20);
        });
    }

    #[test]
    fn bounded_channel_string_type() {
        block_on(async {
            let (tx, mut rx) = channel::<String>(4);
            tx.send("hello".to_string()).await.unwrap();
            tx.send("world".to_string()).await.unwrap();
            drop(tx);
            assert_eq!(rx.recv().await, Some("hello".to_string()));
            assert_eq!(rx.recv().await, Some("world".to_string()));
            assert_eq!(rx.recv().await, None);
        });
    }

    #[test]
    fn unbounded_clone_sender_count() {
        block_on(async {
            let (tx, mut rx) = unbounded::<u32>();
            let tx2 = tx.clone();
            let tx3 = tx2.clone();
            tx.send(1).unwrap();
            tx2.send(2).unwrap();
            tx3.send(3).unwrap();
            drop(tx);
            drop(tx2);
            assert_eq!(rx.recv().await, Some(1));
            assert_eq!(rx.recv().await, Some(2));
            assert_eq!(rx.recv().await, Some(3));
            // tx3 still alive
            drop(tx3);
            assert_eq!(rx.recv().await, None);
        });
    }

    #[test]
    fn bounded_capacity_2_allows_2_sends_before_blocking() {
        block_on_with_spawn(async {
            let (tx, mut rx) = channel::<u32>(2);
            // Both of these should succeed immediately without blocking
            tx.send(1).await.unwrap();
            tx.send(2).await.unwrap();
            // These should be buffered
            assert_eq!(rx.recv().await, Some(1));
            assert_eq!(rx.recv().await, Some(2));
        });
    }

    #[test]
    fn unbounded_receiver_close_mid_batch() {
        block_on(async {
            let (tx, rx) = unbounded::<u32>();
            // Send several items
            for i in 0..5 {
                tx.send(i).unwrap();
            }
            drop(rx);
            // Subsequent sends must fail
            assert!(tx.send(99).is_err());
        });
    }

    #[test]
    fn bounded_channel_capacity_10_fills_before_block() {
        block_on_with_spawn(async {
            let (tx, mut rx) = channel::<u32>(10);
            // Fill all 10 slots
            for i in 0..10u32 {
                tx.send(i).await.unwrap();
            }
            // Drain all
            for i in 0..10u32 {
                assert_eq!(rx.recv().await, Some(i));
            }
        });
    }

    #[test]
    fn bounded_single_item_channel_send_recv_alternating() {
        block_on_with_spawn(async {
            let (tx, mut rx) = channel::<u32>(1);
            for i in 0..10u32 {
                tx.send(i * 2).await.unwrap();
                let v = rx.recv().await.unwrap();
                assert_eq!(v, i * 2);
            }
        });
    }

    #[test]
    fn unbounded_send_err_value_preserves_original() {
        let (tx, rx) = unbounded::<String>();
        drop(rx);
        let original = "test_value".to_string();
        let result = tx.send(original.clone());
        assert_eq!(result, Err(original));
    }

    #[test]
    fn bounded_send_err_value_preserves_original() {
        block_on(async {
            let (tx, rx) = channel::<String>(4);
            drop(rx);
            let original = "test".to_string();
            let result = tx.send(original.clone()).await;
            assert_eq!(result, Err(original));
        });
    }

    #[test]
    fn bounded_three_senders_one_receiver_pipelining() {
        block_on_with_spawn(async {
            let (tx, mut rx) = channel::<u32>(3);
            let tx2 = tx.clone();
            let tx3 = tx.clone();
            // Send 1 each from 3 senders (within capacity)
            tx.send(100).await.unwrap();
            tx2.send(200).await.unwrap();
            tx3.send(300).await.unwrap();
            let mut results = vec![
                rx.recv().await.unwrap(),
                rx.recv().await.unwrap(),
                rx.recv().await.unwrap(),
            ];
            results.sort();
            assert_eq!(results, vec![100, 200, 300]);
        });
    }

    #[test]
    fn bounded_channel_preserves_ordering() {
        block_on(async {
            let (tx, mut rx) = channel::<u32>(5);
            for i in 0..5u32 {
                tx.send(i * 10).await.unwrap();
            }
            for i in 0..5u32 {
                assert_eq!(rx.recv().await, Some(i * 10));
            }
        });
    }

    #[test]
    fn unbounded_immediately_closed_channel() {
        block_on(async {
            let (tx, rx) = unbounded::<u32>();
            drop(tx);
            drop(rx);
            // Just verifies no panic on immediate close
        });
    }

    #[test]
    fn bounded_immediately_closed_channel() {
        block_on(async {
            let (tx, rx) = channel::<u32>(1);
            drop(tx);
            drop(rx);
            // Just verifies no panic on immediate close
        });
    }

    #[test]
    fn unbounded_send_option_type() {
        block_on(async {
            let (tx, mut rx) = unbounded::<Option<u32>>();
            tx.send(Some(42)).unwrap();
            tx.send(None).unwrap();
            drop(tx);
            assert_eq!(rx.recv().await, Some(Some(42)));
            assert_eq!(rx.recv().await, Some(None));
            assert_eq!(rx.recv().await, None);
        });
    }
}