1use crate::{Feedback, Unreliable};
44use commonware_runtime::{
45 telemetry::metrics::{Counter, MetricsExt as _},
46 Metrics,
47};
48use std::{
49 collections::VecDeque,
50 fmt,
51 future::poll_fn,
52 marker::PhantomData,
53 num::NonZeroUsize,
54 sync::mpsc::TryRecvError,
55 task::{Context, Poll},
56};
57
58pub trait Overflow<T>: Default {
60 fn is_empty(&self) -> bool;
62
63 fn drain<F>(&mut self, push: F)
69 where
70 F: FnMut(T) -> Option<T>;
71}
72
73impl<T> Overflow<T> for VecDeque<T> {
74 fn is_empty(&self) -> bool {
75 self.is_empty()
76 }
77
78 fn drain<F>(&mut self, mut push: F)
79 where
80 F: FnMut(T) -> Option<T>,
81 {
82 while let Some(message) = self.pop_front() {
83 if let Some(message) = push(message) {
84 self.push_front(message);
85 break;
86 }
87 }
88 }
89}
90
91pub trait Policy: Sized {
93 type Overflow: Overflow<Self>;
95
96 fn handle(overflow: &mut Self::Overflow, message: Self);
112}
113
114pub trait UnreliablePolicy: Sized {
116 type Overflow: Overflow<Self>;
118
119 fn handle(overflow: &mut Self::Overflow, message: Self) -> bool;
140}
141
142mod mode {
144 pub(super) struct Reliable;
146
147 pub(super) struct Unreliable;
149}
150
151trait Mode<T>: Sized {
152 type Overflow: Overflow<T>;
154 type Feedback;
156
157 fn handle(overflow: &mut Self::Overflow, message: T) -> bool;
159 fn ready_feedback(feedback: Feedback) -> Self::Feedback;
161 fn overflow_feedback(handled: bool) -> Self::Feedback;
163 fn is_backoff(feedback: &Self::Feedback) -> bool;
165 fn is_closed(feedback: &Self::Feedback) -> bool;
167}
168
169impl<T: Policy> Mode<T> for mode::Reliable {
170 type Overflow = T::Overflow;
171 type Feedback = Feedback;
172
173 fn handle(overflow: &mut Self::Overflow, message: T) -> bool {
174 T::handle(overflow, message);
175 true
176 }
177
178 fn ready_feedback(feedback: Feedback) -> Self::Feedback {
179 feedback
180 }
181
182 fn overflow_feedback(_handled: bool) -> Self::Feedback {
183 Feedback::Backoff
184 }
185
186 fn is_backoff(feedback: &Self::Feedback) -> bool {
187 *feedback == Feedback::Backoff
188 }
189
190 fn is_closed(feedback: &Self::Feedback) -> bool {
191 *feedback == Feedback::Closed
192 }
193}
194
195impl<T: UnreliablePolicy> Mode<T> for mode::Unreliable {
196 type Overflow = T::Overflow;
197 type Feedback = Unreliable<Feedback>;
198
199 fn handle(overflow: &mut Self::Overflow, message: T) -> bool {
200 T::handle(overflow, message)
201 }
202
203 fn ready_feedback(feedback: Feedback) -> Self::Feedback {
204 Unreliable::new(feedback)
205 }
206
207 fn overflow_feedback(handled: bool) -> Self::Feedback {
208 if handled {
209 Unreliable::new(Feedback::Backoff)
210 } else {
211 Unreliable::Rejected
212 }
213 }
214
215 fn is_backoff(feedback: &Self::Feedback) -> bool {
216 *feedback == Unreliable::new(Feedback::Backoff)
217 }
218
219 fn is_closed(feedback: &Self::Feedback) -> bool {
220 *feedback == Unreliable::new(Feedback::Closed)
221 }
222}
223
224pub struct Sender<T: Policy> {
226 state: Arc<State<T, mode::Reliable>>,
227}
228
229pub struct UnreliableSender<T: UnreliablePolicy> {
231 state: Arc<State<T, mode::Unreliable>>,
232}
233
234impl<T: Policy> Clone for Sender<T> {
235 fn clone(&self) -> Self {
236 Self {
237 state: clone_sender_state(&self.state),
238 }
239 }
240}
241
242impl<T: UnreliablePolicy> Clone for UnreliableSender<T> {
243 fn clone(&self) -> Self {
244 Self {
245 state: clone_sender_state(&self.state),
246 }
247 }
248}
249
250impl<T: Policy> Drop for Sender<T> {
251 fn drop(&mut self) {
252 drop_sender_state(&self.state);
253 }
254}
255
256impl<T: UnreliablePolicy> Drop for UnreliableSender<T> {
257 fn drop(&mut self) {
258 drop_sender_state(&self.state);
259 }
260}
261
262impl<T: Policy> fmt::Debug for Sender<T> {
263 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264 fmt_sender_state("Sender", &self.state, f)
265 }
266}
267
268impl<T: UnreliablePolicy> fmt::Debug for UnreliableSender<T> {
269 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270 fmt_sender_state("UnreliableSender", &self.state, f)
271 }
272}
273
274impl<T: Policy> Sender<T> {
275 #[must_use = "caller must handle enqueue feedback"]
277 pub fn enqueue(&self, message: T) -> Feedback {
278 self.state.enqueue(message)
279 }
280}
281
282impl<T: UnreliablePolicy> UnreliableSender<T> {
283 #[must_use = "caller must handle enqueue feedback"]
285 pub fn enqueue(&self, message: T) -> Unreliable<Feedback> {
286 self.state.enqueue(message)
287 }
288}
289
290pub struct Receiver<T: Policy> {
297 state: Arc<State<T, mode::Reliable>>,
298}
299
300pub struct UnreliableReceiver<T: UnreliablePolicy> {
307 state: Arc<State<T, mode::Unreliable>>,
308}
309
310impl<T: Policy> Receiver<T> {
311 pub async fn recv(&mut self) -> Option<T> {
316 recv_from(&self.state).await
317 }
318
319 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
324 try_recv_from(&self.state)
325 }
326}
327
328impl<T: UnreliablePolicy> UnreliableReceiver<T> {
329 pub async fn recv(&mut self) -> Option<T> {
334 recv_from(&self.state).await
335 }
336
337 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
342 try_recv_from(&self.state)
343 }
344}
345
346impl<T: Policy> Drop for Receiver<T> {
347 fn drop(&mut self) {
348 self.state.close();
349 }
350}
351
352impl<T: UnreliablePolicy> Drop for UnreliableReceiver<T> {
353 fn drop(&mut self) {
354 self.state.close();
355 }
356}
357
358pub fn new<T: Policy>(metrics: impl Metrics, capacity: NonZeroUsize) -> (Sender<T>, Receiver<T>) {
360 let state = new_state(metrics, capacity);
361 (
362 Sender {
363 state: state.clone(),
364 },
365 Receiver { state },
366 )
367}
368
369pub fn new_unreliable<T: UnreliablePolicy>(
371 metrics: impl Metrics,
372 capacity: NonZeroUsize,
373) -> (UnreliableSender<T>, UnreliableReceiver<T>) {
374 let state = new_state(metrics, capacity);
375 (
376 UnreliableSender {
377 state: state.clone(),
378 },
379 UnreliableReceiver { state },
380 )
381}
382
383const OVERFLOW_HAS_MESSAGES: usize = 1;
409const OVERFLOW_MUTATION: usize = 2;
410
411cfg_if::cfg_if! {
412 if #[cfg(feature = "loom")] {
413 use loom::{
414 future::AtomicWaker,
415 sync::{
416 atomic::{AtomicBool, AtomicUsize, Ordering},
417 Arc, Mutex, MutexGuard,
418 },
419 };
420
421 fn register_waker(waker: &AtomicWaker, task: &std::task::Waker) {
422 waker.register_by_ref(task);
423 }
424
425 fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
426 mutex.lock().unwrap()
427 }
428
429 struct ReadyState<T> {
430 published: VecDeque<T>,
431 reserved: usize,
432 }
433
434 struct Ready<T> {
435 state: Mutex<ReadyState<T>>,
436 capacity: usize,
437 }
438
439 impl<T> Ready<T> {
440 fn new(capacity: usize) -> Self {
441 Self {
442 state: Mutex::new(ReadyState {
443 published: VecDeque::new(),
444 reserved: 0,
445 }),
446 capacity,
447 }
448 }
449
450 const fn capacity(&self) -> usize {
451 self.capacity
452 }
453
454 fn push(&self, message: T) -> Result<(), T> {
455 {
456 let mut state = lock(&self.state);
457 if state.published.len() + state.reserved >= self.capacity {
458 return Err(message);
459 }
460 state.reserved += 1;
461 }
462
463 loom::thread::yield_now();
464
465 let mut state = lock(&self.state);
466 state.reserved -= 1;
467 state.published.push_back(message);
468 Ok(())
469 }
470
471 fn pop(&self) -> Option<T> {
472 loop {
473 let mut state = lock(&self.state);
474 if let Some(message) = state.published.pop_front() {
475 return Some(message);
476 }
477 if state.reserved == 0 {
478 return None;
479 }
480 drop(state);
481 loom::thread::yield_now();
482 }
483 }
484 }
485 } else {
486 use crossbeam_queue::ArrayQueue;
487 use futures_util::task::AtomicWaker;
488 use parking_lot::{Mutex, MutexGuard};
489 use std::sync::{
490 atomic::{AtomicBool, AtomicUsize, Ordering},
491 Arc,
492 };
493
494 fn register_waker(waker: &AtomicWaker, task: &std::task::Waker) {
495 waker.register(task);
496 }
497
498 fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
499 mutex.lock()
500 }
501
502 struct Ready<T> {
503 queue: ArrayQueue<T>,
504 }
505
506 impl<T> Ready<T> {
507 fn new(capacity: usize) -> Self {
508 Self {
509 queue: ArrayQueue::new(capacity),
510 }
511 }
512
513 fn capacity(&self) -> usize {
514 self.queue.capacity()
515 }
516
517 fn push(&self, message: T) -> Result<(), T> {
518 self.queue.push(message)
519 }
520
521 fn pop(&self) -> Option<T> {
522 self.queue.pop()
523 }
524 }
525 }
526}
527
528struct OverflowState<T, M: Mode<T>> {
529 queue: Mutex<M::Overflow>,
530 activity: AtomicUsize,
531 _phantom: PhantomData<fn() -> T>,
532}
533
534impl<T, M: Mode<T>> OverflowState<T, M> {
535 #[allow(clippy::missing_const_for_fn)]
536 fn new() -> Self {
537 Self {
538 queue: Mutex::new(M::Overflow::default()),
539 activity: AtomicUsize::new(0),
540 _phantom: PhantomData,
541 }
542 }
543
544 fn try_ready(&self, ready: &Ready<T>, message: T) -> Result<(), T> {
545 if self.activity.load(Ordering::Relaxed) != 0 {
547 return Err(message);
548 }
549 ready.push(message)
550 }
551
552 fn enqueue_overflow(
553 &self,
554 ready: &Ready<T>,
555 message: T,
556 is_closed: impl Fn() -> bool,
557 ) -> M::Feedback {
558 let mutation = Mutation::begin(&self.activity);
560 let mut queue = lock(&self.queue);
561 if is_closed() {
562 mutation.publish(queue.is_empty());
563 return M::ready_feedback(Feedback::Closed);
564 }
565
566 let message = if queue.is_empty() {
570 match ready.push(message) {
571 Ok(()) => {
572 mutation.publish(queue.is_empty());
573 return M::ready_feedback(Feedback::Ok);
574 }
575 Err(message) => message,
576 }
577 } else {
578 message
579 };
580
581 let handled = M::handle(&mut queue, message);
583 mutation.publish(queue.is_empty());
584 M::overflow_feedback(handled)
585 }
586
587 fn refill(&self, ready: &Ready<T>) {
588 if self.activity.load(Ordering::Relaxed) & OVERFLOW_HAS_MESSAGES == 0 {
590 return;
591 }
592
593 let mutation = Mutation::begin(&self.activity);
594 let mut queue = lock(&self.queue);
595 queue.drain(|message| ready.push(message).err());
596 mutation.publish(queue.is_empty());
597 }
598
599 fn drain(&self, ready: &Ready<T>) {
600 let mutation = Mutation::begin(&self.activity);
602 while ready.pop().is_some() {}
603
604 let mut drained = Vec::new();
607 let mut queue = lock(&self.queue);
608 queue.drain(|message| {
609 drained.push(message);
610 None
611 });
612 mutation.publish(queue.is_empty());
613 drop(queue);
614 drop(drained);
615
616 while ready.pop().is_some() {}
619 }
620}
621
622struct Mutation<'a> {
623 activity: &'a AtomicUsize,
624}
625
626impl<'a> Mutation<'a> {
627 fn begin(activity: &'a AtomicUsize) -> Self {
628 activity.fetch_add(OVERFLOW_MUTATION, Ordering::Relaxed);
629 Self { activity }
630 }
631
632 fn publish(&self, is_empty: bool) {
633 if is_empty {
634 self.activity
635 .fetch_and(!OVERFLOW_HAS_MESSAGES, Ordering::Relaxed);
636 } else {
637 self.activity
638 .fetch_or(OVERFLOW_HAS_MESSAGES, Ordering::Relaxed);
639 }
640 }
641}
642
643impl Drop for Mutation<'_> {
644 fn drop(&mut self) {
645 let previous = self
646 .activity
647 .fetch_sub(OVERFLOW_MUTATION, Ordering::Relaxed);
648 assert!(previous >= OVERFLOW_MUTATION);
649 }
650}
651
652struct State<T, M: Mode<T>> {
653 ready: Ready<T>,
654 overflow: OverflowState<T, M>,
655 backoff: Counter,
656 closed: AtomicBool,
657 senders: AtomicUsize,
658 waker: AtomicWaker,
659}
660
661impl<T, M: Mode<T>> State<T, M> {
662 fn enqueue(&self, message: T) -> M::Feedback {
663 if self.closed.load(Ordering::Acquire) {
665 return M::ready_feedback(Feedback::Closed);
666 }
667
668 let message = match self.overflow.try_ready(&self.ready, message) {
670 Ok(()) => {
671 if self.closed.load(Ordering::Acquire) {
672 self.overflow.drain(&self.ready);
673 return M::ready_feedback(Feedback::Closed);
674 }
675 self.waker.wake();
676 return M::ready_feedback(Feedback::Ok);
677 }
678 Err(message) => message,
679 };
680
681 let feedback = self
683 .overflow
684 .enqueue_overflow(&self.ready, message, || self.closed.load(Ordering::Acquire));
685
686 if M::is_backoff(&feedback) {
688 self.backoff.inc();
689 }
690
691 if !M::is_closed(&feedback) {
696 self.waker.wake();
697 }
698 feedback
699 }
700
701 fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>> {
702 if let Some(message) = self.pop() {
704 return Poll::Ready(Some(message));
705 }
706
707 if self.is_disconnected() {
708 return Poll::Ready(self.pop());
709 }
710
711 register_waker(&self.waker, cx.waker());
712
713 if let Some(message) = self.pop() {
716 return Poll::Ready(Some(message));
717 }
718
719 if self.is_disconnected() {
720 Poll::Ready(self.pop())
721 } else {
722 Poll::Pending
723 }
724 }
725
726 fn pop(&self) -> Option<T> {
727 if let Some(message) = self.ready.pop() {
728 self.overflow.refill(&self.ready);
730 return Some(message);
731 }
732
733 self.overflow.refill(&self.ready);
736 self.ready.pop()
737 }
738
739 fn is_disconnected(&self) -> bool {
740 self.closed.load(Ordering::Acquire) || self.senders.load(Ordering::Acquire) == 0
741 }
742
743 fn close(&self) {
744 self.closed.store(true, Ordering::Release);
745 self.overflow.drain(&self.ready);
746 }
747}
748
749fn new_state<T, M: Mode<T>>(metrics: impl Metrics, capacity: NonZeroUsize) -> Arc<State<T, M>> {
750 Arc::new(State {
751 ready: Ready::new(capacity.get()),
752 overflow: OverflowState::new(),
753 backoff: metrics.counter("backoff", "number of enqueue calls that requested backoff"),
754 closed: AtomicBool::new(false),
755 senders: AtomicUsize::new(1),
756 waker: AtomicWaker::new(),
757 })
758}
759
760fn clone_sender_state<T, M: Mode<T>>(state: &Arc<State<T, M>>) -> Arc<State<T, M>> {
761 state.senders.fetch_add(1, Ordering::Relaxed);
763 state.clone()
764}
765
766fn drop_sender_state<T, M: Mode<T>>(state: &State<T, M>) {
767 let previous = state.senders.fetch_sub(1, Ordering::AcqRel);
768 assert!(previous > 0);
769 if previous == 1 {
771 state.waker.wake();
772 }
773}
774
775fn fmt_sender_state<T, M: Mode<T>>(
776 name: &str,
777 state: &State<T, M>,
778 f: &mut fmt::Formatter<'_>,
779) -> fmt::Result {
780 f.debug_struct(name)
781 .field("capacity", &state.ready.capacity())
782 .field("closed", &state.closed.load(Ordering::Acquire))
783 .finish()
784}
785
786async fn recv_from<T, M: Mode<T>>(state: &State<T, M>) -> Option<T> {
787 poll_fn(|cx| state.poll_recv(cx)).await
788}
789
790fn try_recv_from<T, M: Mode<T>>(state: &State<T, M>) -> Result<T, TryRecvError> {
791 if let Some(message) = state.pop() {
792 return Ok(message);
793 }
794 if state.is_disconnected() {
795 return state.pop().ok_or(TryRecvError::Disconnected);
796 }
797 Err(TryRecvError::Empty)
798}
799
800#[cfg(test)]
801mod mocks {
802 use commonware_runtime::{
803 telemetry::metrics::{Metric, Registered, Registration},
804 Metrics as RuntimeMetrics, Name, Supervisor,
805 };
806 use std::fmt;
807
808 #[derive(Clone, Copy, Debug, Default)]
809 pub(super) struct Metrics;
810
811 impl Supervisor for Metrics {
812 fn name(&self) -> Name {
813 Name::default()
814 }
815
816 fn child(&self, _label: &'static str) -> Self {
817 Self
818 }
819
820 fn with_attribute(self, _key: &'static str, _value: impl fmt::Display) -> Self {
821 self
822 }
823 }
824
825 impl RuntimeMetrics for Metrics {
826 fn register<N: Into<String>, H: Into<String>, M: Metric>(
827 &self,
828 _name: N,
829 _help: H,
830 metric: M,
831 ) -> Registered<M> {
832 Registered::with_registration(metric, Registration::from(()))
833 }
834
835 fn encode(&self) -> String {
836 String::new()
837 }
838 }
839}
840
841#[cfg(all(test, not(feature = "loom")))]
842mod tests {
843 use super::{mocks, *};
844 use commonware_macros::test_async;
845 use commonware_runtime::{deterministic, Runner as _, Supervisor};
846 use commonware_utils::{channel::oneshot, NZUsize};
847 use futures::{
848 pin_mut,
849 task::{waker_ref, ArcWake},
850 FutureExt,
851 };
852 use std::sync::{
853 atomic::{AtomicUsize, Ordering},
854 mpsc::TryRecvError,
855 Arc,
856 };
857
858 fn new<T: Policy>(capacity: NonZeroUsize) -> (Sender<T>, Receiver<T>) {
859 super::new(mocks::Metrics, capacity)
860 }
861
862 fn new_unreliable<T: UnreliablePolicy>(
863 capacity: NonZeroUsize,
864 ) -> (UnreliableSender<T>, UnreliableReceiver<T>) {
865 super::new_unreliable(mocks::Metrics, capacity)
866 }
867
868 #[derive(Debug, PartialEq, Eq)]
869 enum Message {
870 Update(u64),
871 Vote(u64),
872 Required(u64),
873 Buffered(u64),
874 Hint(u64),
875 }
876
877 impl UnreliablePolicy for Message {
878 type Overflow = VecDeque<Self>;
879
880 fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
881 match message {
882 Self::Update(value) => {
883 if let Some(index) = overflow
884 .iter()
885 .rposition(|pending| matches!(pending, Self::Update(_)))
886 {
887 overflow.remove(index);
888 }
889 overflow.push_back(Self::Update(value));
890 true
891 }
892 Self::Required(_) | Self::Buffered(_) => {
893 overflow.push_back(message);
894 true
895 }
896 Self::Hint(value) => {
897 let Some(index) = overflow
898 .iter()
899 .rposition(|pending| matches!(pending, Self::Update(_)))
900 else {
901 return true;
902 };
903 overflow.remove(index);
904 overflow.push_back(Self::Hint(value));
905 true
906 }
907 Self::Vote(_) => false,
908 }
909 }
910 }
911
912 struct Ack {
913 _sender: oneshot::Sender<()>,
914 }
915
916 impl Policy for Ack {
917 type Overflow = VecDeque<Self>;
918
919 fn handle(overflow: &mut VecDeque<Self>, message: Self) {
920 overflow.push_back(message);
921 }
922 }
923
924 #[derive(Default)]
925 struct WakeCounter {
926 wakes: AtomicUsize,
927 }
928
929 impl WakeCounter {
930 fn count(&self) -> usize {
931 self.wakes.load(Ordering::Acquire)
932 }
933 }
934
935 impl ArcWake for WakeCounter {
936 fn wake_by_ref(arc_self: &Arc<Self>) {
937 arc_self.wakes.fetch_add(1, Ordering::AcqRel);
938 }
939 }
940
941 #[test]
942 fn vecdeque_overflow_drain_stops_after_rejected_message() {
943 let mut overflow = VecDeque::from([Message::Vote(1), Message::Vote(2), Message::Vote(3)]);
944 let mut drained = VecDeque::new();
945
946 Overflow::drain(&mut overflow, |message| {
947 drained.push_back(message);
948 if drained.len() == 2 {
949 drained.pop_back()
950 } else {
951 None
952 }
953 });
954
955 assert_eq!(drained, VecDeque::from([Message::Vote(1)]));
956 assert_eq!(
957 overflow,
958 VecDeque::from([Message::Vote(2), Message::Vote(3)])
959 );
960 }
961
962 #[test_async]
963 async fn full_inbox_replaces_stale_overflow_message() {
964 let (sender, mut receiver) = new_unreliable(NZUsize!(1));
965 assert_eq!(
966 sender.enqueue(Message::Update(1)),
967 Unreliable::new(Feedback::Ok)
968 );
969 assert_eq!(
970 sender.enqueue(Message::Update(2)),
971 Unreliable::new(Feedback::Backoff)
972 );
973 assert_eq!(
974 sender.enqueue(Message::Update(3)),
975 Unreliable::new(Feedback::Backoff)
976 );
977
978 assert_eq!(receiver.recv().await, Some(Message::Update(1)));
979 assert_eq!(receiver.recv().await, Some(Message::Update(3)));
980 }
981
982 #[test_async]
983 async fn policy_can_replace_stale_overflow_at_back() {
984 let (sender, mut receiver) = new_unreliable(NZUsize!(1));
985 assert_eq!(
986 sender.enqueue(Message::Vote(1)),
987 Unreliable::new(Feedback::Ok)
988 );
989 assert_eq!(
990 sender.enqueue(Message::Update(2)),
991 Unreliable::new(Feedback::Backoff)
992 );
993 assert_eq!(
994 sender.enqueue(Message::Required(3)),
995 Unreliable::new(Feedback::Backoff)
996 );
997 assert_eq!(
998 sender.enqueue(Message::Update(4)),
999 Unreliable::new(Feedback::Backoff)
1000 );
1001
1002 assert_eq!(receiver.recv().await, Some(Message::Vote(1)));
1003 assert_eq!(receiver.recv().await, Some(Message::Required(3)));
1004 assert_eq!(receiver.recv().await, Some(Message::Update(4)));
1005 }
1006
1007 #[test_async]
1008 async fn full_inbox_rejects_non_replaceable_message() {
1009 let (sender, mut receiver) = new_unreliable(NZUsize!(1));
1010 assert_eq!(
1011 sender.enqueue(Message::Vote(1)),
1012 Unreliable::new(Feedback::Ok)
1013 );
1014 assert_eq!(sender.enqueue(Message::Vote(2)), Unreliable::Rejected);
1015
1016 assert_eq!(receiver.recv().await, Some(Message::Vote(1)));
1017 }
1018
1019 #[test_async]
1020 async fn full_inbox_retains_required_message() {
1021 let (sender, mut receiver) = new_unreliable(NZUsize!(1));
1022 assert_eq!(
1023 sender.enqueue(Message::Vote(1)),
1024 Unreliable::new(Feedback::Ok)
1025 );
1026 assert_eq!(
1027 sender.enqueue(Message::Buffered(2)),
1028 Unreliable::new(Feedback::Backoff)
1029 );
1030
1031 assert_eq!(receiver.recv().await, Some(Message::Vote(1)));
1032 assert_eq!(receiver.recv().await, Some(Message::Buffered(2)));
1033 }
1034
1035 #[test]
1036 fn try_recv_refills_from_overflow() {
1037 let (sender, mut receiver) = new_unreliable(NZUsize!(1));
1038 assert_eq!(
1039 sender.enqueue(Message::Vote(1)),
1040 Unreliable::new(Feedback::Ok)
1041 );
1042 assert_eq!(
1043 sender.enqueue(Message::Buffered(2)),
1044 Unreliable::new(Feedback::Backoff)
1045 );
1046
1047 assert_eq!(receiver.try_recv(), Ok(Message::Vote(1)));
1048 assert_eq!(receiver.try_recv(), Ok(Message::Buffered(2)));
1049 }
1050
1051 #[test]
1052 fn backoff_metric_counts_backoff_feedback() {
1053 let executor = deterministic::Runner::default();
1054 executor.start(|context| async move {
1055 let (sender, _receiver) = super::new_unreliable(context.child("mailbox"), NZUsize!(1));
1056 assert_eq!(
1057 sender.enqueue(Message::Vote(1)),
1058 Unreliable::new(Feedback::Ok)
1059 );
1060 assert_eq!(
1061 sender.enqueue(Message::Buffered(2)),
1062 Unreliable::new(Feedback::Backoff)
1063 );
1064 assert_eq!(
1065 sender.enqueue(Message::Buffered(3)),
1066 Unreliable::new(Feedback::Backoff)
1067 );
1068
1069 let buffer = context.encode();
1070 assert!(
1071 buffer.contains("mailbox_backoff_total 2"),
1072 "missing backoff count in metrics: {buffer}"
1073 );
1074 });
1075 }
1076
1077 #[test]
1078 fn unreliable_rejected_feedback_is_not_accepted_or_counted_as_backoff() {
1079 let executor = deterministic::Runner::default();
1080 executor.start(|context| async move {
1081 let (sender, _receiver) = super::new_unreliable(context.child("mailbox"), NZUsize!(1));
1082 assert_eq!(
1083 sender.enqueue(Message::Vote(1)),
1084 Unreliable::new(Feedback::Ok)
1085 );
1086 let feedback = sender.enqueue(Message::Vote(2));
1087
1088 assert_eq!(feedback, Unreliable::Rejected);
1089 assert!(!feedback.accepted());
1090
1091 let buffer = context.encode();
1092 assert!(
1093 buffer.contains("mailbox_backoff_total 0"),
1094 "unexpected backoff count in metrics: {buffer}"
1095 );
1096 });
1097 }
1098
1099 #[test]
1100 fn try_recv_drains_buffered_messages_after_senders_drop() {
1101 let (sender, mut receiver) = new_unreliable(NZUsize!(1));
1102 assert_eq!(
1103 sender.enqueue(Message::Vote(1)),
1104 Unreliable::new(Feedback::Ok)
1105 );
1106 assert_eq!(
1107 sender.enqueue(Message::Buffered(2)),
1108 Unreliable::new(Feedback::Backoff)
1109 );
1110 drop(sender);
1111
1112 assert_eq!(receiver.try_recv(), Ok(Message::Vote(1)));
1113 assert_eq!(receiver.try_recv(), Ok(Message::Buffered(2)));
1114 assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
1115 }
1116
1117 #[test]
1118 fn poll_recv_drains_buffered_messages_after_senders_drop() {
1119 let (sender, receiver) = new_unreliable(NZUsize!(1));
1120 let wakes = Arc::new(WakeCounter::default());
1121 let waker = waker_ref(&wakes);
1122 let mut cx = Context::from_waker(&waker);
1123
1124 assert_eq!(
1125 sender.enqueue(Message::Vote(1)),
1126 Unreliable::new(Feedback::Ok)
1127 );
1128 assert_eq!(
1129 sender.enqueue(Message::Buffered(2)),
1130 Unreliable::new(Feedback::Backoff)
1131 );
1132 drop(sender);
1133
1134 assert_eq!(
1135 receiver.state.poll_recv(&mut cx),
1136 Poll::Ready(Some(Message::Vote(1)))
1137 );
1138 assert_eq!(
1139 receiver.state.poll_recv(&mut cx),
1140 Poll::Ready(Some(Message::Buffered(2)))
1141 );
1142 assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Ready(None));
1143 }
1144
1145 #[test]
1146 fn enqueue_uses_ready_capacity_after_partial_drain() {
1147 let (sender, mut receiver) = new_unreliable(NZUsize!(2));
1148 assert_eq!(
1149 sender.enqueue(Message::Vote(1)),
1150 Unreliable::new(Feedback::Ok)
1151 );
1152 assert_eq!(
1153 sender.enqueue(Message::Vote(2)),
1154 Unreliable::new(Feedback::Ok)
1155 );
1156 assert_eq!(
1157 sender.enqueue(Message::Required(3)),
1158 Unreliable::new(Feedback::Backoff)
1159 );
1160
1161 assert_eq!(receiver.try_recv(), Ok(Message::Vote(1)));
1162 assert_eq!(receiver.try_recv(), Ok(Message::Vote(2)));
1163
1164 assert_eq!(
1165 sender.enqueue(Message::Vote(4)),
1166 Unreliable::new(Feedback::Ok)
1167 );
1168 assert_eq!(receiver.try_recv(), Ok(Message::Required(3)));
1169 assert_eq!(receiver.try_recv(), Ok(Message::Vote(4)));
1170 }
1171
1172 #[test]
1173 fn receiver_refills_overflow_after_partial_drain() {
1174 let (sender, mut receiver) = new_unreliable(NZUsize!(3));
1175 assert_eq!(
1176 sender.enqueue(Message::Vote(1)),
1177 Unreliable::new(Feedback::Ok)
1178 );
1179 assert_eq!(
1180 sender.enqueue(Message::Vote(2)),
1181 Unreliable::new(Feedback::Ok)
1182 );
1183 assert_eq!(
1184 sender.enqueue(Message::Vote(3)),
1185 Unreliable::new(Feedback::Ok)
1186 );
1187 assert_eq!(
1188 sender.enqueue(Message::Required(4)),
1189 Unreliable::new(Feedback::Backoff)
1190 );
1191
1192 assert_eq!(receiver.try_recv(), Ok(Message::Vote(1)));
1193 assert_eq!(receiver.try_recv(), Ok(Message::Vote(2)));
1194
1195 assert_eq!(
1196 sender.enqueue(Message::Vote(5)),
1197 Unreliable::new(Feedback::Ok)
1198 );
1199 assert_eq!(receiver.try_recv(), Ok(Message::Vote(3)));
1200 assert_eq!(receiver.try_recv(), Ok(Message::Required(4)));
1201 assert_eq!(receiver.try_recv(), Ok(Message::Vote(5)));
1202 }
1203
1204 #[test_async]
1205 async fn full_inbox_retains_unmatched_replaceable_message() {
1206 let (sender, mut receiver) = new_unreliable(NZUsize!(1));
1207 assert_eq!(
1208 sender.enqueue(Message::Vote(1)),
1209 Unreliable::new(Feedback::Ok)
1210 );
1211 assert_eq!(
1212 sender.enqueue(Message::Required(2)),
1213 Unreliable::new(Feedback::Backoff)
1214 );
1215
1216 assert_eq!(receiver.recv().await, Some(Message::Vote(1)));
1217 assert_eq!(receiver.recv().await, Some(Message::Required(2)));
1218 }
1219
1220 #[test_async]
1221 async fn full_inbox_replaces_stale_overflow_after_ready_fills() {
1222 let (sender, mut receiver) = new_unreliable(NZUsize!(2));
1223 assert_eq!(
1224 sender.enqueue(Message::Vote(1)),
1225 Unreliable::new(Feedback::Ok)
1226 );
1227 assert_eq!(
1228 sender.enqueue(Message::Update(2)),
1229 Unreliable::new(Feedback::Ok)
1230 );
1231 assert_eq!(
1232 sender.enqueue(Message::Update(3)),
1233 Unreliable::new(Feedback::Backoff)
1234 );
1235 assert_eq!(
1236 sender.enqueue(Message::Update(4)),
1237 Unreliable::new(Feedback::Backoff)
1238 );
1239
1240 assert_eq!(receiver.recv().await, Some(Message::Vote(1)));
1241 assert_eq!(receiver.recv().await, Some(Message::Update(2)));
1242 assert_eq!(receiver.recv().await, Some(Message::Update(4)));
1243 }
1244
1245 #[test_async]
1246 async fn mailbox_capacity_is_soft_limit_for_required_messages() {
1247 let (sender, mut receiver) = new_unreliable(NZUsize!(1));
1248 assert_eq!(
1249 sender.enqueue(Message::Vote(1)),
1250 Unreliable::new(Feedback::Ok)
1251 );
1252 assert_eq!(
1253 sender.enqueue(Message::Required(2)),
1254 Unreliable::new(Feedback::Backoff)
1255 );
1256 assert_eq!(
1257 sender.enqueue(Message::Required(3)),
1258 Unreliable::new(Feedback::Backoff)
1259 );
1260
1261 assert_eq!(receiver.recv().await, Some(Message::Vote(1)));
1262 assert_eq!(receiver.recv().await, Some(Message::Required(2)));
1263 assert_eq!(receiver.recv().await, Some(Message::Required(3)));
1264 }
1265
1266 #[test_async]
1267 async fn full_inbox_rejects_hint() {
1268 let (sender, mut receiver) = new_unreliable(NZUsize!(1));
1269 assert_eq!(
1270 sender.enqueue(Message::Vote(1)),
1271 Unreliable::new(Feedback::Ok)
1272 );
1273 assert_eq!(
1274 sender.enqueue(Message::Hint(2)),
1275 Unreliable::new(Feedback::Backoff)
1276 );
1277
1278 assert_eq!(receiver.recv().await, Some(Message::Vote(1)));
1279 }
1280
1281 #[test_async]
1282 async fn full_inbox_can_replace_or_drop_by_message() {
1283 let (sender, mut receiver) = new_unreliable(NZUsize!(1));
1284 assert_eq!(
1285 sender.enqueue(Message::Vote(1)),
1286 Unreliable::new(Feedback::Ok)
1287 );
1288 assert_eq!(
1289 sender.enqueue(Message::Update(2)),
1290 Unreliable::new(Feedback::Backoff)
1291 );
1292 assert_eq!(
1293 sender.enqueue(Message::Hint(3)),
1294 Unreliable::new(Feedback::Backoff)
1295 );
1296
1297 assert_eq!(receiver.recv().await, Some(Message::Vote(1)));
1298 assert_eq!(receiver.recv().await, Some(Message::Hint(3)));
1299 }
1300
1301 #[test_async]
1302 async fn empty_inbox_wakes_on_enqueue() {
1303 let (sender, mut receiver) = new_unreliable(NZUsize!(1));
1304
1305 let next = receiver.recv();
1306 pin_mut!(next);
1307 assert!(next.as_mut().now_or_never().is_none());
1308
1309 assert_eq!(
1310 sender.enqueue(Message::Vote(1)),
1311 Unreliable::new(Feedback::Ok)
1312 );
1313 assert_eq!(next.await, Some(Message::Vote(1)));
1314 }
1315
1316 #[test]
1317 fn pending_recv_wakes_when_senders_drop() {
1318 let (sender, receiver) = new_unreliable::<Message>(NZUsize!(1));
1319 let wakes = Arc::new(WakeCounter::default());
1320 let waker = waker_ref(&wakes);
1321 let mut cx = Context::from_waker(&waker);
1322
1323 assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Pending);
1324 assert_eq!(wakes.count(), 0);
1325
1326 drop(sender);
1327
1328 assert_eq!(wakes.count(), 1);
1329 assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Ready(None));
1330 }
1331
1332 #[test]
1333 fn pending_recv_wakes_on_handled_overflow_enqueue() {
1334 let (sender, mut receiver) = new_unreliable(NZUsize!(1));
1335 let wakes = Arc::new(WakeCounter::default());
1336 let waker = waker_ref(&wakes);
1337 let mut cx = Context::from_waker(&waker);
1338
1339 assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Pending);
1340 assert_eq!(wakes.count(), 0);
1341
1342 assert_eq!(sender.state.ready.push(Message::Vote(1)), Ok(()));
1344 assert_eq!(
1345 sender.enqueue(Message::Buffered(2)),
1346 Unreliable::new(Feedback::Backoff)
1347 );
1348
1349 assert_eq!(wakes.count(), 1);
1350 assert_eq!(receiver.try_recv(), Ok(Message::Vote(1)));
1351 assert_eq!(receiver.try_recv(), Ok(Message::Buffered(2)));
1352 }
1353
1354 #[test]
1355 fn receiver_drop_blocks_ready_fast_path_feedback() {
1356 let (sender, receiver) = new_unreliable(NZUsize!(1));
1357 let wakes = Arc::new(WakeCounter::default());
1358 let waker = waker_ref(&wakes);
1359 let mut cx = Context::from_waker(&waker);
1360
1361 assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Pending);
1362 drop(receiver);
1363
1364 assert_eq!(
1365 sender.enqueue(Message::Vote(1)),
1366 Unreliable::new(Feedback::Closed)
1367 );
1368 assert_eq!(wakes.count(), 0);
1369 }
1370
1371 #[test_async]
1372 async fn empty_inbox_closes_when_senders_drop() {
1373 let (sender, mut receiver) = new_unreliable::<Message>(NZUsize!(1));
1374 drop(sender);
1375
1376 assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
1377 assert_eq!(receiver.recv().await, None);
1378 }
1379
1380 #[test]
1381 fn enqueue_after_receiver_drop_returns_closed() {
1382 let (sender, receiver) = new_unreliable(NZUsize!(1));
1383 drop(receiver);
1384
1385 assert_eq!(
1386 sender.enqueue(Message::Vote(1)),
1387 Unreliable::new(Feedback::Closed)
1388 );
1389 }
1390
1391 #[test_async]
1392 async fn receiver_drop_cancels_buffered_responders() {
1393 let (sender, receiver) = new(NZUsize!(1));
1394 let (ready_tx, ready_rx) = oneshot::channel();
1395 let (overflow_tx, overflow_rx) = oneshot::channel();
1396
1397 assert_eq!(sender.enqueue(Ack { _sender: ready_tx }), Feedback::Ok);
1398 assert_eq!(
1399 sender.enqueue(Ack {
1400 _sender: overflow_tx
1401 }),
1402 Feedback::Backoff
1403 );
1404 drop(receiver);
1405
1406 assert!(ready_rx.await.is_err());
1407 assert!(overflow_rx.await.is_err());
1408 }
1409
1410 #[derive(Debug, PartialEq, Eq)]
1411 enum ClearingMessage {
1412 FillReady,
1413 ClearOverflow,
1414 }
1415
1416 impl Policy for ClearingMessage {
1417 type Overflow = VecDeque<Self>;
1418
1419 fn handle(overflow: &mut VecDeque<Self>, message: Self) {
1420 overflow.push_back(message);
1421 overflow.clear();
1422 }
1423 }
1424
1425 #[test]
1426 fn policy_can_clear_overflow_and_request_backoff() {
1427 let (sender, mut receiver) = new(NZUsize!(1));
1428 assert_eq!(sender.enqueue(ClearingMessage::FillReady), Feedback::Ok);
1429 assert_eq!(
1430 sender.enqueue(ClearingMessage::ClearOverflow),
1431 Feedback::Backoff
1432 );
1433
1434 assert!(matches!(
1435 receiver.try_recv(),
1436 Ok(ClearingMessage::FillReady)
1437 ));
1438 assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
1439 }
1440
1441 #[derive(Debug, PartialEq, Eq)]
1442 enum SpillMessage {
1443 FillReady,
1444 Spill,
1445 }
1446
1447 impl Policy for SpillMessage {
1448 type Overflow = VecDeque<Self>;
1449
1450 fn handle(overflow: &mut VecDeque<Self>, message: Self) {
1451 overflow.push_back(message);
1452 }
1453 }
1454
1455 #[test]
1456 fn pending_recv_wakes_when_policy_spills() {
1457 let (sender, mut receiver) = new(NZUsize!(1));
1458 let wakes = Arc::new(WakeCounter::default());
1459 let waker = waker_ref(&wakes);
1460 let mut cx = Context::from_waker(&waker);
1461
1462 assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Pending);
1463 assert_eq!(wakes.count(), 0);
1464
1465 assert_eq!(sender.state.ready.push(SpillMessage::FillReady), Ok(()));
1466 assert_eq!(sender.enqueue(SpillMessage::Spill), Feedback::Backoff);
1467
1468 assert_eq!(wakes.count(), 1);
1469 assert_eq!(receiver.try_recv(), Ok(SpillMessage::FillReady));
1470 assert_eq!(receiver.try_recv(), Ok(SpillMessage::Spill));
1471 }
1472}
1473
1474#[cfg(all(test, feature = "loom"))]
1475mod loom_tests {
1476 use super::{mocks, *};
1477 use commonware_utils::NZUsize;
1478 use futures::pin_mut;
1479 use loom::{
1480 sync::{
1481 atomic::{AtomicUsize, Ordering},
1482 Arc,
1483 },
1484 thread,
1485 };
1486 use std::{
1487 future::Future,
1488 task::{RawWaker, RawWakerVTable, Waker},
1489 };
1490
1491 fn new<T: Policy>(capacity: NonZeroUsize) -> (Sender<T>, Receiver<T>) {
1492 super::new(mocks::Metrics, capacity)
1493 }
1494
1495 fn new_unreliable<T: UnreliablePolicy>(
1496 capacity: NonZeroUsize,
1497 ) -> (UnreliableSender<T>, UnreliableReceiver<T>) {
1498 super::new_unreliable(mocks::Metrics, capacity)
1499 }
1500
1501 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1502 enum Message {
1503 Drop(u8),
1504 Spill(u8),
1505 }
1506
1507 #[derive(Clone, Debug)]
1508 enum OrderedMessage {
1509 Item(u8),
1510 Coordinated(u8, Arc<AtomicUsize>),
1511 }
1512
1513 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1514 enum ReplacingMessage {
1515 FillReady,
1516 Replace(u8),
1517 }
1518
1519 struct TrackedMessage {
1520 drops: Arc<AtomicUsize>,
1521 }
1522
1523 struct CyclicMessage {
1524 _sender: Sender<Self>,
1525 drops: Arc<AtomicUsize>,
1526 }
1527
1528 impl TrackedMessage {
1529 const fn new(drops: Arc<AtomicUsize>) -> Self {
1530 Self { drops }
1531 }
1532 }
1533
1534 impl Drop for TrackedMessage {
1535 fn drop(&mut self) {
1536 self.drops.fetch_add(1, Ordering::AcqRel);
1537 }
1538 }
1539
1540 impl Drop for CyclicMessage {
1541 fn drop(&mut self) {
1542 self.drops.fetch_add(1, Ordering::AcqRel);
1543 }
1544 }
1545
1546 impl UnreliablePolicy for Message {
1547 type Overflow = VecDeque<Self>;
1548
1549 fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
1550 match message {
1551 Self::Drop(_) => false,
1552 Self::Spill(_) => {
1553 overflow.push_back(message);
1554 true
1555 }
1556 }
1557 }
1558 }
1559
1560 impl Policy for OrderedMessage {
1561 type Overflow = VecDeque<Self>;
1562
1563 fn handle(overflow: &mut VecDeque<Self>, message: Self) {
1564 let gate = match &message {
1565 Self::Item(_) => None,
1566 Self::Coordinated(_, gate) => Some(gate.clone()),
1567 };
1568 overflow.push_back(message);
1569 if let Some(gate) = gate {
1570 gate.store(1, Ordering::Release);
1571 while gate.load(Ordering::Acquire) == 1 {
1572 thread::yield_now();
1573 }
1574 }
1575 }
1576 }
1577
1578 impl UnreliablePolicy for ReplacingMessage {
1579 type Overflow = VecDeque<Self>;
1580
1581 fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
1582 match message {
1583 Self::FillReady => false,
1584 Self::Replace(_) => {
1585 if let Some(pending) = overflow
1586 .iter_mut()
1587 .rev()
1588 .find(|pending| matches!(pending, Self::Replace(_)))
1589 {
1590 *pending = message;
1591 } else {
1592 overflow.push_back(message);
1593 }
1594 true
1595 }
1596 }
1597 }
1598 }
1599
1600 impl Policy for TrackedMessage {
1601 type Overflow = VecDeque<Self>;
1602
1603 fn handle(overflow: &mut VecDeque<Self>, message: Self) {
1604 overflow.push_back(message);
1605 }
1606 }
1607
1608 impl Policy for CyclicMessage {
1609 type Overflow = VecDeque<Self>;
1610
1611 fn handle(overflow: &mut VecDeque<Self>, message: Self) {
1612 overflow.push_back(message);
1613 }
1614 }
1615
1616 fn record(seen: &AtomicUsize, message: Message) {
1617 let value = match message {
1618 Message::Drop(value) | Message::Spill(value) => value,
1619 };
1620 seen.fetch_or(1usize << usize::from(value), Ordering::AcqRel);
1621 }
1622
1623 fn value(message: OrderedMessage) -> u8 {
1624 match message {
1625 OrderedMessage::Item(value) | OrderedMessage::Coordinated(value, _) => value,
1626 }
1627 }
1628
1629 const fn replacement_value(message: ReplacingMessage) -> Option<u8> {
1630 match message {
1631 ReplacingMessage::FillReady => None,
1632 ReplacingMessage::Replace(value) => Some(value),
1633 }
1634 }
1635
1636 unsafe fn clone_counter(data: *const ()) -> RawWaker {
1637 let wakes = unsafe { Arc::<AtomicUsize>::from_raw(data.cast()) };
1640 let cloned = wakes.clone();
1641 let _ = Arc::into_raw(wakes);
1642 RawWaker::new(Arc::into_raw(cloned).cast(), &COUNTER_WAKER_VTABLE)
1643 }
1644
1645 unsafe fn wake_counter(data: *const ()) {
1646 let wakes = unsafe { Arc::<AtomicUsize>::from_raw(data.cast()) };
1649 wakes.fetch_add(1, Ordering::AcqRel);
1650 }
1651
1652 unsafe fn wake_counter_by_ref(data: *const ()) {
1653 let wakes = unsafe { Arc::<AtomicUsize>::from_raw(data.cast()) };
1656 wakes.fetch_add(1, Ordering::AcqRel);
1657 let _ = Arc::into_raw(wakes);
1658 }
1659
1660 unsafe fn drop_counter(data: *const ()) {
1661 unsafe {
1664 drop(Arc::<AtomicUsize>::from_raw(data.cast()));
1665 }
1666 }
1667
1668 static COUNTER_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
1669 clone_counter,
1670 wake_counter,
1671 wake_counter_by_ref,
1672 drop_counter,
1673 );
1674
1675 fn counting_waker(wakes: Arc<AtomicUsize>) -> Waker {
1676 let raw = RawWaker::new(Arc::into_raw(wakes).cast(), &COUNTER_WAKER_VTABLE);
1677 unsafe { Waker::from_raw(raw) }
1680 }
1681
1682 #[test]
1683 fn sender_drop_racing_waker_registration_wakes_or_disconnects() {
1684 loom::model(|| {
1685 let (sender, receiver) = new_unreliable::<Message>(NZUsize!(1));
1686 let wakes = Arc::new(AtomicUsize::new(0));
1687 let waker = counting_waker(wakes.clone());
1688 let mut cx = Context::from_waker(&waker);
1689
1690 let close = thread::spawn(move || {
1691 drop(sender);
1692 });
1693
1694 let poll = receiver.state.poll_recv(&mut cx);
1695 close.join().unwrap();
1696
1697 match poll {
1698 Poll::Ready(None) => {}
1699 Poll::Pending => {
1700 assert!(wakes.load(Ordering::Acquire) > 0);
1701 assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Ready(None));
1702 }
1703 Poll::Ready(Some(_)) => panic!("unexpected message"),
1704 }
1705 });
1706 }
1707
1708 #[test]
1709 fn sender_enqueue_then_drop_racing_poll_recv_drains_message() {
1710 loom::model(|| {
1711 let (sender, receiver) = new_unreliable::<Message>(NZUsize!(1));
1712 let wakes = Arc::new(AtomicUsize::new(0));
1713 let waker = counting_waker(wakes.clone());
1714 let mut cx = Context::from_waker(&waker);
1715
1716 let enqueue = thread::spawn(move || {
1717 assert_eq!(
1718 sender.enqueue(Message::Spill(0)),
1719 Unreliable::new(Feedback::Ok)
1720 );
1721 });
1722
1723 let poll = receiver.state.poll_recv(&mut cx);
1724 enqueue.join().unwrap();
1725
1726 match poll {
1727 Poll::Ready(Some(Message::Spill(0))) => {}
1728 Poll::Pending => {
1729 assert!(wakes.load(Ordering::Acquire) > 0);
1730 assert_eq!(
1731 receiver.state.poll_recv(&mut cx),
1732 Poll::Ready(Some(Message::Spill(0)))
1733 );
1734 }
1735 Poll::Ready(None) => panic!("disconnected before draining message"),
1736 Poll::Ready(Some(message)) => panic!("unexpected message: {message:?}"),
1737 }
1738
1739 assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Ready(None));
1740 });
1741 }
1742
1743 #[test]
1744 fn sender_enqueue_then_drop_racing_try_recv_drains_message() {
1745 loom::model(|| {
1746 let (sender, mut receiver) = new_unreliable::<Message>(NZUsize!(1));
1747
1748 let enqueue = thread::spawn(move || {
1749 assert_eq!(
1750 sender.enqueue(Message::Spill(0)),
1751 Unreliable::new(Feedback::Ok)
1752 );
1753 });
1754
1755 let result = receiver.try_recv();
1756 enqueue.join().unwrap();
1757
1758 match result {
1759 Ok(Message::Spill(0)) => {}
1760 Err(TryRecvError::Empty) => {
1761 assert_eq!(receiver.try_recv(), Ok(Message::Spill(0)));
1762 }
1763 Err(TryRecvError::Disconnected) => {
1764 panic!("disconnected before draining message");
1765 }
1766 Ok(message) => panic!("unexpected message: {message:?}"),
1767 }
1768
1769 assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
1770 });
1771 }
1772
1773 #[test]
1774 fn handled_enqueue_wakes_registered_receiver() {
1775 loom::model(|| {
1776 let (sender, mut receiver) = new_unreliable::<Message>(NZUsize!(1));
1777 let wakes = Arc::new(AtomicUsize::new(0));
1778 let waker = counting_waker(wakes.clone());
1779 let mut cx = Context::from_waker(&waker);
1780
1781 let next = receiver.recv();
1782 pin_mut!(next);
1783 assert!(matches!(next.as_mut().poll(&mut cx), Poll::Pending));
1784 assert_eq!(
1785 sender.enqueue(Message::Spill(0)),
1786 Unreliable::new(Feedback::Ok)
1787 );
1788
1789 assert_eq!(wakes.load(Ordering::Acquire), 1);
1790 assert_eq!(
1791 next.as_mut().poll(&mut cx),
1792 Poll::Ready(Some(Message::Spill(0)))
1793 );
1794 });
1795 }
1796
1797 #[test]
1798 fn receiver_drop_racing_ready_fast_path_feedback_wakes_if_ready() {
1799 loom::model(|| {
1800 let (sender, receiver) = new_unreliable::<Message>(NZUsize!(1));
1801 let wakes = Arc::new(AtomicUsize::new(0));
1802 let waker = counting_waker(wakes.clone());
1803 let mut cx = Context::from_waker(&waker);
1804
1805 assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Pending);
1806
1807 let close = thread::spawn(move || {
1808 drop(receiver);
1809 });
1810 let feedback = sender.enqueue(Message::Spill(0));
1811 close.join().unwrap();
1812
1813 if feedback.accepted() {
1814 assert!(wakes.load(Ordering::Acquire) > 0);
1815 } else {
1816 assert_eq!(feedback, Unreliable::new(Feedback::Closed));
1817 }
1818 assert_eq!(
1819 sender.enqueue(Message::Spill(1)),
1820 Unreliable::new(Feedback::Closed)
1821 );
1822 });
1823 }
1824
1825 #[test]
1826 fn receiver_drop_racing_ready_enqueue_drops_message() {
1827 loom::model(|| {
1828 let (sender, receiver) = new::<TrackedMessage>(NZUsize!(1));
1829 let drops = Arc::new(AtomicUsize::new(0));
1830
1831 let close = thread::spawn(move || {
1832 drop(receiver);
1833 });
1834 let _ = sender.enqueue(TrackedMessage::new(drops.clone()));
1835 close.join().unwrap();
1836
1837 assert_eq!(drops.load(Ordering::Acquire), 1);
1838 });
1839 }
1840
1841 #[test]
1842 fn receiver_drop_racing_overflow_enqueue_drops_messages() {
1843 loom::model(|| {
1844 let (sender, receiver) = new::<TrackedMessage>(NZUsize!(1));
1845 let ready_drops = Arc::new(AtomicUsize::new(0));
1846 let overflow_drops = Arc::new(AtomicUsize::new(0));
1847
1848 assert_eq!(
1849 sender.enqueue(TrackedMessage::new(ready_drops.clone())),
1850 Feedback::Ok
1851 );
1852 let close = thread::spawn(move || {
1853 drop(receiver);
1854 });
1855 let _ = sender.enqueue(TrackedMessage::new(overflow_drops.clone()));
1856 close.join().unwrap();
1857
1858 assert_eq!(ready_drops.load(Ordering::Acquire), 1);
1859 assert_eq!(overflow_drops.load(Ordering::Acquire), 1);
1860 });
1861 }
1862
1863 #[test]
1864 fn receiver_drop_drains_ready_message_published_under_overflow_lock() {
1865 loom::model(|| {
1866 let (sender, receiver) = new::<TrackedMessage>(NZUsize!(1));
1867 let drops = Arc::new(AtomicUsize::new(0));
1868 let mutation = Mutation::begin(&sender.state.overflow.activity);
1869 let queue = lock(&sender.state.overflow.queue);
1870
1871 let close = thread::spawn(move || {
1872 drop(receiver);
1873 });
1874
1875 assert!(sender
1876 .state
1877 .ready
1878 .push(TrackedMessage::new(drops.clone()))
1879 .is_ok());
1880 mutation.publish(queue.is_empty());
1881 drop(queue);
1882 drop(mutation);
1883 close.join().unwrap();
1884
1885 assert_eq!(drops.load(Ordering::Acquire), 1);
1886 });
1887 }
1888
1889 #[test]
1890 fn receiver_drop_drains_overflow_message_published_under_overflow_lock() {
1891 loom::model(|| {
1892 let (sender, receiver) = new::<TrackedMessage>(NZUsize!(1));
1893 let ready_drops = Arc::new(AtomicUsize::new(0));
1894 let overflow_drops = Arc::new(AtomicUsize::new(0));
1895
1896 assert_eq!(
1897 sender.enqueue(TrackedMessage::new(ready_drops.clone())),
1898 Feedback::Ok
1899 );
1900
1901 let mutation = Mutation::begin(&sender.state.overflow.activity);
1902 let mut queue = lock(&sender.state.overflow.queue);
1903 let close = thread::spawn(move || {
1904 drop(receiver);
1905 });
1906
1907 queue.push_back(TrackedMessage::new(overflow_drops.clone()));
1908 mutation.publish(queue.is_empty());
1909 drop(queue);
1910 drop(mutation);
1911 close.join().unwrap();
1912
1913 assert_eq!(ready_drops.load(Ordering::Acquire), 1);
1914 assert_eq!(overflow_drops.load(Ordering::Acquire), 1);
1915 });
1916 }
1917
1918 #[test]
1919 fn receiver_drop_breaks_message_sender_cycle() {
1920 loom::model(|| {
1921 let (sender, receiver) = new::<CyclicMessage>(NZUsize!(1));
1922 let drops = Arc::new(AtomicUsize::new(0));
1923
1924 assert_eq!(
1925 sender.enqueue(CyclicMessage {
1926 _sender: sender.clone(),
1927 drops: drops.clone(),
1928 }),
1929 Feedback::Ok
1930 );
1931 assert_eq!(
1932 sender.enqueue(CyclicMessage {
1933 _sender: sender.clone(),
1934 drops: drops.clone(),
1935 }),
1936 Feedback::Backoff
1937 );
1938
1939 drop(receiver);
1940
1941 assert_eq!(drops.load(Ordering::Acquire), 2);
1942 assert_eq!(
1943 sender.enqueue(CyclicMessage {
1944 _sender: sender.clone(),
1945 drops,
1946 }),
1947 Feedback::Closed
1948 );
1949 });
1950 }
1951
1952 #[test]
1953 fn concurrent_close_and_ready_enqueue_remains_closed() {
1954 loom::model(|| {
1955 let (sender, receiver) = new_unreliable::<Message>(NZUsize!(1));
1956
1957 let enqueue_sender = sender.clone();
1958 let enqueue = thread::spawn(move || {
1959 let _ = enqueue_sender.enqueue(Message::Spill(1));
1960 });
1961
1962 let close = thread::spawn(move || {
1963 drop(receiver);
1964 });
1965
1966 enqueue.join().unwrap();
1967 close.join().unwrap();
1968 assert_eq!(
1969 sender.enqueue(Message::Spill(2)),
1970 Unreliable::new(Feedback::Closed)
1971 );
1972 });
1973 }
1974
1975 #[test]
1976 fn concurrent_close_and_overflow_enqueue_remains_closed() {
1977 loom::model(|| {
1978 let (sender, receiver) = new_unreliable::<Message>(NZUsize!(1));
1979 assert_eq!(
1980 sender.enqueue(Message::Drop(0)),
1981 Unreliable::new(Feedback::Ok)
1982 );
1983
1984 let enqueue_sender = sender.clone();
1985 let enqueue = thread::spawn(move || {
1986 let _ = enqueue_sender.enqueue(Message::Spill(1));
1987 });
1988
1989 let close = thread::spawn(move || {
1990 drop(receiver);
1991 });
1992
1993 enqueue.join().unwrap();
1994 close.join().unwrap();
1995 assert_eq!(
1996 sender.enqueue(Message::Spill(2)),
1997 Unreliable::new(Feedback::Closed)
1998 );
1999 });
2000 }
2001
2002 #[test]
2003 fn concurrent_spill_and_refill_preserves_messages() {
2004 loom::model(|| {
2005 let (sender, mut receiver) = new_unreliable::<Message>(NZUsize!(1));
2006 let idle_sender = sender.clone();
2007 assert_eq!(
2008 sender.enqueue(Message::Spill(0)),
2009 Unreliable::new(Feedback::Ok)
2010 );
2011
2012 let seen = Arc::new(AtomicUsize::new(0));
2013 let enqueue = thread::spawn(move || {
2014 let feedback = sender.enqueue(Message::Spill(1));
2015 assert!(feedback.accepted());
2016 });
2017
2018 let seen_by_receiver = seen.clone();
2019 let recv = thread::spawn(move || {
2020 if let Ok(message) = receiver.try_recv() {
2021 record(&seen_by_receiver, message);
2022 }
2023 receiver
2024 });
2025
2026 enqueue.join().unwrap();
2027 let mut receiver = recv.join().unwrap();
2028
2029 while let Ok(message) = receiver.try_recv() {
2030 record(&seen, message);
2031 }
2032 assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
2033 drop(idle_sender);
2034 assert_eq!(seen.load(Ordering::Acquire), 0b11);
2035 });
2036 }
2037
2038 #[test]
2039 fn concurrent_spill_senders_preserve_messages() {
2040 loom::model(|| {
2041 let (sender, mut receiver) = new_unreliable::<Message>(NZUsize!(1));
2042 let idle_sender = sender.clone();
2043 assert_eq!(
2044 sender.enqueue(Message::Spill(0)),
2045 Unreliable::new(Feedback::Ok)
2046 );
2047
2048 let sender_1 = sender.clone();
2049 let enqueue_1 = thread::spawn(move || sender_1.enqueue(Message::Spill(1)));
2050 let enqueue_2 = thread::spawn(move || sender.enqueue(Message::Spill(2)));
2051
2052 let seen = Arc::new(AtomicUsize::new(0));
2053
2054 assert!(enqueue_1.join().unwrap().accepted());
2055 assert!(enqueue_2.join().unwrap().accepted());
2056
2057 while let Ok(message) = receiver.try_recv() {
2058 record(&seen, message);
2059 }
2060 assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
2061 drop(idle_sender);
2062 assert_eq!(seen.load(Ordering::Acquire), 0b111);
2063 });
2064 }
2065
2066 #[test]
2067 fn concurrent_replace_keeps_one_overflow_message() {
2068 loom::model(|| {
2069 let (sender, mut receiver) = new_unreliable::<ReplacingMessage>(NZUsize!(1));
2070 let idle_sender = sender.clone();
2071 assert_eq!(
2072 sender.enqueue(ReplacingMessage::FillReady),
2073 Unreliable::new(Feedback::Ok)
2074 );
2075 assert_eq!(
2076 sender.enqueue(ReplacingMessage::Replace(1)),
2077 Unreliable::new(Feedback::Backoff)
2078 );
2079
2080 let sender_1 = sender.clone();
2081 let replace_1 = thread::spawn(move || sender_1.enqueue(ReplacingMessage::Replace(2)));
2082 let replace_2 = thread::spawn(move || sender.enqueue(ReplacingMessage::Replace(3)));
2083
2084 assert_eq!(
2085 replace_1.join().unwrap(),
2086 Unreliable::new(Feedback::Backoff)
2087 );
2088 assert_eq!(
2089 replace_2.join().unwrap(),
2090 Unreliable::new(Feedback::Backoff)
2091 );
2092 assert_eq!(receiver.try_recv(), Ok(ReplacingMessage::FillReady));
2093
2094 let retained = replacement_value(receiver.try_recv().unwrap()).unwrap();
2095 assert!(retained == 2 || retained == 3);
2096 assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
2097 drop(idle_sender);
2098 });
2099 }
2100
2101 #[test]
2102 fn stale_overflow_hint_retries_ready_before_policy() {
2103 loom::model(|| {
2104 let (sender, mut receiver) = new_unreliable::<Message>(NZUsize!(2));
2105 assert_eq!(
2106 sender.enqueue(Message::Drop(0)),
2107 Unreliable::new(Feedback::Ok)
2108 );
2109 assert_eq!(
2110 sender.enqueue(Message::Drop(1)),
2111 Unreliable::new(Feedback::Ok)
2112 );
2113 assert_eq!(
2114 sender.enqueue(Message::Spill(2)),
2115 Unreliable::new(Feedback::Backoff)
2116 );
2117
2118 assert_eq!(receiver.try_recv(), Ok(Message::Drop(0)));
2119 assert_eq!(receiver.try_recv(), Ok(Message::Drop(1)));
2120
2121 assert_eq!(
2122 sender.enqueue(Message::Drop(3)),
2123 Unreliable::new(Feedback::Ok)
2124 );
2125 assert_eq!(receiver.try_recv(), Ok(Message::Spill(2)));
2126 assert_eq!(receiver.try_recv(), Ok(Message::Drop(3)));
2127 });
2128 }
2129
2130 #[test]
2131 fn concurrent_overflow_cannot_be_bypassed_by_ready_fast_path() {
2132 loom::model(|| {
2133 let (sender, mut receiver) = new::<OrderedMessage>(NZUsize!(2));
2134 assert_eq!(sender.enqueue(OrderedMessage::Item(0)), Feedback::Ok);
2135 assert_eq!(sender.enqueue(OrderedMessage::Item(1)), Feedback::Ok);
2136
2137 let gate = Arc::new(AtomicUsize::new(0));
2138 let overflow_sender = sender.clone();
2139 let overflow_gate = gate.clone();
2140 let overflow = thread::spawn(move || {
2141 assert_eq!(
2142 overflow_sender.enqueue(OrderedMessage::Coordinated(2, overflow_gate)),
2143 Feedback::Backoff
2144 );
2145 });
2146
2147 while gate.load(Ordering::Acquire) == 0 {
2148 thread::yield_now();
2149 }
2150
2151 let mut observed = vec![value(receiver.try_recv().unwrap())];
2154 gate.store(2, Ordering::Release);
2155 let feedback = sender.enqueue(OrderedMessage::Item(3));
2156 assert!(feedback.accepted());
2157
2158 overflow.join().unwrap();
2159 while let Ok(message) = receiver.try_recv() {
2160 observed.push(value(message));
2161 }
2162
2163 assert_eq!(observed, vec![0, 1, 2, 3]);
2164 });
2165 }
2166
2167 #[test]
2168 fn concurrent_overflow_mutation_does_not_hide_published_overflow() {
2169 loom::model(|| {
2170 let (sender, mut receiver) = new::<OrderedMessage>(NZUsize!(1));
2171 assert_eq!(sender.enqueue(OrderedMessage::Item(0)), Feedback::Ok);
2172 assert_eq!(sender.enqueue(OrderedMessage::Item(1)), Feedback::Backoff);
2173
2174 let gate = Arc::new(AtomicUsize::new(0));
2175 let overflow_gate = gate.clone();
2176 let overflow = thread::spawn(move || {
2177 sender.enqueue(OrderedMessage::Coordinated(2, overflow_gate))
2178 });
2179
2180 while gate.load(Ordering::Acquire) == 0 {
2181 thread::yield_now();
2182 }
2183
2184 let release_gate = gate;
2185 let release = thread::spawn(move || {
2186 release_gate.store(2, Ordering::Release);
2187 });
2188
2189 let receive = thread::spawn(move || {
2190 assert_eq!(receiver.try_recv().map(value), Ok(0));
2191 assert_eq!(receiver.try_recv().map(value), Ok(1));
2192 receiver
2193 });
2194
2195 release.join().unwrap();
2196 let mut receiver = receive.join().unwrap();
2197 assert_eq!(overflow.join().unwrap(), Feedback::Backoff);
2198 assert_eq!(receiver.try_recv().map(value), Ok(2));
2199 });
2200 }
2201
2202 #[test]
2203 fn published_overflow_wakes_pending_receiver() {
2204 loom::model(|| {
2205 let (sender, mut receiver) = new::<OrderedMessage>(NZUsize!(1));
2206 let wakes = Arc::new(AtomicUsize::new(0));
2207 let waker = counting_waker(wakes.clone());
2208 let mut cx = Context::from_waker(&waker);
2209
2210 let gate = Arc::new(AtomicUsize::new(0));
2211 let overflow = {
2212 let next = receiver.recv();
2213 pin_mut!(next);
2214 assert!(matches!(next.as_mut().poll(&mut cx), Poll::Pending));
2215
2216 assert_eq!(sender.enqueue(OrderedMessage::Item(0)), Feedback::Ok);
2217 while wakes.load(Ordering::Acquire) == 0 {
2218 thread::yield_now();
2219 }
2220
2221 let overflow_gate = gate.clone();
2222 let overflow = thread::spawn(move || {
2223 sender.enqueue(OrderedMessage::Coordinated(1, overflow_gate))
2224 });
2225
2226 while gate.load(Ordering::Acquire) == 0 {
2227 thread::yield_now();
2228 }
2229
2230 assert_eq!(
2231 next.as_mut()
2232 .poll(&mut cx)
2233 .map(|message| message.map(value)),
2234 Poll::Ready(Some(0))
2235 );
2236 overflow
2237 };
2238
2239 {
2240 let next = receiver.recv();
2241 pin_mut!(next);
2242 assert!(matches!(next.as_mut().poll(&mut cx), Poll::Pending));
2243 assert_eq!(wakes.load(Ordering::Acquire), 1);
2244
2245 gate.store(2, Ordering::Release);
2246 while wakes.load(Ordering::Acquire) < 2 {
2247 thread::yield_now();
2248 }
2249
2250 assert_eq!(
2251 next.as_mut()
2252 .poll(&mut cx)
2253 .map(|message| message.map(value)),
2254 Poll::Ready(Some(1))
2255 );
2256 }
2257 assert_eq!(overflow.join().unwrap(), Feedback::Backoff);
2258 });
2259 }
2260
2261 #[test]
2262 fn concurrent_refill_and_enqueue_preserves_overflow_order() {
2263 loom::model(|| {
2264 let (sender, mut receiver) = new::<OrderedMessage>(NZUsize!(1));
2265 assert_eq!(sender.enqueue(OrderedMessage::Item(0)), Feedback::Ok);
2266 assert_eq!(sender.enqueue(OrderedMessage::Item(1)), Feedback::Backoff);
2267
2268 let enqueue = thread::spawn(move || sender.enqueue(OrderedMessage::Item(2)));
2269 let receive = thread::spawn(move || {
2270 assert_eq!(receiver.try_recv().map(value), Ok(0));
2271 receiver
2272 });
2273
2274 let mut receiver = receive.join().unwrap();
2275 assert_eq!(enqueue.join().unwrap(), Feedback::Backoff);
2276 assert_eq!(receiver.try_recv().map(value), Ok(1));
2277 assert_eq!(receiver.try_recv().map(value), Ok(2));
2278 });
2279 }
2280}