Skip to main content

commonware_actor/
mailbox.rs

1//! Bounded message queue with caller-managed overflow.
2//!
3//! # Architecture
4//!
5//! The mailbox is split into two queues: a bounded `ready` queue
6//! that producers push to and the receiver pops from, and an unbounded
7//! `overflow` queue that holds messages displaced when ready is full. A
8//! [`Policy`] or [`UnreliablePolicy`] decides how overflow is updated when
9//! a message cannot enter ready immediately.
10//!
11//! ```text
12//!                          senders
13//!                             |
14//!         +-------------------+--------------------+
15//!         | overflow inactive                      | overflow active
16//!         | and ready has room                     | or ready full
17//!         v                                        v
18//!     +----------+    refill front-to-back     +----------+
19//!     |  ready   |<----------------------------| overflow |
20//!     +----------+    after each ready pop     +----------+
21//!         |
22//!         | pop first
23//!         v
24//!      receiver
25//! ```
26//!
27//! The receiver always pops from the ready queue first. After each ready pop, it
28//! eagerly refills ready from published overflow so senders can return to the
29//! ready fast path without waiting for ready to drain completely. Ready is
30//! refilled from overflow front to back, but policies decide which overflow
31//! messages are retained and in what order.
32//!
33//! Overflow should be rare. When overflow is populated, the receiver refills
34//! ready immediately instead of waiting to batch refill work. This can take the
35//! overflow lock once per popped message, but it keeps ready capacity available
36//! for later sends as soon as possible.
37//!
38//! # Ordering
39//!
40//! Enqueue calls from the same sender will be delivered in order. Concurrent enqueue calls,
41//! however, are not globally ordered and may be observed in any interleaving.
42
43use crate::{Feedback, Unreliable};
44use commonware_runtime::{
45    telemetry::metrics::{Counter, MetricsExt as _},
46    Metrics,
47};
48use std::{
49    collections::VecDeque,
50    fmt,
51    future::poll_fn,
52    marker::PhantomData,
53    num::NonZeroUsize,
54    sync::mpsc::TryRecvError,
55    task::{Context, Poll},
56};
57
/// Storage for the messages a mailbox policy keeps while ready is full.
pub trait Overflow<T>: Default {
    /// Report whether no message is currently retained.
    fn is_empty(&self) -> bool;

    /// Hand retained messages to `push` in delivery order.
    ///
    /// `push` returns `None` when it took the message and `Some(message)` when
    /// it could not. Draining stops at the first `Some`: the returned message
    /// and every later message must stay retained so a future drain can pick
    /// them up.
    fn drain<F: FnMut(T) -> Option<T>>(&mut self, push: F);
}
72
73impl<T> Overflow<T> for VecDeque<T> {
74    fn is_empty(&self) -> bool {
75        self.is_empty()
76    }
77
78    fn drain<F>(&mut self, mut push: F)
79    where
80        F: FnMut(T) -> Option<T>,
81    {
82        while let Some(message) = self.pop_front() {
83            if let Some(message) = push(message) {
84                self.push_front(message);
85                break;
86            }
87        }
88    }
89}
90
91/// Overflow behavior for actor messages when an inbox is full.
///
/// Implemented by the message type itself. A mailbox created with [`new`]
/// calls [`Policy::handle`] from its slow path when a message still cannot
/// enter ready, and the sender then observes [`Feedback::Backoff`].
92pub trait Policy: Sized {
93    /// Overflow storage used by this policy.
94    type Overflow: Overflow<Self>;
95
96    /// Reliably handle `message` when it cannot enter the bounded ready queue immediately.
97    ///
98    /// This may retain the message, coalesce it with retained work, replace older retained work,
99    /// or deliberately do no work because the message is already satisfied, superseded, or no
100    /// longer needed (for example, a request whose response channel is already closed).
101    ///
102    /// # Warning
103    ///
104    /// Do not enqueue into the same mailbox from this method or from destructors triggered by
105    /// editing `overflow`. This method runs while the mailbox holds its overflow lock, so same
106    /// mailbox re-entry can deadlock.
107    ///
108    /// This method should not unwind after mutating `overflow`. A panic, including one from a
109    /// destructor triggered while editing `overflow`, can leave retained overflow data stranded in
110    /// the mailbox.
111    fn handle(overflow: &mut Self::Overflow, message: Self);
112}
113
114/// Overflow behavior for actor messages that can be rejected when an inbox is full.
///
/// Implemented by the message type itself. A mailbox created with
/// [`new_unreliable`] maps a `true` result to `Unreliable::new(Feedback::Backoff)`
/// and a `false` result to `Unreliable::Rejected`.
115pub trait UnreliablePolicy: Sized {
116    /// Overflow storage used by this policy.
117    type Overflow: Overflow<Self>;
118
119    /// Unreliably handle `message` when it cannot enter the bounded ready queue immediately.
120    ///
121    /// Returns `true` when the policy considered the message's effects. This includes retaining
122    /// the message, coalescing it with retained work, replacing older retained work, or deliberately
123    /// doing no work because the message is already satisfied, superseded, or no longer needed.
124    ///
125    /// Returns `false` only when the policy rejects the message under backpressure without
126    /// retaining, coalescing, replacing, or otherwise handling it. This is the unreliable case: the
127    /// submitted work was not semantically handled, and callers that care should retry or treat the
128    /// submission as failed.
129    ///
130    /// # Warning
131    ///
132    /// Do not enqueue into the same mailbox from this method or from destructors triggered by
133    /// editing `overflow`. This method runs while the mailbox holds its overflow lock, so same
134    /// mailbox re-entry can deadlock.
135    ///
136    /// This method should not unwind after mutating `overflow`. A panic, including one from a
137    /// destructor triggered while editing `overflow`, can leave retained overflow data stranded in
138    /// the mailbox.
139    fn handle(overflow: &mut Self::Overflow, message: Self) -> bool;
140}
141
142// Marker types that select the mailbox overflow policy. They carry no data;
// each one implements `Mode<T>` for message types with the matching policy.
143mod mode {
144    /// Selects [`super::Policy`]: overflow messages are always handled.
145    pub(super) struct Reliable;
146
147    /// Selects [`super::UnreliablePolicy`]: overflow messages may be rejected.
148    pub(super) struct Unreliable;
149}
150
// Internal adapter that lets the shared mailbox state work with either policy
// trait. It is implemented only for the `mode` marker types.
151trait Mode<T>: Sized {
152    /// Overflow storage used by this mode.
153    type Overflow: Overflow<T>;
154    /// Feedback returned from enqueue attempts.
155    type Feedback;
156
157    /// Updates overflow for a full inbox and reports whether the message was handled.
158    fn handle(overflow: &mut Self::Overflow, message: T) -> bool;
159    /// Maps ready-path feedback into this mode's feedback type.
160    fn ready_feedback(feedback: Feedback) -> Self::Feedback;
161    /// Maps the result of [`Mode::handle`] into this mode's feedback type.
162    fn overflow_feedback(handled: bool) -> Self::Feedback;
163    /// Returns `true` when this feedback should count as backoff.
164    fn is_backoff(feedback: &Self::Feedback) -> bool;
165    /// Returns `true` when this feedback means the receiver is closed.
166    fn is_closed(feedback: &Self::Feedback) -> bool;
167}
168
169impl<T: Policy> Mode<T> for mode::Reliable {
170    type Overflow = T::Overflow;
171    type Feedback = Feedback;
172
173    fn handle(overflow: &mut Self::Overflow, message: T) -> bool {
174        T::handle(overflow, message);
175        true
176    }
177
178    fn ready_feedback(feedback: Feedback) -> Self::Feedback {
179        feedback
180    }
181
182    fn overflow_feedback(_handled: bool) -> Self::Feedback {
183        Feedback::Backoff
184    }
185
186    fn is_backoff(feedback: &Self::Feedback) -> bool {
187        *feedback == Feedback::Backoff
188    }
189
190    fn is_closed(feedback: &Self::Feedback) -> bool {
191        *feedback == Feedback::Closed
192    }
193}
194
195impl<T: UnreliablePolicy> Mode<T> for mode::Unreliable {
196    type Overflow = T::Overflow;
197    type Feedback = Unreliable<Feedback>;
198
199    fn handle(overflow: &mut Self::Overflow, message: T) -> bool {
200        T::handle(overflow, message)
201    }
202
203    fn ready_feedback(feedback: Feedback) -> Self::Feedback {
204        Unreliable::new(feedback)
205    }
206
207    fn overflow_feedback(handled: bool) -> Self::Feedback {
208        if handled {
209            Unreliable::new(Feedback::Backoff)
210        } else {
211            Unreliable::Rejected
212        }
213    }
214
215    fn is_backoff(feedback: &Self::Feedback) -> bool {
216        *feedback == Unreliable::new(Feedback::Backoff)
217    }
218
219    fn is_closed(feedback: &Self::Feedback) -> bool {
220        *feedback == Unreliable::new(Feedback::Closed)
221    }
222}
223
224/// Sender half of a mailbox.
///
/// Cloning registers another live sender. Dropping the last sender wakes the
/// receiver so it can observe the disconnect.
225pub struct Sender<T: Policy> {
    // Mailbox state shared with the receiver and every other sender.
226    state: Arc<State<T, mode::Reliable>>,
227}
228
229/// Sender half of an unreliable mailbox.
///
/// Cloning registers another live sender. Dropping the last sender wakes the
/// receiver so it can observe the disconnect.
230pub struct UnreliableSender<T: UnreliablePolicy> {
    // Mailbox state shared with the receiver and every other sender.
231    state: Arc<State<T, mode::Unreliable>>,
232}
233
234impl<T: Policy> Clone for Sender<T> {
235    fn clone(&self) -> Self {
236        Self {
237            state: clone_sender_state(&self.state),
238        }
239    }
240}
241
242impl<T: UnreliablePolicy> Clone for UnreliableSender<T> {
243    fn clone(&self) -> Self {
244        Self {
245            state: clone_sender_state(&self.state),
246        }
247    }
248}
249
250impl<T: Policy> Drop for Sender<T> {
251    fn drop(&mut self) {
252        drop_sender_state(&self.state);
253    }
254}
255
256impl<T: UnreliablePolicy> Drop for UnreliableSender<T> {
257    fn drop(&mut self) {
258        drop_sender_state(&self.state);
259    }
260}
261
262impl<T: Policy> fmt::Debug for Sender<T> {
263    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264        fmt_sender_state("Sender", &self.state, f)
265    }
266}
267
268impl<T: UnreliablePolicy> fmt::Debug for UnreliableSender<T> {
269    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270        fmt_sender_state("UnreliableSender", &self.state, f)
271    }
272}
273
274impl<T: Policy> Sender<T> {
275    /// Submit a message without waiting for inbox capacity.
276    #[must_use = "caller must handle enqueue feedback"]
277    pub fn enqueue(&self, message: T) -> Feedback {
278        self.state.enqueue(message)
279    }
280}
281
282impl<T: UnreliablePolicy> UnreliableSender<T> {
283    /// Submit a message without waiting for inbox capacity, allowing policy rejection.
284    #[must_use = "caller must handle enqueue feedback"]
285    pub fn enqueue(&self, message: T) -> Unreliable<Feedback> {
286        self.state.enqueue(message)
287    }
288}
289
290/// Receiver half of a mailbox.
291///
292/// Dropping the receiver closes the mailbox: later enqueues report
/// [`Feedback::Closed`] and messages still buffered are dropped.
293///
294/// Dropping the last sender disconnects the mailbox, but the receiver continues
295/// returning buffered messages until ready and overflow are empty.
296pub struct Receiver<T: Policy> {
    // Mailbox state shared with every sender.
297    state: Arc<State<T, mode::Reliable>>,
298}
299
300/// Receiver half of an unreliable mailbox.
301///
302/// Dropping the receiver closes the mailbox: later enqueues report a closed
/// feedback and messages still buffered are dropped.
303///
304/// Dropping the last sender disconnects the mailbox, but the receiver continues
305/// returning buffered messages until ready and overflow are empty.
306pub struct UnreliableReceiver<T: UnreliablePolicy> {
    // Mailbox state shared with every sender.
307    state: Arc<State<T, mode::Unreliable>>,
308}
309
310impl<T: Policy> Receiver<T> {
311    /// Receive the next message.
312    ///
313    /// Returns `None` after all senders are dropped and all buffered messages
314    /// have been drained.
315    pub async fn recv(&mut self) -> Option<T> {
316        recv_from(&self.state).await
317    }
318
319    /// Try to receive the next message without waiting.
320    ///
321    /// Returns [`TryRecvError::Disconnected`] after all senders are dropped and
322    /// all buffered messages have been drained.
323    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
324        try_recv_from(&self.state)
325    }
326}
327
328impl<T: UnreliablePolicy> UnreliableReceiver<T> {
329    /// Receive the next message.
330    ///
331    /// Returns `None` after all senders are dropped and all buffered messages
332    /// have been drained.
333    pub async fn recv(&mut self) -> Option<T> {
334        recv_from(&self.state).await
335    }
336
337    /// Try to receive the next message without waiting.
338    ///
339    /// Returns [`TryRecvError::Disconnected`] after all senders are dropped and
340    /// all buffered messages have been drained.
341    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
342        try_recv_from(&self.state)
343    }
344}
345
346impl<T: Policy> Drop for Receiver<T> {
347    fn drop(&mut self) {
348        self.state.close();
349    }
350}
351
352impl<T: UnreliablePolicy> Drop for UnreliableReceiver<T> {
353    fn drop(&mut self) {
354        self.state.close();
355    }
356}
357
358/// Create a new bounded mailbox.
359pub fn new<T: Policy>(metrics: impl Metrics, capacity: NonZeroUsize) -> (Sender<T>, Receiver<T>) {
360    let state = new_state(metrics, capacity);
361    (
362        Sender {
363            state: state.clone(),
364        },
365        Receiver { state },
366    )
367}
368
369/// Create a new bounded unreliable mailbox.
370pub fn new_unreliable<T: UnreliablePolicy>(
371    metrics: impl Metrics,
372    capacity: NonZeroUsize,
373) -> (UnreliableSender<T>, UnreliableReceiver<T>) {
374    let state = new_state(metrics, capacity);
375    (
376        UnreliableSender {
377            state: state.clone(),
378        },
379        UnreliableReceiver { state },
380    )
381}
382
383// `activity` packs the published overflow state and in-flight overflow
384// mutations into one atomic word. The overflow lock serializes actual
385// overflow changes (this word lets the ready fast path avoid that lock when
386// overflow is inactive).
387//
388// The low bit records whether the most recently published overflow state was
389// non-empty. The higher bits count active overflow mutations. Each mutation
390// adds `OVERFLOW_MUTATION` while it may mutate or publish overflow state, so
391// the count and the state bit coexist in the same word.
392//
393// Useful states:
394// - `activity == 0`: no published overflow and no active overflow mutation, so
395//   senders may try the direct ready fast path.
396// - `activity & OVERFLOW_HAS_MESSAGES != 0`: overflow has published messages,
397//   so the receiver may try to refill ready. The overflow lock serializes
398//   refill with any active mutation.
399// - `activity >= OVERFLOW_MUTATION`: at least one overflow mutation is active.
400//   The overflow lock still serializes queue access; this state only keeps
401//   lock-free fast-path/refill decisions from acting on a changing overflow
402//   snapshot.
403//
404// Activity accesses are relaxed because this word does not publish queue
405// contents. The overflow mutex serializes overflow access, and the ready queue
406// owns its own synchronization. Stale activity observations only decide whether
407// a caller tries a fast path, locks overflow, or waits for a later wake.
/// Low bit of `activity`: set when the last published overflow state was non-empty.
const OVERFLOW_HAS_MESSAGES: usize = 0b01;
/// Amount added to `activity` for each in-flight overflow mutation.
const OVERFLOW_MUTATION: usize = 0b10;
410
// Select the synchronization primitives and the `Ready` queue implementation.
// Both branches expose the same helpers (`register_waker`, `lock`) and the
// same `Ready` API (`new`, `capacity`, `push`, `pop`).
411cfg_if::cfg_if! {
412    if #[cfg(feature = "loom")] {
413        use loom::{
414            future::AtomicWaker,
415            sync::{
416                atomic::{AtomicBool, AtomicUsize, Ordering},
417                Arc, Mutex, MutexGuard,
418            },
419        };
420
421        fn register_waker(waker: &AtomicWaker, task: &std::task::Waker) {
422            waker.register_by_ref(task);
423        }
424
        // Panics if the mutex is poisoned.
425        fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
426            mutex.lock().unwrap()
427        }
428
        // Ready queue contents for the loom model: messages visible to `pop`
        // plus the number of pushes that reserved a slot but have not
        // published yet.
429        struct ReadyState<T> {
430            published: VecDeque<T>,
431            reserved: usize,
432        }
433
        // Mutex-based bounded queue used under loom.
434        struct Ready<T> {
435            state: Mutex<ReadyState<T>>,
436            capacity: usize,
437        }
438
439        impl<T> Ready<T> {
440            fn new(capacity: usize) -> Self {
441                Self {
442                    state: Mutex::new(ReadyState {
443                        published: VecDeque::new(),
444                        reserved: 0,
445                    }),
446                    capacity,
447                }
448            }
449
450            const fn capacity(&self) -> usize {
451                self.capacity
452            }
453
            // Two-step push: reserve a slot (or fail when published plus
            // reserved already reaches capacity), yield to the loom
            // scheduler, then publish the message.
454            fn push(&self, message: T) -> Result<(), T> {
455                {
456                    let mut state = lock(&self.state);
457                    if state.published.len() + state.reserved >= self.capacity {
458                        return Err(message);
459                    }
460                    state.reserved += 1;
461                }
462
463                loom::thread::yield_now();
464
465                let mut state = lock(&self.state);
466                state.reserved -= 1;
467                state.published.push_back(message);
468                Ok(())
469            }
470
            // Returns the oldest published message. Reports empty only when
            // no push holds a reservation; otherwise yields and retries.
471            fn pop(&self) -> Option<T> {
472                loop {
473                    let mut state = lock(&self.state);
474                    if let Some(message) = state.published.pop_front() {
475                        return Some(message);
476                    }
477                    if state.reserved == 0 {
478                        return None;
479                    }
480                    drop(state);
481                    loom::thread::yield_now();
482                }
483            }
484        }
485    } else {
486        use crossbeam_queue::ArrayQueue;
487        use futures_util::task::AtomicWaker;
488        use parking_lot::{Mutex, MutexGuard};
489        use std::sync::{
490            atomic::{AtomicBool, AtomicUsize, Ordering},
491            Arc,
492        };
493
494        fn register_waker(waker: &AtomicWaker, task: &std::task::Waker) {
495            waker.register(task);
496        }
497
498        fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
499            mutex.lock()
500        }
501
        // Bounded ready queue backed by `crossbeam_queue::ArrayQueue`.
502        struct Ready<T> {
503            queue: ArrayQueue<T>,
504        }
505
506        impl<T> Ready<T> {
507            fn new(capacity: usize) -> Self {
508                Self {
509                    queue: ArrayQueue::new(capacity),
510                }
511            }
512
513            fn capacity(&self) -> usize {
514                self.queue.capacity()
515            }
516
            // Returns the message back as `Err` when the queue rejects it.
517            fn push(&self, message: T) -> Result<(), T> {
518                self.queue.push(message)
519            }
520
521            fn pop(&self) -> Option<T> {
522                self.queue.pop()
523            }
524        }
525    }
526}
527
// Overflow storage together with the atomic word that summarizes its state.
528struct OverflowState<T, M: Mode<T>> {
    // Policy-defined overflow storage; every access goes through this lock.
529    queue: Mutex<M::Overflow>,
    // Packed `OVERFLOW_HAS_MESSAGES` bit plus a count of in-flight mutations.
530    activity: AtomicUsize,
    // Ties the type to `T` without storing a `T`.
531    _phantom: PhantomData<fn() -> T>,
532}
533
534impl<T, M: Mode<T>> OverflowState<T, M> {
    /// Create empty overflow storage with no recorded activity.
535    #[allow(clippy::missing_const_for_fn)]
536    fn new() -> Self {
537        Self {
538            queue: Mutex::new(M::Overflow::default()),
539            activity: AtomicUsize::new(0),
540            _phantom: PhantomData,
541        }
542    }
543
    /// Fast path: push straight to `ready` when `activity` reads zero.
    ///
    /// Returns the message back as `Err` when overflow is active or `ready`
    /// rejects the push.
544    fn try_ready(&self, ready: &Ready<T>, message: T) -> Result<(), T> {
545        // Avoid ready while overflow is retained or changing.
546        if self.activity.load(Ordering::Relaxed) != 0 {
547            return Err(message);
548        }
549        ready.push(message)
550    }
551
    /// Slow path: enqueue `message` while holding the overflow lock.
    ///
    /// Returns closed feedback when `is_closed()` is true under the lock, `Ok`
    /// feedback when overflow is empty and `ready` accepts the message, and
    /// otherwise the policy result mapped through `M::overflow_feedback`.
    ///
    /// `queue` is declared after `mutation`, so on return the lock is released
    /// before the mutation count is decremented.
552    fn enqueue_overflow(
553        &self,
554        ready: &Ready<T>,
555        message: T,
556        is_closed: impl Fn() -> bool,
557    ) -> M::Feedback {
558        // Mark overflow active so racing senders stay off the ready fast path.
559        let mutation = Mutation::begin(&self.activity);
560        let mut queue = lock(&self.queue);
561        if is_closed() {
562            mutation.publish(queue.is_empty());
563            return M::ready_feedback(Feedback::Closed);
564        }
565
566        // The fast-path push may have observed stale ready fullness. Retry
567        // ready under the overflow lock before applying policy, but only when
568        // there is no retained overflow that must stay ahead of this message.
569        let message = if queue.is_empty() {
570            match ready.push(message) {
571                Ok(()) => {
572                    mutation.publish(queue.is_empty());
573                    return M::ready_feedback(Feedback::Ok);
574                }
575                Err(message) => message,
576            }
577        } else {
578            message
579        };
580
581        // Preserve overflow order, or handle a still-full ready queue.
582        let handled = M::handle(&mut queue, message);
583        mutation.publish(queue.is_empty());
584        M::overflow_feedback(handled)
585    }
586
    /// Move retained overflow messages into `ready` until `ready` rejects one,
    /// then publish whether overflow is now empty.
587    fn refill(&self, ready: &Ready<T>) {
588        // Skip the overflow lock unless non-empty overflow was published.
589        if self.activity.load(Ordering::Relaxed) & OVERFLOW_HAS_MESSAGES == 0 {
590            return;
591        }
592
593        let mutation = Mutation::begin(&self.activity);
594        let mut queue = lock(&self.queue);
        // `push` hands a rejected message back, which stops the drain.
595        queue.drain(|message| ready.push(message).err());
596        mutation.publish(queue.is_empty());
597    }
598
    /// Discard every message held in `ready` and in overflow.
    ///
    /// The mutation is begun before the first pass so later senders stay off
    /// the fast path while the queues are emptied.
599    fn drain(&self, ready: &Ready<T>) {
600        // Attempt to drain all messages from ready
601        let mutation = Mutation::begin(&self.activity);
602        while ready.pop().is_some() {}
603
604        // Attempt to drain all messages from overflow (storing messages to drop after
605        // releasing the lock)
606        let mut drained = Vec::new();
607        let mut queue = lock(&self.queue);
608        queue.drain(|message| {
609            drained.push(message);
610            None
611        });
612        mutation.publish(queue.is_empty());
613        drop(queue);
614        drop(drained);
615
616        // A sender may have passed the fast-path activity check before this
617        // mutation began, so we drain again
618        while ready.pop().is_some() {}
619    }
620}
621
622struct Mutation<'a> {
623    activity: &'a AtomicUsize,
624}
625
626impl<'a> Mutation<'a> {
627    fn begin(activity: &'a AtomicUsize) -> Self {
628        activity.fetch_add(OVERFLOW_MUTATION, Ordering::Relaxed);
629        Self { activity }
630    }
631
632    fn publish(&self, is_empty: bool) {
633        if is_empty {
634            self.activity
635                .fetch_and(!OVERFLOW_HAS_MESSAGES, Ordering::Relaxed);
636        } else {
637            self.activity
638                .fetch_or(OVERFLOW_HAS_MESSAGES, Ordering::Relaxed);
639        }
640    }
641}
642
643impl Drop for Mutation<'_> {
644    fn drop(&mut self) {
645        let previous = self
646            .activity
647            .fetch_sub(OVERFLOW_MUTATION, Ordering::Relaxed);
648        assert!(previous >= OVERFLOW_MUTATION);
649    }
650}
651
// Mailbox state shared by the receiver and all senders.
652struct State<T, M: Mode<T>> {
    // Bounded queue the receiver pops from.
653    ready: Ready<T>,
    // Policy-managed overflow and its activity word.
654    overflow: OverflowState<T, M>,
    // Counts enqueue calls whose feedback was backoff.
655    backoff: Counter,
    // Set when the receiver closes the mailbox.
656    closed: AtomicBool,
    // Number of live sender handles.
657    senders: AtomicUsize,
    // Waker registered by the receiver in `poll_recv`.
658    waker: AtomicWaker,
659}
660
661impl<T, M: Mode<T>> State<T, M> {
    /// Enqueue `message`, trying the ready fast path before the overflow slow
    /// path, and wake the receiver unless the mailbox is closed.
662    fn enqueue(&self, message: T) -> M::Feedback {
663        // Receiver closure makes new sends fail immediately.
664        if self.closed.load(Ordering::Acquire) {
665            return M::ready_feedback(Feedback::Closed);
666        }
667
668        // Common case: publish directly to ready without taking overflow lock.
669        let message = match self.overflow.try_ready(&self.ready, message) {
670            Ok(()) => {
                // The receiver may have closed after the check above; drain
                // so the message just pushed is not left in ready.
671                if self.closed.load(Ordering::Acquire) {
672                    self.overflow.drain(&self.ready);
673                    return M::ready_feedback(Feedback::Closed);
674                }
675                self.waker.wake();
676                return M::ready_feedback(Feedback::Ok);
677            }
678            Err(message) => message,
679        };
680
681        // Slow path: serialize through overflow and apply the policy.
682        let feedback = self
683            .overflow
684            .enqueue_overflow(&self.ready, message, || self.closed.load(Ordering::Acquire));
685
686        // Record any backoff.
687        if M::is_backoff(&feedback) {
688            self.backoff.inc();
689        }
690
691        // Wake after any non-closed slow-path enqueue because a receiver may
692        // have skipped refill while this overflow mutation was active. By the
693        // time we wake, the mutation has published its overflow state. Spurious
694        // wakes are acceptable.
695        if !M::is_closed(&feedback) {
696            self.waker.wake();
697        }
698        feedback
699    }
700
    /// Poll for the next message.
    ///
    /// Returns `Ready(None)` only when the mailbox is disconnected and a final
    /// pop finds nothing.
701    fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>> {
702        // Fast path avoids waker churn when a message is already ready.
703        if let Some(message) = self.pop() {
704            return Poll::Ready(Some(message));
705        }
706
707        if self.is_disconnected() {
708            return Poll::Ready(self.pop());
709        }
710
711        register_waker(&self.waker, cx.waker());
712
713        // A sender can enqueue and wake after the first pop but before this
714        // waker is installed. Re-check before sleeping so the wake is not lost.
715        if let Some(message) = self.pop() {
716            return Poll::Ready(Some(message));
717        }
718
719        if self.is_disconnected() {
720            Poll::Ready(self.pop())
721        } else {
722            Poll::Pending
723        }
724    }
725
    /// Pop one message from ready, refilling ready from overflow around the pop.
726    fn pop(&self) -> Option<T> {
727        if let Some(message) = self.ready.pop() {
728            // A freed ready slot may let the oldest overflow message advance.
729            self.overflow.refill(&self.ready);
730            return Some(message);
731        }
732
733        // Empty ready may race with stale activity, so let `refill`
734        // decide whether overflow is worth locking.
735        self.overflow.refill(&self.ready);
736        self.ready.pop()
737    }
738
    /// Whether the mailbox is closed or has no live senders.
739    fn is_disconnected(&self) -> bool {
740        self.closed.load(Ordering::Acquire) || self.senders.load(Ordering::Acquire) == 0
741    }
742
    /// Mark the mailbox closed, then discard everything still buffered.
743    fn close(&self) {
744        self.closed.store(true, Ordering::Release);
745        self.overflow.drain(&self.ready);
746    }
747}
748
749fn new_state<T, M: Mode<T>>(metrics: impl Metrics, capacity: NonZeroUsize) -> Arc<State<T, M>> {
750    Arc::new(State {
751        ready: Ready::new(capacity.get()),
752        overflow: OverflowState::new(),
753        backoff: metrics.counter("backoff", "number of enqueue calls that requested backoff"),
754        closed: AtomicBool::new(false),
755        senders: AtomicUsize::new(1),
756        waker: AtomicWaker::new(),
757    })
758}
759
760fn clone_sender_state<T, M: Mode<T>>(state: &Arc<State<T, M>>) -> Arc<State<T, M>> {
761    // Live sender count drives receiver disconnect detection.
762    state.senders.fetch_add(1, Ordering::Relaxed);
763    state.clone()
764}
765
766fn drop_sender_state<T, M: Mode<T>>(state: &State<T, M>) {
767    let previous = state.senders.fetch_sub(1, Ordering::AcqRel);
768    assert!(previous > 0);
769    // Wake a receiver that is parked waiting for data or disconnect.
770    if previous == 1 {
771        state.waker.wake();
772    }
773}
774
775fn fmt_sender_state<T, M: Mode<T>>(
776    name: &str,
777    state: &State<T, M>,
778    f: &mut fmt::Formatter<'_>,
779) -> fmt::Result {
780    f.debug_struct(name)
781        .field("capacity", &state.ready.capacity())
782        .field("closed", &state.closed.load(Ordering::Acquire))
783        .finish()
784}
785
786async fn recv_from<T, M: Mode<T>>(state: &State<T, M>) -> Option<T> {
787    poll_fn(|cx| state.poll_recv(cx)).await
788}
789
790fn try_recv_from<T, M: Mode<T>>(state: &State<T, M>) -> Result<T, TryRecvError> {
791    if let Some(message) = state.pop() {
792        return Ok(message);
793    }
794    if state.is_disconnected() {
795        return state.pop().ok_or(TryRecvError::Disconnected);
796    }
797    Err(TryRecvError::Empty)
798}
799
// Test-only stand-in for the runtime metrics context.
800#[cfg(test)]
801mod mocks {
802    use commonware_runtime::{
803        telemetry::metrics::{Metric, Registered, Registration},
804        Metrics as RuntimeMetrics, Name, Supervisor,
805    };
806    use std::fmt;
807
    /// Stateless metrics context for tests.
808    #[derive(Clone, Copy, Debug, Default)]
809    pub(super) struct Metrics;
810
811    impl Supervisor for Metrics {
        // Always the default name.
812        fn name(&self) -> Name {
813            Name::default()
814        }
815
        // The label is ignored.
816        fn child(&self, _label: &'static str) -> Self {
817            Self
818        }
819
        // The attribute is ignored.
820        fn with_attribute(self, _key: &'static str, _value: impl fmt::Display) -> Self {
821            self
822        }
823    }
824
825    impl RuntimeMetrics for Metrics {
        // Wraps the metric without using the name or help text.
826        fn register<N: Into<String>, H: Into<String>, M: Metric>(
827            &self,
828            _name: N,
829            _help: H,
830            metric: M,
831        ) -> Registered<M> {
832            Registered::with_registration(metric, Registration::from(()))
833        }
834
        // Always encodes to an empty string.
835        fn encode(&self) -> String {
836            String::new()
837        }
838    }
839}
840
841#[cfg(all(test, not(feature = "loom")))]
842mod tests {
843    use super::{mocks, *};
844    use commonware_macros::test_async;
845    use commonware_runtime::{deterministic, Runner as _, Supervisor};
846    use commonware_utils::{channel::oneshot, NZUsize};
847    use futures::{
848        pin_mut,
849        task::{waker_ref, ArcWake},
850        FutureExt,
851    };
852    use std::sync::{
853        atomic::{AtomicUsize, Ordering},
854        mpsc::TryRecvError,
855        Arc,
856    };
857
858    fn new<T: Policy>(capacity: NonZeroUsize) -> (Sender<T>, Receiver<T>) {
859        super::new(mocks::Metrics, capacity)
860    }
861
862    fn new_unreliable<T: UnreliablePolicy>(
863        capacity: NonZeroUsize,
864    ) -> (UnreliableSender<T>, UnreliableReceiver<T>) {
865        super::new_unreliable(mocks::Metrics, capacity)
866    }
867
    /// Test message whose variant selects its overflow behavior.
868    #[derive(Debug, PartialEq, Eq)]
869    enum Message {
        // Replaces the most recently retained `Update`, if any.
870        Update(u64),
        // Rejected by the policy when it reaches overflow.
871        Vote(u64),
        // Always retained at the back of overflow.
872        Required(u64),
        // Always retained at the back of overflow.
873        Buffered(u64),
        // Replaces the most recently retained `Update`; otherwise not retained.
874        Hint(u64),
875    }
876
877    impl UnreliablePolicy for Message {
878        type Overflow = VecDeque<Self>;
879
880        fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
881            match message {
882                Self::Update(value) => {
883                    if let Some(index) = overflow
884                        .iter()
885                        .rposition(|pending| matches!(pending, Self::Update(_)))
886                    {
887                        overflow.remove(index);
888                    }
889                    overflow.push_back(Self::Update(value));
890                    true
891                }
892                Self::Required(_) | Self::Buffered(_) => {
893                    overflow.push_back(message);
894                    true
895                }
896                Self::Hint(value) => {
897                    let Some(index) = overflow
898                        .iter()
899                        .rposition(|pending| matches!(pending, Self::Update(_)))
900                    else {
901                        return true;
902                    };
903                    overflow.remove(index);
904                    overflow.push_back(Self::Hint(value));
905                    true
906                }
907                Self::Vote(_) => false,
908            }
909        }
910    }
911
    /// Test message that carries a oneshot sender.
912    struct Ack {
        // Never read; held only so it is dropped together with the message.
913        _sender: oneshot::Sender<()>,
914    }
915
916    impl Policy for Ack {
917        type Overflow = VecDeque<Self>;
918
919        fn handle(overflow: &mut VecDeque<Self>, message: Self) {
920            overflow.push_back(message);
921        }
922    }
923
/// Waker target that counts how many times it has been woken.
#[derive(Default)]
struct WakeCounter {
    // Number of `wake_by_ref` calls observed so far.
    wakes: AtomicUsize,
}

impl WakeCounter {
    /// Returns the number of wake-ups recorded so far.
    fn count(&self) -> usize {
        self.wakes.load(Ordering::Acquire)
    }
}

impl ArcWake for WakeCounter {
    /// Records one wake-up by incrementing the counter.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.wakes.fetch_add(1, Ordering::AcqRel);
    }
}
940
/// Draining a `VecDeque` overflow stops at the first message the sink hands
/// back; that message and everything behind it stay queued.
#[test]
fn vecdeque_overflow_drain_stops_after_rejected_message() {
    let mut overflow: VecDeque<Message> = (1..=3).map(Message::Vote).collect();
    let mut accepted = VecDeque::new();

    // Keep the first message, then return every later one to the caller.
    Overflow::drain(&mut overflow, |message| {
        if accepted.is_empty() {
            accepted.push_back(message);
            None
        } else {
            Some(message)
        }
    });

    let still_queued = VecDeque::from([Message::Vote(2), Message::Vote(3)]);
    assert_eq!(accepted, VecDeque::from([Message::Vote(1)]));
    assert_eq!(overflow, still_queued);
}
961
962    #[test_async]
963    async fn full_inbox_replaces_stale_overflow_message() {
964        let (sender, mut receiver) = new_unreliable(NZUsize!(1));
965        assert_eq!(
966            sender.enqueue(Message::Update(1)),
967            Unreliable::new(Feedback::Ok)
968        );
969        assert_eq!(
970            sender.enqueue(Message::Update(2)),
971            Unreliable::new(Feedback::Backoff)
972        );
973        assert_eq!(
974            sender.enqueue(Message::Update(3)),
975            Unreliable::new(Feedback::Backoff)
976        );
977
978        assert_eq!(receiver.recv().await, Some(Message::Update(1)));
979        assert_eq!(receiver.recv().await, Some(Message::Update(3)));
980    }
981
982    #[test_async]
983    async fn policy_can_replace_stale_overflow_at_back() {
984        let (sender, mut receiver) = new_unreliable(NZUsize!(1));
985        assert_eq!(
986            sender.enqueue(Message::Vote(1)),
987            Unreliable::new(Feedback::Ok)
988        );
989        assert_eq!(
990            sender.enqueue(Message::Update(2)),
991            Unreliable::new(Feedback::Backoff)
992        );
993        assert_eq!(
994            sender.enqueue(Message::Required(3)),
995            Unreliable::new(Feedback::Backoff)
996        );
997        assert_eq!(
998            sender.enqueue(Message::Update(4)),
999            Unreliable::new(Feedback::Backoff)
1000        );
1001
1002        assert_eq!(receiver.recv().await, Some(Message::Vote(1)));
1003        assert_eq!(receiver.recv().await, Some(Message::Required(3)));
1004        assert_eq!(receiver.recv().await, Some(Message::Update(4)));
1005    }
1006
1007    #[test_async]
1008    async fn full_inbox_rejects_non_replaceable_message() {
1009        let (sender, mut receiver) = new_unreliable(NZUsize!(1));
1010        assert_eq!(
1011            sender.enqueue(Message::Vote(1)),
1012            Unreliable::new(Feedback::Ok)
1013        );
1014        assert_eq!(sender.enqueue(Message::Vote(2)), Unreliable::Rejected);
1015
1016        assert_eq!(receiver.recv().await, Some(Message::Vote(1)));
1017    }
1018
1019    #[test_async]
1020    async fn full_inbox_retains_required_message() {
1021        let (sender, mut receiver) = new_unreliable(NZUsize!(1));
1022        assert_eq!(
1023            sender.enqueue(Message::Vote(1)),
1024            Unreliable::new(Feedback::Ok)
1025        );
1026        assert_eq!(
1027            sender.enqueue(Message::Buffered(2)),
1028            Unreliable::new(Feedback::Backoff)
1029        );
1030
1031        assert_eq!(receiver.recv().await, Some(Message::Vote(1)));
1032        assert_eq!(receiver.recv().await, Some(Message::Buffered(2)));
1033    }
1034
/// `try_recv` delivers the overflowed message once the ready message is taken.
#[test]
fn try_recv_refills_from_overflow() {
    let (sender, mut receiver) = new_unreliable(NZUsize!(1));

    let sends = [
        (Message::Vote(1), Feedback::Ok),
        (Message::Buffered(2), Feedback::Backoff),
    ];
    for (message, feedback) in sends {
        assert_eq!(sender.enqueue(message), Unreliable::new(feedback));
    }

    for expected in [Message::Vote(1), Message::Buffered(2)] {
        assert_eq!(receiver.try_recv(), Ok(expected));
    }
}
1050
/// Each enqueue answered with `Backoff` increments `mailbox_backoff_total`.
#[test]
fn backoff_metric_counts_backoff_feedback() {
    deterministic::Runner::default().start(|context| async move {
        let (sender, _receiver) = super::new_unreliable(context.child("mailbox"), NZUsize!(1));

        // One message fits in ready; the next two report backoff.
        let sends = [
            (Message::Vote(1), Feedback::Ok),
            (Message::Buffered(2), Feedback::Backoff),
            (Message::Buffered(3), Feedback::Backoff),
        ];
        for (message, feedback) in sends {
            assert_eq!(sender.enqueue(message), Unreliable::new(feedback));
        }

        let buffer = context.encode();
        let counted = buffer.contains("mailbox_backoff_total 2");
        assert!(counted, "missing backoff count in metrics: {buffer}");
    });
}
1076
/// A rejected enqueue reports `accepted() == false` and leaves the backoff
/// counter at zero.
#[test]
fn unreliable_rejected_feedback_is_not_accepted_or_counted_as_backoff() {
    deterministic::Runner::default().start(|context| async move {
        let (sender, _receiver) = super::new_unreliable(context.child("mailbox"), NZUsize!(1));

        let first = sender.enqueue(Message::Vote(1));
        assert_eq!(first, Unreliable::new(Feedback::Ok));

        // Ready is full and the policy refuses votes.
        let feedback = sender.enqueue(Message::Vote(2));
        assert_eq!(feedback, Unreliable::Rejected);
        assert!(!feedback.accepted());

        let buffer = context.encode();
        let untouched = buffer.contains("mailbox_backoff_total 0");
        assert!(untouched, "unexpected backoff count in metrics: {buffer}");
    });
}
1098
/// After the sender is dropped, `try_recv` still yields every queued message
/// before reporting `Disconnected`.
#[test]
fn try_recv_drains_buffered_messages_after_senders_drop() {
    let (sender, mut receiver) = new_unreliable(NZUsize!(1));

    let sends = [
        (Message::Vote(1), Feedback::Ok),
        (Message::Buffered(2), Feedback::Backoff),
    ];
    for (message, feedback) in sends {
        assert_eq!(sender.enqueue(message), Unreliable::new(feedback));
    }
    drop(sender);

    let outcomes = [
        Ok(Message::Vote(1)),
        Ok(Message::Buffered(2)),
        Err(TryRecvError::Disconnected),
    ];
    for expected in outcomes {
        assert_eq!(receiver.try_recv(), expected);
    }
}
1116
/// After the sender is dropped, `poll_recv` still yields every queued message
/// before resolving to `None`.
#[test]
fn poll_recv_drains_buffered_messages_after_senders_drop() {
    let (sender, receiver) = new_unreliable(NZUsize!(1));
    let wakes = Arc::new(WakeCounter::default());
    let waker = waker_ref(&wakes);
    let mut cx = Context::from_waker(&waker);

    let sends = [
        (Message::Vote(1), Feedback::Ok),
        (Message::Buffered(2), Feedback::Backoff),
    ];
    for (message, feedback) in sends {
        assert_eq!(sender.enqueue(message), Unreliable::new(feedback));
    }
    drop(sender);

    let outcomes = [Some(Message::Vote(1)), Some(Message::Buffered(2)), None];
    for expected in outcomes {
        assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Ready(expected));
    }
}
1144
/// Once the receiver has popped messages, a later `Vote` is accepted with `Ok`
/// and is delivered after the message that had overflowed.
#[test]
fn enqueue_uses_ready_capacity_after_partial_drain() {
    let (sender, mut receiver) = new_unreliable(NZUsize!(2));

    // Two votes fill ready; the required message reports backoff.
    let sends = [
        (Message::Vote(1), Feedback::Ok),
        (Message::Vote(2), Feedback::Ok),
        (Message::Required(3), Feedback::Backoff),
    ];
    for (message, feedback) in sends {
        assert_eq!(sender.enqueue(message), Unreliable::new(feedback));
    }

    for expected in [Message::Vote(1), Message::Vote(2)] {
        assert_eq!(receiver.try_recv(), Ok(expected));
    }

    let late = sender.enqueue(Message::Vote(4));
    assert_eq!(late, Unreliable::new(Feedback::Ok));

    for expected in [Message::Required(3), Message::Vote(4)] {
        assert_eq!(receiver.try_recv(), Ok(expected));
    }
}
1171
/// With capacity three, an overflowed message is delivered ahead of a vote
/// that was enqueued after the receiver had popped two messages.
#[test]
fn receiver_refills_overflow_after_partial_drain() {
    let (sender, mut receiver) = new_unreliable(NZUsize!(3));

    // Three votes fill ready; the required message reports backoff.
    let sends = [
        (Message::Vote(1), Feedback::Ok),
        (Message::Vote(2), Feedback::Ok),
        (Message::Vote(3), Feedback::Ok),
        (Message::Required(4), Feedback::Backoff),
    ];
    for (message, feedback) in sends {
        assert_eq!(sender.enqueue(message), Unreliable::new(feedback));
    }

    for expected in [Message::Vote(1), Message::Vote(2)] {
        assert_eq!(receiver.try_recv(), Ok(expected));
    }

    let late = sender.enqueue(Message::Vote(5));
    assert_eq!(late, Unreliable::new(Feedback::Ok));

    let remaining = [Message::Vote(3), Message::Required(4), Message::Vote(5)];
    for expected in remaining {
        assert_eq!(receiver.try_recv(), Ok(expected));
    }
}
1203
1204    #[test_async]
1205    async fn full_inbox_retains_unmatched_replaceable_message() {
1206        let (sender, mut receiver) = new_unreliable(NZUsize!(1));
1207        assert_eq!(
1208            sender.enqueue(Message::Vote(1)),
1209            Unreliable::new(Feedback::Ok)
1210        );
1211        assert_eq!(
1212            sender.enqueue(Message::Required(2)),
1213            Unreliable::new(Feedback::Backoff)
1214        );
1215
1216        assert_eq!(receiver.recv().await, Some(Message::Vote(1)));
1217        assert_eq!(receiver.recv().await, Some(Message::Required(2)));
1218    }
1219
1220    #[test_async]
1221    async fn full_inbox_replaces_stale_overflow_after_ready_fills() {
1222        let (sender, mut receiver) = new_unreliable(NZUsize!(2));
1223        assert_eq!(
1224            sender.enqueue(Message::Vote(1)),
1225            Unreliable::new(Feedback::Ok)
1226        );
1227        assert_eq!(
1228            sender.enqueue(Message::Update(2)),
1229            Unreliable::new(Feedback::Ok)
1230        );
1231        assert_eq!(
1232            sender.enqueue(Message::Update(3)),
1233            Unreliable::new(Feedback::Backoff)
1234        );
1235        assert_eq!(
1236            sender.enqueue(Message::Update(4)),
1237            Unreliable::new(Feedback::Backoff)
1238        );
1239
1240        assert_eq!(receiver.recv().await, Some(Message::Vote(1)));
1241        assert_eq!(receiver.recv().await, Some(Message::Update(2)));
1242        assert_eq!(receiver.recv().await, Some(Message::Update(4)));
1243    }
1244
1245    #[test_async]
1246    async fn mailbox_capacity_is_soft_limit_for_required_messages() {
1247        let (sender, mut receiver) = new_unreliable(NZUsize!(1));
1248        assert_eq!(
1249            sender.enqueue(Message::Vote(1)),
1250            Unreliable::new(Feedback::Ok)
1251        );
1252        assert_eq!(
1253            sender.enqueue(Message::Required(2)),
1254            Unreliable::new(Feedback::Backoff)
1255        );
1256        assert_eq!(
1257            sender.enqueue(Message::Required(3)),
1258            Unreliable::new(Feedback::Backoff)
1259        );
1260
1261        assert_eq!(receiver.recv().await, Some(Message::Vote(1)));
1262        assert_eq!(receiver.recv().await, Some(Message::Required(2)));
1263        assert_eq!(receiver.recv().await, Some(Message::Required(3)));
1264    }
1265
1266    #[test_async]
1267    async fn full_inbox_rejects_hint() {
1268        let (sender, mut receiver) = new_unreliable(NZUsize!(1));
1269        assert_eq!(
1270            sender.enqueue(Message::Vote(1)),
1271            Unreliable::new(Feedback::Ok)
1272        );
1273        assert_eq!(
1274            sender.enqueue(Message::Hint(2)),
1275            Unreliable::new(Feedback::Backoff)
1276        );
1277
1278        assert_eq!(receiver.recv().await, Some(Message::Vote(1)));
1279    }
1280
1281    #[test_async]
1282    async fn full_inbox_can_replace_or_drop_by_message() {
1283        let (sender, mut receiver) = new_unreliable(NZUsize!(1));
1284        assert_eq!(
1285            sender.enqueue(Message::Vote(1)),
1286            Unreliable::new(Feedback::Ok)
1287        );
1288        assert_eq!(
1289            sender.enqueue(Message::Update(2)),
1290            Unreliable::new(Feedback::Backoff)
1291        );
1292        assert_eq!(
1293            sender.enqueue(Message::Hint(3)),
1294            Unreliable::new(Feedback::Backoff)
1295        );
1296
1297        assert_eq!(receiver.recv().await, Some(Message::Vote(1)));
1298        assert_eq!(receiver.recv().await, Some(Message::Hint(3)));
1299    }
1300
1301    #[test_async]
1302    async fn empty_inbox_wakes_on_enqueue() {
1303        let (sender, mut receiver) = new_unreliable(NZUsize!(1));
1304
1305        let next = receiver.recv();
1306        pin_mut!(next);
1307        assert!(next.as_mut().now_or_never().is_none());
1308
1309        assert_eq!(
1310            sender.enqueue(Message::Vote(1)),
1311            Unreliable::new(Feedback::Ok)
1312        );
1313        assert_eq!(next.await, Some(Message::Vote(1)));
1314    }
1315
/// Dropping the sender wakes a registered receiver exactly once, and the next
/// poll resolves to `None`.
#[test]
fn pending_recv_wakes_when_senders_drop() {
    let (sender, receiver) = new_unreliable::<Message>(NZUsize!(1));
    let wakes = Arc::new(WakeCounter::default());
    let waker = waker_ref(&wakes);
    let mut cx = Context::from_waker(&waker);

    // Polling an empty mailbox returns `Pending` without any wake-up.
    let first_poll = receiver.state.poll_recv(&mut cx);
    assert_eq!(first_poll, Poll::Pending);
    assert_eq!(wakes.count(), 0);

    drop(sender);
    assert_eq!(wakes.count(), 1);

    let second_poll = receiver.state.poll_recv(&mut cx);
    assert_eq!(second_poll, Poll::Ready(None));
}
1331
/// An enqueue that lands in overflow wakes a receiver that had already
/// registered its waker.
#[test]
fn pending_recv_wakes_on_handled_overflow_enqueue() {
    let (sender, mut receiver) = new_unreliable(NZUsize!(1));
    let wakes = Arc::new(WakeCounter::default());
    let waker = waker_ref(&wakes);
    let mut cx = Context::from_waker(&waker);

    let first_poll = receiver.state.poll_recv(&mut cx);
    assert_eq!(first_poll, Poll::Pending);
    assert_eq!(wakes.count(), 0);

    // Prime ready directly to isolate the overflow wake after registration.
    let primed = sender.state.ready.push(Message::Vote(1));
    assert_eq!(primed, Ok(()));
    let spilled = sender.enqueue(Message::Buffered(2));
    assert_eq!(spilled, Unreliable::new(Feedback::Backoff));

    assert_eq!(wakes.count(), 1);
    for expected in [Message::Vote(1), Message::Buffered(2)] {
        assert_eq!(receiver.try_recv(), Ok(expected));
    }
}
1353
/// After the receiver is dropped, an enqueue reports `Closed` and does not
/// wake the waker the receiver had registered.
#[test]
fn receiver_drop_blocks_ready_fast_path_feedback() {
    let (sender, receiver) = new_unreliable(NZUsize!(1));
    let wakes = Arc::new(WakeCounter::default());
    let waker = waker_ref(&wakes);
    let mut cx = Context::from_waker(&waker);

    // Register the waker, then drop the receiver.
    let first_poll = receiver.state.poll_recv(&mut cx);
    assert_eq!(first_poll, Poll::Pending);
    drop(receiver);

    let feedback = sender.enqueue(Message::Vote(1));
    assert_eq!(feedback, Unreliable::new(Feedback::Closed));
    assert_eq!(wakes.count(), 0);
}
1370
/// With no messages queued and the sender dropped, both receive paths report
/// closure: `try_recv` returns `Disconnected` and `recv` resolves to `None`.
#[test_async]
async fn empty_inbox_closes_when_senders_drop() {
    let (sender, mut receiver) = new_unreliable::<Message>(NZUsize!(1));
    drop(sender);

    assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    assert_eq!(receiver.recv().await, None);
}
1379
/// Enqueueing after the receiver has been dropped reports `Closed`.
#[test]
fn enqueue_after_receiver_drop_returns_closed() {
    let (sender, receiver) = new_unreliable(NZUsize!(1));
    drop(receiver);

    assert_eq!(
        sender.enqueue(Message::Vote(1)),
        Unreliable::new(Feedback::Closed)
    );
}
1390
1391    #[test_async]
1392    async fn receiver_drop_cancels_buffered_responders() {
1393        let (sender, receiver) = new(NZUsize!(1));
1394        let (ready_tx, ready_rx) = oneshot::channel();
1395        let (overflow_tx, overflow_rx) = oneshot::channel();
1396
1397        assert_eq!(sender.enqueue(Ack { _sender: ready_tx }), Feedback::Ok);
1398        assert_eq!(
1399            sender.enqueue(Ack {
1400                _sender: overflow_tx
1401            }),
1402            Feedback::Backoff
1403        );
1404        drop(receiver);
1405
1406        assert!(ready_rx.await.is_err());
1407        assert!(overflow_rx.await.is_err());
1408    }
1409
/// Message type whose policy empties overflow every time it runs.
#[derive(Debug, PartialEq, Eq)]
enum ClearingMessage {
    /// Used by the test to occupy the ready queue.
    FillReady,
    /// Sent while ready is full so that the policy is invoked.
    ClearOverflow,
}

impl Policy for ClearingMessage {
    type Overflow = VecDeque<Self>;

    /// Queues `message` and then clears `overflow`, so nothing, including the
    /// message just queued, remains afterwards.
    fn handle(overflow: &mut VecDeque<Self>, message: Self) {
        overflow.push_back(message);
        overflow.clear();
    }
}
1424
/// A policy that leaves overflow empty still yields `Backoff`, and the
/// receiver sees an empty mailbox once the ready message is taken.
#[test]
fn policy_can_clear_overflow_and_request_backoff() {
    let (sender, mut receiver) = new(NZUsize!(1));

    let filled = sender.enqueue(ClearingMessage::FillReady);
    assert_eq!(filled, Feedback::Ok);
    let cleared = sender.enqueue(ClearingMessage::ClearOverflow);
    assert_eq!(cleared, Feedback::Backoff);

    let first = receiver.try_recv();
    assert!(matches!(first, Ok(ClearingMessage::FillReady)));
    let second = receiver.try_recv();
    assert_eq!(second, Err(TryRecvError::Empty));
}
1440
/// Message type whose policy retains every overflowed message.
#[derive(Debug, PartialEq, Eq)]
enum SpillMessage {
    /// Used by the test to occupy the ready queue.
    FillReady,
    /// Sent while ready is full so that it goes through the policy.
    Spill,
}

impl Policy for SpillMessage {
    type Overflow = VecDeque<Self>;

    /// Appends `message` to the back of `overflow`.
    fn handle(overflow: &mut VecDeque<Self>, message: Self) {
        overflow.push_back(message);
    }
}
1454
/// With the reliable `Policy` API, an enqueue that spills into overflow wakes
/// a receiver that had already registered its waker.
#[test]
fn pending_recv_wakes_when_policy_spills() {
    let (sender, mut receiver) = new(NZUsize!(1));
    let wakes = Arc::new(WakeCounter::default());
    let waker = waker_ref(&wakes);
    let mut cx = Context::from_waker(&waker);

    let first_poll = receiver.state.poll_recv(&mut cx);
    assert_eq!(first_poll, Poll::Pending);
    assert_eq!(wakes.count(), 0);

    // Fill ready without going through `enqueue`, then spill one message.
    let primed = sender.state.ready.push(SpillMessage::FillReady);
    assert_eq!(primed, Ok(()));
    let spilled = sender.enqueue(SpillMessage::Spill);
    assert_eq!(spilled, Feedback::Backoff);

    assert_eq!(wakes.count(), 1);
    for expected in [SpillMessage::FillReady, SpillMessage::Spill] {
        assert_eq!(receiver.try_recv(), Ok(expected));
    }
}
1472}
1473
1474#[cfg(all(test, feature = "loom"))]
1475mod loom_tests {
1476    use super::{mocks, *};
1477    use commonware_utils::NZUsize;
1478    use futures::pin_mut;
1479    use loom::{
1480        sync::{
1481            atomic::{AtomicUsize, Ordering},
1482            Arc,
1483        },
1484        thread,
1485    };
1486    use std::{
1487        future::Future,
1488        task::{RawWaker, RawWakerVTable, Waker},
1489    };
1490
/// Builds a sender/receiver pair for the loom tests by forwarding `capacity`
/// to [`super::new`] with `mocks::Metrics` as the metrics argument.
fn new<T: Policy>(capacity: NonZeroUsize) -> (Sender<T>, Receiver<T>) {
    super::new(mocks::Metrics, capacity)
}
1494
/// Builds an unreliable sender/receiver pair for the loom tests by forwarding
/// `capacity` to [`super::new_unreliable`] with `mocks::Metrics` as the
/// metrics argument.
fn new_unreliable<T: UnreliablePolicy>(
    capacity: NonZeroUsize,
) -> (UnreliableSender<T>, UnreliableReceiver<T>) {
    super::new_unreliable(mocks::Metrics, capacity)
}
1500
/// Unreliable test message; the payload is used as a bit index by `record`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Message {
    /// Refused by the overflow policy (it returns `false`).
    Drop(u8),
    /// Appended to overflow by the policy.
    Spill(u8),
}

/// Reliable test message whose policy can pause inside `handle`.
#[derive(Clone, Debug)]
enum OrderedMessage {
    /// Plain payload; queued without any coordination.
    Item(u8),
    /// Payload plus a gate; the policy sets the gate to 1 after queuing the
    /// message and spins until the gate holds a different value.
    Coordinated(u8, Arc<AtomicUsize>),
}

/// Unreliable test message that overwrites an earlier pending replacement.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ReplacingMessage {
    /// Refused by the overflow policy (it returns `false`).
    FillReady,
    /// Overwrites the newest pending `Replace` in overflow, or is appended
    /// when none is pending.
    Replace(u8),
}
1518
/// Message that bumps a shared counter when it is dropped.
struct TrackedMessage {
    // Incremented once from `Drop`.
    drops: Arc<AtomicUsize>,
}

/// Message that carries a `Sender` for its own message type and bumps a shared
/// counter when it is dropped.
// NOTE(review): presumably queued into the mailbox that `_sender` points at,
// forming a reference cycle; the tests that use it are outside this chunk.
struct CyclicMessage {
    _sender: Sender<Self>,
    // Incremented once from `Drop`.
    drops: Arc<AtomicUsize>,
}

impl TrackedMessage {
    /// Wraps `drops`, the counter this message increments when dropped.
    const fn new(drops: Arc<AtomicUsize>) -> Self {
        Self { drops }
    }
}

impl Drop for TrackedMessage {
    /// Records the drop by incrementing the shared counter.
    fn drop(&mut self) {
        self.drops.fetch_add(1, Ordering::AcqRel);
    }
}

impl Drop for CyclicMessage {
    /// Records the drop by incrementing the shared counter.
    fn drop(&mut self) {
        self.drops.fetch_add(1, Ordering::AcqRel);
    }
}
1545
1546    impl UnreliablePolicy for Message {
1547        type Overflow = VecDeque<Self>;
1548
1549        fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
1550            match message {
1551                Self::Drop(_) => false,
1552                Self::Spill(_) => {
1553                    overflow.push_back(message);
1554                    true
1555                }
1556            }
1557        }
1558    }
1559
impl Policy for OrderedMessage {
    type Overflow = VecDeque<Self>;

    /// Appends `message` to `overflow`; for `Coordinated` messages it then
    /// pauses inside the policy until the gate is changed by someone else.
    fn handle(overflow: &mut VecDeque<Self>, message: Self) {
        // Clone the gate first: `message` is moved into overflow below.
        let gate = match &message {
            Self::Item(_) => None,
            Self::Coordinated(_, gate) => Some(gate.clone()),
        };
        overflow.push_back(message);
        if let Some(gate) = gate {
            // Signal that the message is queued, then yield until the gate no
            // longer reads 1. The code that resets it is not in this block.
            gate.store(1, Ordering::Release);
            while gate.load(Ordering::Acquire) == 1 {
                thread::yield_now();
            }
        }
    }
}
1577
1578    impl UnreliablePolicy for ReplacingMessage {
1579        type Overflow = VecDeque<Self>;
1580
1581        fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
1582            match message {
1583                Self::FillReady => false,
1584                Self::Replace(_) => {
1585                    if let Some(pending) = overflow
1586                        .iter_mut()
1587                        .rev()
1588                        .find(|pending| matches!(pending, Self::Replace(_)))
1589                    {
1590                        *pending = message;
1591                    } else {
1592                        overflow.push_back(message);
1593                    }
1594                    true
1595                }
1596            }
1597        }
1598    }
1599
/// Overflow policy for [`TrackedMessage`]: every message is retained.
impl Policy for TrackedMessage {
    type Overflow = VecDeque<Self>;

    /// Appends `message` to the back of `overflow`.
    fn handle(overflow: &mut VecDeque<Self>, message: Self) {
        overflow.push_back(message);
    }
}

/// Overflow policy for [`CyclicMessage`]: every message is retained.
impl Policy for CyclicMessage {
    type Overflow = VecDeque<Self>;

    /// Appends `message` to the back of `overflow`.
    fn handle(overflow: &mut VecDeque<Self>, message: Self) {
        overflow.push_back(message);
    }
}
1615
1616    fn record(seen: &AtomicUsize, message: Message) {
1617        let value = match message {
1618            Message::Drop(value) | Message::Spill(value) => value,
1619        };
1620        seen.fetch_or(1usize << usize::from(value), Ordering::AcqRel);
1621    }
1622
1623    fn value(message: OrderedMessage) -> u8 {
1624        match message {
1625            OrderedMessage::Item(value) | OrderedMessage::Coordinated(value, _) => value,
1626        }
1627    }
1628
1629    const fn replacement_value(message: ReplacingMessage) -> Option<u8> {
1630        match message {
1631            ReplacingMessage::FillReady => None,
1632            ReplacingMessage::Replace(value) => Some(value),
1633        }
1634    }
1635
/// Runs `f` against the `Arc<AtomicUsize>` behind `data` without consuming the
/// reference that `data` stands for.
///
/// # Safety
///
/// `data` must have been produced by `Arc::into_raw` on an
/// `Arc<AtomicUsize>` whose reference has not yet been released.
unsafe fn borrow_counter<R>(data: *const (), f: impl FnOnce(&Arc<AtomicUsize>) -> R) -> R {
    // SAFETY: the caller guarantees `data` is a live raw `Arc<AtomicUsize>`.
    let wakes = unsafe { Arc::<AtomicUsize>::from_raw(data.cast()) };
    let output = f(&wakes);
    // Convert back to raw form so the borrowed reference is not released here.
    let _ = Arc::into_raw(wakes);
    output
}

/// Vtable `clone`: produces a second raw waker sharing the same counter.
unsafe fn clone_counter(data: *const ()) -> RawWaker {
    // SAFETY: `data` was created by `Arc::into_raw` for an `AtomicUsize`
    // in `counting_waker` or this function's clone path.
    let cloned = unsafe { borrow_counter(data, |wakes| Arc::clone(wakes)) };
    RawWaker::new(Arc::into_raw(cloned).cast(), &COUNTER_WAKER_VTABLE)
}

/// Vtable `wake`: counts one wake-up and releases the waker's reference.
unsafe fn wake_counter(data: *const ()) {
    // SAFETY: `data` owns one raw `Arc<AtomicUsize>` reference for this
    // consuming wake path.
    let owned = unsafe { Arc::<AtomicUsize>::from_raw(data.cast()) };
    owned.fetch_add(1, Ordering::AcqRel);
    drop(owned);
}

/// Vtable `wake_by_ref`: counts one wake-up and keeps the waker's reference.
unsafe fn wake_counter_by_ref(data: *const ()) {
    // SAFETY: `data` is a borrowed raw `Arc<AtomicUsize>` reference. The
    // helper converts it back into raw form before returning.
    unsafe {
        borrow_counter(data, |wakes| {
            wakes.fetch_add(1, Ordering::AcqRel);
        });
    }
}

/// Vtable `drop`: releases the waker's reference without counting a wake-up.
unsafe fn drop_counter(data: *const ()) {
    // SAFETY: `data` owns one raw `Arc<AtomicUsize>` reference that should
    // be dropped by the waker.
    let released = unsafe { Arc::<AtomicUsize>::from_raw(data.cast()) };
    drop(released);
}

/// Vtable shared by every waker built with `counting_waker`.
static COUNTER_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
    clone_counter,
    wake_counter,
    wake_counter_by_ref,
    drop_counter,
);

/// Builds a `Waker` that increments `wakes` each time it is woken.
fn counting_waker(wakes: Arc<AtomicUsize>) -> Waker {
    let data = Arc::into_raw(wakes).cast::<()>();
    // SAFETY: The vtable above reconstructs the same `Arc<AtomicUsize>`
    // type and preserves the raw waker reference-counting contract.
    unsafe { Waker::from_raw(RawWaker::new(data, &COUNTER_WAKER_VTABLE)) }
}
1681
/// Races a sender drop against the receiver's first `poll_recv`.
///
/// Whichever way the race resolves, the receiver must either observe the
/// disconnect directly or have been woken so that a second poll observes it.
#[test]
fn sender_drop_racing_waker_registration_wakes_or_disconnects() {
    loom::model(|| {
        let (sender, receiver) = new_unreliable::<Message>(NZUsize!(1));
        let wakes = Arc::new(AtomicUsize::new(0));
        let waker = counting_waker(wakes.clone());
        let mut cx = Context::from_waker(&waker);

        // The drop runs on another thread, concurrently with the poll below.
        let close = thread::spawn(move || {
            drop(sender);
        });

        let poll = receiver.state.poll_recv(&mut cx);
        close.join().unwrap();

        match poll {
            // The poll already saw the closed mailbox.
            Poll::Ready(None) => {}
            // The poll registered first, so the drop must have woken it.
            Poll::Pending => {
                assert!(wakes.load(Ordering::Acquire) > 0);
                assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Ready(None));
            }
            Poll::Ready(Some(_)) => panic!("unexpected message"),
        }
    });
}
1707
/// Races "enqueue, then drop the sender" against the receiver's `poll_recv`.
///
/// The message must be delivered before the receiver reports disconnection,
/// regardless of how the two threads interleave.
#[test]
fn sender_enqueue_then_drop_racing_poll_recv_drains_message() {
    loom::model(|| {
        let (sender, receiver) = new_unreliable::<Message>(NZUsize!(1));
        let wakes = Arc::new(AtomicUsize::new(0));
        let waker = counting_waker(wakes.clone());
        let mut cx = Context::from_waker(&waker);

        // `sender` is moved into the closure and dropped when it returns.
        let enqueue = thread::spawn(move || {
            assert_eq!(
                sender.enqueue(Message::Spill(0)),
                Unreliable::new(Feedback::Ok)
            );
        });

        let poll = receiver.state.poll_recv(&mut cx);
        enqueue.join().unwrap();

        match poll {
            // The poll already observed the message.
            Poll::Ready(Some(Message::Spill(0))) => {}
            // The poll registered first: it must have been woken, and the
            // message must now be available.
            Poll::Pending => {
                assert!(wakes.load(Ordering::Acquire) > 0);
                assert_eq!(
                    receiver.state.poll_recv(&mut cx),
                    Poll::Ready(Some(Message::Spill(0)))
                );
            }
            Poll::Ready(None) => panic!("disconnected before draining message"),
            Poll::Ready(Some(message)) => panic!("unexpected message: {message:?}"),
        }

        // Only after the message is drained may the receiver see the close.
        assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Ready(None));
    });
}
1742
/// Races "enqueue, then drop the sender" against the receiver's `try_recv`.
///
/// The message must be delivered before `try_recv` reports `Disconnected`,
/// regardless of how the two threads interleave.
#[test]
fn sender_enqueue_then_drop_racing_try_recv_drains_message() {
    loom::model(|| {
        let (sender, mut receiver) = new_unreliable::<Message>(NZUsize!(1));

        // `sender` is moved into the closure and dropped when it returns.
        let enqueue = thread::spawn(move || {
            assert_eq!(
                sender.enqueue(Message::Spill(0)),
                Unreliable::new(Feedback::Ok)
            );
        });

        let result = receiver.try_recv();
        enqueue.join().unwrap();

        match result {
            // The first attempt already observed the message.
            Ok(Message::Spill(0)) => {}
            // The first attempt ran before the enqueue; after the join the
            // message must be available.
            Err(TryRecvError::Empty) => {
                assert_eq!(receiver.try_recv(), Ok(Message::Spill(0)));
            }
            Err(TryRecvError::Disconnected) => {
                panic!("disconnected before draining message");
            }
            Ok(message) => panic!("unexpected message: {message:?}"),
        }

        // Only after the message is drained may the receiver see the close.
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    });
}
1772
/// An enqueue made after the receiver has registered its waker wakes it
/// exactly once and makes the message available to the next poll.
#[test]
fn handled_enqueue_wakes_registered_receiver() {
    loom::model(|| {
        let (sender, mut receiver) = new_unreliable::<Message>(NZUsize!(1));
        let wakes = Arc::new(AtomicUsize::new(0));
        let waker = counting_waker(wakes.clone());
        let mut cx = Context::from_waker(&waker);

        // Poll the empty mailbox so the waker is registered before the send.
        let next = receiver.recv();
        pin_mut!(next);
        assert!(matches!(next.as_mut().poll(&mut cx), Poll::Pending));
        assert_eq!(
            sender.enqueue(Message::Spill(0)),
            Unreliable::new(Feedback::Ok)
        );

        assert_eq!(wakes.load(Ordering::Acquire), 1);
        assert_eq!(
            next.as_mut().poll(&mut cx),
            Poll::Ready(Some(Message::Spill(0)))
        );
    });
}
1796
/// Races a receiver drop against an enqueue made after the receiver had
/// registered its waker.
///
/// If the enqueue is reported as accepted, the registered waker must have
/// been woken; otherwise the feedback must be `Closed`. Once the drop has
/// completed, further enqueues always report `Closed`.
#[test]
fn receiver_drop_racing_ready_fast_path_feedback_wakes_if_ready() {
    loom::model(|| {
        let (sender, receiver) = new_unreliable::<Message>(NZUsize!(1));
        let wakes = Arc::new(AtomicUsize::new(0));
        let waker = counting_waker(wakes.clone());
        let mut cx = Context::from_waker(&waker);

        // Register the waker before the race starts.
        assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Pending);

        let close = thread::spawn(move || {
            drop(receiver);
        });
        let feedback = sender.enqueue(Message::Spill(0));
        close.join().unwrap();

        if feedback.accepted() {
            assert!(wakes.load(Ordering::Acquire) > 0);
        } else {
            assert_eq!(feedback, Unreliable::new(Feedback::Closed));
        }
        // The receiver is definitely gone after the join.
        assert_eq!(
            sender.enqueue(Message::Spill(1)),
            Unreliable::new(Feedback::Closed)
        );
    });
}
1824
1825    #[test]
1826    fn receiver_drop_racing_ready_enqueue_drops_message() {
1827        loom::model(|| {
1828            let (sender, receiver) = new::<TrackedMessage>(NZUsize!(1));
1829            let drops = Arc::new(AtomicUsize::new(0));
1830
1831            let close = thread::spawn(move || {
1832                drop(receiver);
1833            });
1834            let _ = sender.enqueue(TrackedMessage::new(drops.clone()));
1835            close.join().unwrap();
1836
1837            assert_eq!(drops.load(Ordering::Acquire), 1);
1838        });
1839    }
1840
1841    #[test]
1842    fn receiver_drop_racing_overflow_enqueue_drops_messages() {
1843        loom::model(|| {
1844            let (sender, receiver) = new::<TrackedMessage>(NZUsize!(1));
1845            let ready_drops = Arc::new(AtomicUsize::new(0));
1846            let overflow_drops = Arc::new(AtomicUsize::new(0));
1847
1848            assert_eq!(
1849                sender.enqueue(TrackedMessage::new(ready_drops.clone())),
1850                Feedback::Ok
1851            );
1852            let close = thread::spawn(move || {
1853                drop(receiver);
1854            });
1855            let _ = sender.enqueue(TrackedMessage::new(overflow_drops.clone()));
1856            close.join().unwrap();
1857
1858            assert_eq!(ready_drops.load(Ordering::Acquire), 1);
1859            assert_eq!(overflow_drops.load(Ordering::Acquire), 1);
1860        });
1861    }
1862
1863    #[test]
1864    fn receiver_drop_drains_ready_message_published_under_overflow_lock() {
1865        loom::model(|| {
1866            let (sender, receiver) = new::<TrackedMessage>(NZUsize!(1));
1867            let drops = Arc::new(AtomicUsize::new(0));
1868            let mutation = Mutation::begin(&sender.state.overflow.activity);
1869            let queue = lock(&sender.state.overflow.queue);
1870
1871            let close = thread::spawn(move || {
1872                drop(receiver);
1873            });
1874
1875            assert!(sender
1876                .state
1877                .ready
1878                .push(TrackedMessage::new(drops.clone()))
1879                .is_ok());
1880            mutation.publish(queue.is_empty());
1881            drop(queue);
1882            drop(mutation);
1883            close.join().unwrap();
1884
1885            assert_eq!(drops.load(Ordering::Acquire), 1);
1886        });
1887    }
1888
1889    #[test]
1890    fn receiver_drop_drains_overflow_message_published_under_overflow_lock() {
1891        loom::model(|| {
1892            let (sender, receiver) = new::<TrackedMessage>(NZUsize!(1));
1893            let ready_drops = Arc::new(AtomicUsize::new(0));
1894            let overflow_drops = Arc::new(AtomicUsize::new(0));
1895
1896            assert_eq!(
1897                sender.enqueue(TrackedMessage::new(ready_drops.clone())),
1898                Feedback::Ok
1899            );
1900
1901            let mutation = Mutation::begin(&sender.state.overflow.activity);
1902            let mut queue = lock(&sender.state.overflow.queue);
1903            let close = thread::spawn(move || {
1904                drop(receiver);
1905            });
1906
1907            queue.push_back(TrackedMessage::new(overflow_drops.clone()));
1908            mutation.publish(queue.is_empty());
1909            drop(queue);
1910            drop(mutation);
1911            close.join().unwrap();
1912
1913            assert_eq!(ready_drops.load(Ordering::Acquire), 1);
1914            assert_eq!(overflow_drops.load(Ordering::Acquire), 1);
1915        });
1916    }
1917
1918    #[test]
1919    fn receiver_drop_breaks_message_sender_cycle() {
1920        loom::model(|| {
1921            let (sender, receiver) = new::<CyclicMessage>(NZUsize!(1));
1922            let drops = Arc::new(AtomicUsize::new(0));
1923
1924            assert_eq!(
1925                sender.enqueue(CyclicMessage {
1926                    _sender: sender.clone(),
1927                    drops: drops.clone(),
1928                }),
1929                Feedback::Ok
1930            );
1931            assert_eq!(
1932                sender.enqueue(CyclicMessage {
1933                    _sender: sender.clone(),
1934                    drops: drops.clone(),
1935                }),
1936                Feedback::Backoff
1937            );
1938
1939            drop(receiver);
1940
1941            assert_eq!(drops.load(Ordering::Acquire), 2);
1942            assert_eq!(
1943                sender.enqueue(CyclicMessage {
1944                    _sender: sender.clone(),
1945                    drops,
1946                }),
1947                Feedback::Closed
1948            );
1949        });
1950    }
1951
1952    #[test]
1953    fn concurrent_close_and_ready_enqueue_remains_closed() {
1954        loom::model(|| {
1955            let (sender, receiver) = new_unreliable::<Message>(NZUsize!(1));
1956
1957            let enqueue_sender = sender.clone();
1958            let enqueue = thread::spawn(move || {
1959                let _ = enqueue_sender.enqueue(Message::Spill(1));
1960            });
1961
1962            let close = thread::spawn(move || {
1963                drop(receiver);
1964            });
1965
1966            enqueue.join().unwrap();
1967            close.join().unwrap();
1968            assert_eq!(
1969                sender.enqueue(Message::Spill(2)),
1970                Unreliable::new(Feedback::Closed)
1971            );
1972        });
1973    }
1974
1975    #[test]
1976    fn concurrent_close_and_overflow_enqueue_remains_closed() {
1977        loom::model(|| {
1978            let (sender, receiver) = new_unreliable::<Message>(NZUsize!(1));
1979            assert_eq!(
1980                sender.enqueue(Message::Drop(0)),
1981                Unreliable::new(Feedback::Ok)
1982            );
1983
1984            let enqueue_sender = sender.clone();
1985            let enqueue = thread::spawn(move || {
1986                let _ = enqueue_sender.enqueue(Message::Spill(1));
1987            });
1988
1989            let close = thread::spawn(move || {
1990                drop(receiver);
1991            });
1992
1993            enqueue.join().unwrap();
1994            close.join().unwrap();
1995            assert_eq!(
1996                sender.enqueue(Message::Spill(2)),
1997                Unreliable::new(Feedback::Closed)
1998            );
1999        });
2000    }
2001
2002    #[test]
2003    fn concurrent_spill_and_refill_preserves_messages() {
2004        loom::model(|| {
2005            let (sender, mut receiver) = new_unreliable::<Message>(NZUsize!(1));
2006            let idle_sender = sender.clone();
2007            assert_eq!(
2008                sender.enqueue(Message::Spill(0)),
2009                Unreliable::new(Feedback::Ok)
2010            );
2011
2012            let seen = Arc::new(AtomicUsize::new(0));
2013            let enqueue = thread::spawn(move || {
2014                let feedback = sender.enqueue(Message::Spill(1));
2015                assert!(feedback.accepted());
2016            });
2017
2018            let seen_by_receiver = seen.clone();
2019            let recv = thread::spawn(move || {
2020                if let Ok(message) = receiver.try_recv() {
2021                    record(&seen_by_receiver, message);
2022                }
2023                receiver
2024            });
2025
2026            enqueue.join().unwrap();
2027            let mut receiver = recv.join().unwrap();
2028
2029            while let Ok(message) = receiver.try_recv() {
2030                record(&seen, message);
2031            }
2032            assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
2033            drop(idle_sender);
2034            assert_eq!(seen.load(Ordering::Acquire), 0b11);
2035        });
2036    }
2037
2038    #[test]
2039    fn concurrent_spill_senders_preserve_messages() {
2040        loom::model(|| {
2041            let (sender, mut receiver) = new_unreliable::<Message>(NZUsize!(1));
2042            let idle_sender = sender.clone();
2043            assert_eq!(
2044                sender.enqueue(Message::Spill(0)),
2045                Unreliable::new(Feedback::Ok)
2046            );
2047
2048            let sender_1 = sender.clone();
2049            let enqueue_1 = thread::spawn(move || sender_1.enqueue(Message::Spill(1)));
2050            let enqueue_2 = thread::spawn(move || sender.enqueue(Message::Spill(2)));
2051
2052            let seen = Arc::new(AtomicUsize::new(0));
2053
2054            assert!(enqueue_1.join().unwrap().accepted());
2055            assert!(enqueue_2.join().unwrap().accepted());
2056
2057            while let Ok(message) = receiver.try_recv() {
2058                record(&seen, message);
2059            }
2060            assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
2061            drop(idle_sender);
2062            assert_eq!(seen.load(Ordering::Acquire), 0b111);
2063        });
2064    }
2065
2066    #[test]
2067    fn concurrent_replace_keeps_one_overflow_message() {
2068        loom::model(|| {
2069            let (sender, mut receiver) = new_unreliable::<ReplacingMessage>(NZUsize!(1));
2070            let idle_sender = sender.clone();
2071            assert_eq!(
2072                sender.enqueue(ReplacingMessage::FillReady),
2073                Unreliable::new(Feedback::Ok)
2074            );
2075            assert_eq!(
2076                sender.enqueue(ReplacingMessage::Replace(1)),
2077                Unreliable::new(Feedback::Backoff)
2078            );
2079
2080            let sender_1 = sender.clone();
2081            let replace_1 = thread::spawn(move || sender_1.enqueue(ReplacingMessage::Replace(2)));
2082            let replace_2 = thread::spawn(move || sender.enqueue(ReplacingMessage::Replace(3)));
2083
2084            assert_eq!(
2085                replace_1.join().unwrap(),
2086                Unreliable::new(Feedback::Backoff)
2087            );
2088            assert_eq!(
2089                replace_2.join().unwrap(),
2090                Unreliable::new(Feedback::Backoff)
2091            );
2092            assert_eq!(receiver.try_recv(), Ok(ReplacingMessage::FillReady));
2093
2094            let retained = replacement_value(receiver.try_recv().unwrap()).unwrap();
2095            assert!(retained == 2 || retained == 3);
2096            assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
2097            drop(idle_sender);
2098        });
2099    }
2100
2101    #[test]
2102    fn stale_overflow_hint_retries_ready_before_policy() {
2103        loom::model(|| {
2104            let (sender, mut receiver) = new_unreliable::<Message>(NZUsize!(2));
2105            assert_eq!(
2106                sender.enqueue(Message::Drop(0)),
2107                Unreliable::new(Feedback::Ok)
2108            );
2109            assert_eq!(
2110                sender.enqueue(Message::Drop(1)),
2111                Unreliable::new(Feedback::Ok)
2112            );
2113            assert_eq!(
2114                sender.enqueue(Message::Spill(2)),
2115                Unreliable::new(Feedback::Backoff)
2116            );
2117
2118            assert_eq!(receiver.try_recv(), Ok(Message::Drop(0)));
2119            assert_eq!(receiver.try_recv(), Ok(Message::Drop(1)));
2120
2121            assert_eq!(
2122                sender.enqueue(Message::Drop(3)),
2123                Unreliable::new(Feedback::Ok)
2124            );
2125            assert_eq!(receiver.try_recv(), Ok(Message::Spill(2)));
2126            assert_eq!(receiver.try_recv(), Ok(Message::Drop(3)));
2127        });
2128    }
2129
2130    #[test]
2131    fn concurrent_overflow_cannot_be_bypassed_by_ready_fast_path() {
2132        loom::model(|| {
2133            let (sender, mut receiver) = new::<OrderedMessage>(NZUsize!(2));
2134            assert_eq!(sender.enqueue(OrderedMessage::Item(0)), Feedback::Ok);
2135            assert_eq!(sender.enqueue(OrderedMessage::Item(1)), Feedback::Ok);
2136
2137            let gate = Arc::new(AtomicUsize::new(0));
2138            let overflow_sender = sender.clone();
2139            let overflow_gate = gate.clone();
2140            let overflow = thread::spawn(move || {
2141                assert_eq!(
2142                    overflow_sender.enqueue(OrderedMessage::Coordinated(2, overflow_gate)),
2143                    Feedback::Backoff
2144                );
2145            });
2146
2147            while gate.load(Ordering::Acquire) == 0 {
2148                thread::yield_now();
2149            }
2150
2151            // Message 2 has already been spilled. Even without cross-sender
2152            // FIFO, later enqueue calls must not bypass retained overflow.
2153            let mut observed = vec![value(receiver.try_recv().unwrap())];
2154            gate.store(2, Ordering::Release);
2155            let feedback = sender.enqueue(OrderedMessage::Item(3));
2156            assert!(feedback.accepted());
2157
2158            overflow.join().unwrap();
2159            while let Ok(message) = receiver.try_recv() {
2160                observed.push(value(message));
2161            }
2162
2163            assert_eq!(observed, vec![0, 1, 2, 3]);
2164        });
2165    }
2166
2167    #[test]
2168    fn concurrent_overflow_mutation_does_not_hide_published_overflow() {
2169        loom::model(|| {
2170            let (sender, mut receiver) = new::<OrderedMessage>(NZUsize!(1));
2171            assert_eq!(sender.enqueue(OrderedMessage::Item(0)), Feedback::Ok);
2172            assert_eq!(sender.enqueue(OrderedMessage::Item(1)), Feedback::Backoff);
2173
2174            let gate = Arc::new(AtomicUsize::new(0));
2175            let overflow_gate = gate.clone();
2176            let overflow = thread::spawn(move || {
2177                sender.enqueue(OrderedMessage::Coordinated(2, overflow_gate))
2178            });
2179
2180            while gate.load(Ordering::Acquire) == 0 {
2181                thread::yield_now();
2182            }
2183
2184            let release_gate = gate;
2185            let release = thread::spawn(move || {
2186                release_gate.store(2, Ordering::Release);
2187            });
2188
2189            let receive = thread::spawn(move || {
2190                assert_eq!(receiver.try_recv().map(value), Ok(0));
2191                assert_eq!(receiver.try_recv().map(value), Ok(1));
2192                receiver
2193            });
2194
2195            release.join().unwrap();
2196            let mut receiver = receive.join().unwrap();
2197            assert_eq!(overflow.join().unwrap(), Feedback::Backoff);
2198            assert_eq!(receiver.try_recv().map(value), Ok(2));
2199        });
2200    }
2201
2202    #[test]
2203    fn published_overflow_wakes_pending_receiver() {
2204        loom::model(|| {
2205            let (sender, mut receiver) = new::<OrderedMessage>(NZUsize!(1));
2206            let wakes = Arc::new(AtomicUsize::new(0));
2207            let waker = counting_waker(wakes.clone());
2208            let mut cx = Context::from_waker(&waker);
2209
2210            let gate = Arc::new(AtomicUsize::new(0));
2211            let overflow = {
2212                let next = receiver.recv();
2213                pin_mut!(next);
2214                assert!(matches!(next.as_mut().poll(&mut cx), Poll::Pending));
2215
2216                assert_eq!(sender.enqueue(OrderedMessage::Item(0)), Feedback::Ok);
2217                while wakes.load(Ordering::Acquire) == 0 {
2218                    thread::yield_now();
2219                }
2220
2221                let overflow_gate = gate.clone();
2222                let overflow = thread::spawn(move || {
2223                    sender.enqueue(OrderedMessage::Coordinated(1, overflow_gate))
2224                });
2225
2226                while gate.load(Ordering::Acquire) == 0 {
2227                    thread::yield_now();
2228                }
2229
2230                assert_eq!(
2231                    next.as_mut()
2232                        .poll(&mut cx)
2233                        .map(|message| message.map(value)),
2234                    Poll::Ready(Some(0))
2235                );
2236                overflow
2237            };
2238
2239            {
2240                let next = receiver.recv();
2241                pin_mut!(next);
2242                assert!(matches!(next.as_mut().poll(&mut cx), Poll::Pending));
2243                assert_eq!(wakes.load(Ordering::Acquire), 1);
2244
2245                gate.store(2, Ordering::Release);
2246                while wakes.load(Ordering::Acquire) < 2 {
2247                    thread::yield_now();
2248                }
2249
2250                assert_eq!(
2251                    next.as_mut()
2252                        .poll(&mut cx)
2253                        .map(|message| message.map(value)),
2254                    Poll::Ready(Some(1))
2255                );
2256            }
2257            assert_eq!(overflow.join().unwrap(), Feedback::Backoff);
2258        });
2259    }
2260
2261    #[test]
2262    fn concurrent_refill_and_enqueue_preserves_overflow_order() {
2263        loom::model(|| {
2264            let (sender, mut receiver) = new::<OrderedMessage>(NZUsize!(1));
2265            assert_eq!(sender.enqueue(OrderedMessage::Item(0)), Feedback::Ok);
2266            assert_eq!(sender.enqueue(OrderedMessage::Item(1)), Feedback::Backoff);
2267
2268            let enqueue = thread::spawn(move || sender.enqueue(OrderedMessage::Item(2)));
2269            let receive = thread::spawn(move || {
2270                assert_eq!(receiver.try_recv().map(value), Ok(0));
2271                receiver
2272            });
2273
2274            let mut receiver = receive.join().unwrap();
2275            assert_eq!(enqueue.join().unwrap(), Feedback::Backoff);
2276            assert_eq!(receiver.try_recv().map(value), Ok(1));
2277            assert_eq!(receiver.try_recv().map(value), Ok(2));
2278        });
2279    }
2280}