1use crate::{control::ControlBlockRef, error::*};
2use core::{mem::ManuallyDrop, num::NonZeroUsize, sync::atomic::*};
3use derivative::Derivative;
4
5#[cfg(feature = "futures_api")]
6use crate::waitlist::Slot;
7
8#[cfg(feature = "futures_api")]
9use futures::{task::*, Sink, Stream};
10
11#[cfg(feature = "futures_api")]
12use core::{mem, pin::Pin};
13
/// The sending end of a channel created by [`ring_channel`].
///
/// The derived `Debug`, `Eq` and `PartialEq` impls use empty bounds, so they place no
/// requirements on `T`. Equality looks only at the control block handle: the
/// `backoff` flag is explicitly ignored.
#[derive(Derivative)]
#[derivative(Debug(bound = ""), Eq(bound = ""), PartialEq(bound = ""))]
pub struct RingSender<T> {
    // Handle to the control block shared with the other endpoints. It is wrapped in
    // `ManuallyDrop` because the `Drop` impl decides explicitly whether to release it.
    handle: ManuallyDrop<ControlBlockRef<T>>,

    // Set by `Sink::start_send` when a send overwrote a pending message; consumed by
    // `Sink::poll_flush`, which then yields once. Not part of equality.
    #[cfg(feature = "futures_api")]
    #[derivative(PartialEq = "ignore")]
    backoff: bool,
}
24
// SAFETY: NOTE(review) — these impls assert that a sender may be moved to, and shared
// between, threads whenever `T: Send`. Soundness depends on the control block behind
// `handle` being safe for concurrent access; confirm against `ControlBlockRef`.
unsafe impl<T: Send> Send for RingSender<T> {}
unsafe impl<T: Send> Sync for RingSender<T> {}
27
28impl<T> RingSender<T> {
29 fn new(handle: ManuallyDrop<ControlBlockRef<T>>) -> Self {
30 Self {
31 handle,
32
33 #[cfg(feature = "futures_api")]
34 backoff: false,
35 }
36 }
37
38 pub fn send(&self, message: T) -> Result<Option<T>, SendError<T>> {
45 if self.handle.receivers.load(Ordering::Acquire) > 0 {
46 let overwritten = self.handle.buffer.push(message);
47
48 #[cfg(feature = "futures_api")]
49 if overwritten.is_none() {
50 fence(Ordering::SeqCst);
53
54 if let Some(waker) = self.handle.waitlist.pop() {
55 waker.wake();
56 }
57 }
58
59 Ok(overwritten)
60 } else {
61 Err(SendError::Disconnected(message))
62 }
63 }
64}
65
66impl<T> Clone for RingSender<T> {
67 fn clone(&self) -> Self {
68 self.handle.senders.fetch_add(1, Ordering::Relaxed);
69 RingSender::new(self.handle.clone())
70 }
71}
72
73impl<T> Drop for RingSender<T> {
74 fn drop(&mut self) {
75 if self.handle.senders.fetch_sub(1, Ordering::AcqRel) == 1 {
77 #[cfg(feature = "futures_api")]
78 {
79 fence(Ordering::SeqCst);
82
83 while let Some(waker) = self.handle.waitlist.pop() {
84 waker.wake();
85 }
86 }
87
88 if !self.handle.connected.swap(false, Ordering::AcqRel) {
90 unsafe { ManuallyDrop::drop(&mut self.handle) }
91 }
92 }
93 }
94}
95
#[cfg(feature = "futures_api")]
/// `Sink` adapter over [`RingSender::send`]: sending never waits for capacity, but after
/// a send that overwrote a pending message the sink yields once on the next flush.
impl<T> Sink<T> for RingSender<T> {
    type Error = SendError<T>;

    /// Readiness is the same condition as a completed flush.
    #[inline]
    fn poll_ready(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        <Self as Sink<T>>::poll_flush(self, ctx)
    }

    /// Sends `item` and records in `backoff` whether an older message was overwritten.
    #[inline]
    fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        let outcome = self.send(item)?;
        self.backoff = outcome.is_some();
        Ok(())
    }

    /// Clears the backoff flag. If it was set, requests an immediate re-poll and returns
    /// `Pending` exactly once; otherwise reports completion.
    #[inline]
    fn poll_flush(
        mut self: Pin<&mut Self>,
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let must_yield = mem::take(&mut self.backoff);
        if !must_yield {
            return Poll::Ready(Ok(()));
        }

        ctx.waker().wake_by_ref();
        Poll::Pending
    }

    /// Closing performs no work here and always completes immediately.
    #[inline]
    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}
132
/// The receiving end of a channel created by [`ring_channel`].
///
/// The derived `Debug`, `Eq` and `PartialEq` impls use empty bounds, so they place no
/// requirements on `T`. Equality looks only at the control block handle: the
/// waitlist `slot` is explicitly ignored.
#[derive(Derivative)]
#[derivative(Debug(bound = ""), Eq(bound = ""), PartialEq(bound = ""))]
pub struct RingReceiver<T> {
    // Handle to the control block shared with the other endpoints. It is wrapped in
    // `ManuallyDrop` because the `Drop` impl decides explicitly whether to release it.
    handle: ManuallyDrop<ControlBlockRef<T>>,

    // Waitlist slot registered for this receiver in `new`; `poll` stores its waker
    // there and `Drop` deregisters it. Not part of equality.
    #[cfg(feature = "futures_api")]
    #[derivative(PartialEq = "ignore")]
    slot: Slot,
}
143
// SAFETY: NOTE(review) — these impls assert that a receiver may be moved to, and shared
// between, threads whenever `T: Send`. Soundness depends on the control block behind
// `handle` being safe for concurrent access; confirm against `ControlBlockRef`.
unsafe impl<T: Send> Send for RingReceiver<T> {}
unsafe impl<T: Send> Sync for RingReceiver<T> {}
146
147impl<T> RingReceiver<T> {
148 fn new(handle: ManuallyDrop<ControlBlockRef<T>>) -> Self {
149 Self {
150 #[cfg(feature = "futures_api")]
151 slot: handle.waitlist.register(),
152
153 handle,
154 }
155 }
156
157 pub fn try_recv(&self) -> Result<T, TryRecvError> {
164 if self.handle.senders.load(Ordering::Acquire) > 0 {
169 self.handle.buffer.pop().ok_or(TryRecvError::Empty)
170 } else {
171 self.handle.buffer.pop().ok_or(TryRecvError::Disconnected)
172 }
173 }
174
175 #[cfg(feature = "futures_api")]
185 pub fn recv(&self) -> Result<T, RecvError> {
186 futures::executor::block_on(futures::future::poll_fn(|ctx| self.poll(ctx)))
187 .ok_or(RecvError::Disconnected)
188 }
189
190 #[cfg(feature = "futures_api")]
191 fn poll(&self, ctx: &mut Context<'_>) -> Poll<Option<T>> {
192 match self.try_recv() {
193 result @ Ok(_) | result @ Err(TryRecvError::Disconnected) => {
194 self.handle.waitlist.remove(self.slot);
195 Poll::Ready(result.ok())
196 }
197
198 Err(TryRecvError::Empty) => {
199 self.handle.waitlist.insert(self.slot, ctx.waker().clone());
200
201 fence(Ordering::SeqCst);
204
205 match self.try_recv() {
207 result @ Ok(_) | result @ Err(TryRecvError::Disconnected) => {
208 self.handle.waitlist.remove(self.slot);
209 Poll::Ready(result.ok())
210 }
211
212 Err(TryRecvError::Empty) => Poll::Pending,
213 }
214 }
215 }
216 }
217}
218
219impl<T> Clone for RingReceiver<T> {
220 fn clone(&self) -> Self {
221 self.handle.receivers.fetch_add(1, Ordering::Relaxed);
222 RingReceiver::new(self.handle.clone())
223 }
224}
225
226impl<T> Drop for RingReceiver<T> {
227 fn drop(&mut self) {
228 #[cfg(feature = "futures_api")]
229 if self.handle.waitlist.deregister(self.slot).is_none() {
230 if let Some(waker) = self.handle.waitlist.pop() {
232 waker.wake();
233 }
234 }
235
236 if self.handle.receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
238 if !self.handle.connected.swap(false, Ordering::AcqRel) {
240 unsafe { ManuallyDrop::drop(&mut self.handle) }
241 }
242 }
243 }
244}
245
#[cfg(feature = "futures_api")]
/// `Stream` adapter: yields messages until the channel is empty and disconnected.
impl<T> Stream for RingReceiver<T> {
    type Item = T;

    /// Delegates to the receiver's internal `poll` routine.
    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let receiver: &Self = &*self;
        receiver.poll(ctx)
    }
}
257
258pub fn ring_channel<T>(capacity: NonZeroUsize) -> (RingSender<T>, RingReceiver<T>) {
318 let handle = ManuallyDrop::new(ControlBlockRef::new(capacity.get()));
319 (RingSender::new(handle.clone()), RingReceiver::new(handle))
320}
321
// Property-based tests for the channel endpoints. Tests that exercise wakers, `recv`,
// `Sink` or `Stream` are gated on the `futures_api` feature.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Void;
    use alloc::vec::Vec;
    use core::{cmp::min, iter};
    use futures::stream::{iter, repeat};
    use futures::{future::try_join_all, prelude::*};
    use proptest::collection::size_range;
    use test_strategy::proptest;
    use tokio::runtime;
    use tokio::task::spawn_blocking;

    #[cfg(feature = "futures_api")]
    use core::time::Duration;

    #[cfg(feature = "futures_api")]
    use alloc::{sync::Arc, task::Wake};

    #[cfg(feature = "futures_api")]
    use futures::future::try_join;

    #[cfg(feature = "futures_api")]
    use tokio::{task::spawn, time::timeout};

    // Waker whose `wake` does nothing; used to build a `Context` for manual polling.
    #[cfg(feature = "futures_api")]
    #[derive(Debug, Copy, Clone, Eq, PartialEq)]
    struct MockWaker;

    #[cfg(feature = "futures_api")]
    impl Wake for MockWaker {
        fn wake(self: Arc<Self>) {}
    }

    // Both endpoints returned by `ring_channel` share the same handle.
    #[proptest]
    fn ring_channel_is_associated_with_a_single_control_block() {
        let (tx, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
        assert_eq!(tx.handle, rx.handle);
    }

    // Sender equality follows the channel, not the individual endpoint.
    #[proptest]
    fn senders_are_equal_if_they_are_associated_with_the_same_ring_channel() {
        let (s1, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
        let (s2, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);

        assert_eq!(s1, s1.clone());
        assert_eq!(s2, s2.clone());
        assert_ne!(s1, s2);
    }

    // The clone has `backoff == false`, yet still compares equal to the original.
    #[cfg(feature = "futures_api")]
    #[proptest]
    fn senders_are_equal_even_if_backoff_is_different() {
        let (mut tx, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
        tx.backoff = true;
        assert_eq!(tx, tx.clone());
    }

    // Receiver equality follows the channel, not the individual endpoint.
    #[proptest]
    fn receivers_are_equal_if_they_are_associated_with_the_same_ring_channel() {
        let (_, r1) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
        let (_, r2) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);

        assert_eq!(r1, r1.clone());
        assert_eq!(r2, r2.clone());
        assert_ne!(r1, r2);
    }

    // The original sender is still alive while the clone is made, hence a count of 2.
    #[proptest]
    fn cloning_sender_increments_senders() {
        let (tx, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
        #[allow(clippy::redundant_clone)]
        let tx = tx.clone();
        assert_eq!(tx.handle.senders.load(Ordering::SeqCst), 2);
        assert_eq!(rx.handle.receivers.load(Ordering::SeqCst), 1);
    }

    // A clone always starts with a cleared backoff flag.
    #[cfg(feature = "futures_api")]
    #[proptest]
    fn cloning_sender_resets_backoff_flag() {
        let (mut tx, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
        tx.backoff = true;
        assert_ne!(tx.clone().backoff, tx.backoff);
    }

    // The original receiver is still alive while the clone is made, hence a count of 2.
    #[proptest]
    fn cloning_receiver_increments_receivers_counter() {
        let (tx, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
        #[allow(clippy::redundant_clone)]
        let rx = rx.clone();
        assert_eq!(tx.handle.senders.load(Ordering::SeqCst), 1);
        assert_eq!(rx.handle.receivers.load(Ordering::SeqCst), 2);
    }

    // Every receiver gets its own waitlist slot.
    #[cfg(feature = "futures_api")]
    #[proptest]
    fn cloning_receiver_registers_waitlist_slot() {
        let (_, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
        assert_ne!(rx.clone().slot, rx.slot);
    }

    // The `_` pattern does not bind the sender, so it is dropped right away.
    #[proptest]
    fn dropping_sender_decrements_senders_counter() {
        let (_, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
        assert_eq!(rx.handle.senders.load(Ordering::SeqCst), 0);
        assert_eq!(rx.handle.receivers.load(Ordering::SeqCst), 1);
    }

    // The `_` pattern does not bind the receiver, so it is dropped right away.
    #[proptest]
    fn dropping_receiver_decrements_receivers_counter() {
        let (tx, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
        assert_eq!(tx.handle.senders.load(Ordering::SeqCst), 1);
        assert_eq!(tx.handle.receivers.load(Ordering::SeqCst), 0);
    }

    // Dropping the only receiver leaves the waitlist without registered slots.
    #[cfg(feature = "futures_api")]
    #[proptest]
    fn dropping_receiver_deregisters_waitlist_slot() {
        let (tx, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
        assert_eq!(tx.handle.waitlist.len(), 1);
        drop(rx);
        assert_eq!(tx.handle.waitlist.len(), 0);
    }

    // Dropping the last sender clears the `connected` flag.
    #[proptest]
    fn channel_is_disconnected_if_there_are_no_senders() {
        let (_, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
        assert_eq!(rx.handle.senders.load(Ordering::SeqCst), 0);
        assert!(!rx.handle.connected.load(Ordering::SeqCst));
    }

    // Dropping the last receiver clears the `connected` flag.
    #[proptest]
    fn channel_is_disconnected_if_there_are_no_receivers() {
        let (tx, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
        assert_eq!(tx.handle.receivers.load(Ordering::SeqCst), 0);
        assert!(!tx.handle.connected.load(Ordering::SeqCst));
    }

    // Compile-time check: both endpoints are `Send`.
    #[proptest]
    fn endpoints_are_safe_to_send_across_threads() {
        fn must_be_send(_: impl Send) {}
        must_be_send(ring_channel::<Void>(NonZeroUsize::try_from(1)?));
    }

    // Compile-time check: both endpoints are `Sync`.
    #[proptest]
    fn endpoints_are_safe_to_share_across_threads() {
        fn must_be_sync(_: impl Sync) {}
        must_be_sync(ring_channel::<Void>(NonZeroUsize::try_from(1)?));
    }

    // `_rx` keeps a receiver alive, so concurrent sends all succeed.
    #[proptest]
    fn send_succeeds_on_connected_channel(
        #[strategy(1..=10usize)] cap: usize,
        #[any(size_range(1..=10).lift())] msgs: Vec<u8>,
    ) {
        let rt = runtime::Builder::new_multi_thread().build()?;
        let (tx, _rx) = ring_channel(NonZeroUsize::try_from(cap)?);

        rt.block_on(iter(msgs).map(Ok).try_for_each_concurrent(None, |msg| {
            let tx = tx.clone();
            spawn_blocking(move || assert!(tx.send(msg).is_ok()))
        }))?;
    }

    // With the receiver dropped, every send returns the message as `Disconnected`.
    #[proptest]
    fn send_fails_on_disconnected_channel(
        #[strategy(1..=10usize)] cap: usize,
        #[any(size_range(1..=10).lift())] msgs: Vec<u8>,
    ) {
        let rt = runtime::Builder::new_multi_thread().build()?;
        let (tx, _) = ring_channel(NonZeroUsize::try_from(cap)?);

        rt.block_on(iter(msgs).map(Ok).try_for_each_concurrent(None, |msg| {
            let tx = tx.clone();
            spawn_blocking(move || assert_eq!(tx.send(msg), Err(SendError::Disconnected(msg))))
        }))?;
    }

    // The first `cap` sends fill the buffer; each later send returns the oldest pending
    // message, and only the newest `cap` messages remain in the buffer.
    #[proptest]
    fn send_overwrites_old_messages(
        #[strategy(1..=10usize)] cap: usize,
        #[any(size_range(#cap..=10).lift())] msgs: Vec<u8>,
    ) {
        let (tx, rx) = ring_channel(NonZeroUsize::try_from(cap)?);
        let overwritten = msgs.len() - min(msgs.len(), cap);

        for &msg in &msgs[..cap] {
            assert_eq!(tx.send(msg), Ok(None));
        }

        for (&prev, &msg) in msgs.iter().zip(&msgs[cap..]) {
            assert_eq!(tx.send(msg), Ok(Some(prev)));
        }

        assert_eq!(
            iter::from_fn(|| rx.handle.buffer.pop()).collect::<Vec<_>>(),
            &msgs[overwritten..]
        );
    }

    // Messages are tagged with their index so the concurrently received set can be
    // sorted and compared against the input.
    #[proptest]
    fn try_recv_succeeds_on_non_empty_connected_channel(
        #[any(size_range(1..=10).lift())] msgs: Vec<u8>,
    ) {
        let rt = runtime::Builder::new_multi_thread().build()?;
        let (tx, rx) = ring_channel(NonZeroUsize::try_from(msgs.len())?);

        for msg in msgs.iter().cloned().enumerate() {
            tx.handle.buffer.push(msg);
        }

        let mut received = rt.block_on(async {
            try_join_all(
                iter::repeat(rx)
                    .take(msgs.len())
                    .map(|rx| spawn_blocking(move || rx.try_recv().unwrap())),
            )
            .await
        })?;

        received.sort_by_key(|(k, _)| *k);
        assert_eq!(received, msgs.into_iter().enumerate().collect::<Vec<_>>());
    }

    // Pending messages can still be received after every sender is gone.
    #[proptest]
    fn try_recv_succeeds_on_non_empty_disconnected_channel(
        #[any(size_range(1..=10).lift())] msgs: Vec<u8>,
    ) {
        let rt = runtime::Builder::new_multi_thread().build()?;
        let (_, rx) = ring_channel(NonZeroUsize::try_from(msgs.len())?);

        for msg in msgs.iter().cloned().enumerate() {
            rx.handle.buffer.push(msg);
        }

        let mut received = rt.block_on(async {
            try_join_all(
                iter::repeat(rx)
                    .take(msgs.len())
                    .map(|rx| spawn_blocking(move || rx.try_recv().unwrap())),
            )
            .await
        })?;

        received.sort_by_key(|(k, _)| *k);
        assert_eq!(received, msgs.into_iter().enumerate().collect::<Vec<_>>());
    }

    // `_tx` keeps a sender alive, so an empty buffer reports `Empty`.
    #[proptest]
    fn try_recv_fails_on_empty_connected_channel(
        #[strategy(1..=10usize)] cap: usize,
        #[strategy(1..=10usize)] n: usize,
    ) {
        let rt = runtime::Builder::new_multi_thread().build()?;
        let (_tx, rx) = ring_channel::<()>(NonZeroUsize::try_from(cap)?);

        rt.block_on(
            repeat(rx)
                .take(n)
                .map(Ok)
                .try_for_each_concurrent(None, |rx| {
                    spawn_blocking(move || assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)))
                }),
        )?;
    }

    // With no sender left, an empty buffer reports `Disconnected`.
    #[proptest]
    fn try_recv_fails_on_empty_disconnected_channel(
        #[strategy(1..=10usize)] cap: usize,
        #[strategy(1..=10usize)] n: usize,
    ) {
        let rt = runtime::Builder::new_multi_thread().build()?;
        let (_, rx) = ring_channel::<()>(NonZeroUsize::try_from(cap)?);

        rt.block_on(
            repeat(rx)
                .take(n)
                .map(Ok)
                .try_for_each_concurrent(None, |rx| {
                    spawn_blocking(move || {
                        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected))
                    })
                }),
        )?;
    }

    // Blocking counterpart of `try_recv_succeeds_on_non_empty_connected_channel`.
    #[cfg(feature = "futures_api")]
    #[proptest]
    fn recv_succeeds_on_non_empty_connected_channel(
        #[any(size_range(1..=10).lift())] msgs: Vec<u8>,
    ) {
        let rt = runtime::Builder::new_multi_thread().build()?;
        let (tx, rx) = ring_channel(NonZeroUsize::try_from(msgs.len())?);

        for msg in msgs.iter().cloned().enumerate() {
            tx.handle.buffer.push(msg);
        }

        let mut received = rt.block_on(async {
            try_join_all(
                iter::repeat(rx)
                    .take(msgs.len())
                    .map(|rx| spawn_blocking(move || rx.recv().unwrap())),
            )
            .await
        })?;

        received.sort_by_key(|(k, _)| *k);
        assert_eq!(received, msgs.into_iter().enumerate().collect::<Vec<_>>());
    }

    // Blocking counterpart of `try_recv_succeeds_on_non_empty_disconnected_channel`.
    #[cfg(feature = "futures_api")]
    #[proptest]
    fn recv_succeeds_on_non_empty_disconnected_channel(
        #[any(size_range(1..=10).lift())] msgs: Vec<u8>,
    ) {
        let rt = runtime::Builder::new_multi_thread().build()?;
        let (_, rx) = ring_channel(NonZeroUsize::try_from(msgs.len())?);

        for msg in msgs.iter().cloned().enumerate() {
            rx.handle.buffer.push(msg);
        }

        let mut received = rt.block_on(async {
            try_join_all(
                iter::repeat(rx)
                    .take(msgs.len())
                    .map(|rx| spawn_blocking(move || rx.recv().unwrap())),
            )
            .await
        })?;

        received.sort_by_key(|(k, _)| *k);
        assert_eq!(received, msgs.into_iter().enumerate().collect::<Vec<_>>());
    }

    // `recv` returns immediately with `Disconnected` when nothing can ever arrive.
    #[cfg(feature = "futures_api")]
    #[proptest]
    fn recv_fails_on_empty_disconnected_channel(
        #[strategy(1..=10usize)] cap: usize,
        #[strategy(1..=10usize)] n: usize,
    ) {
        let rt = runtime::Builder::new_multi_thread().build()?;
        let (_, rx) = ring_channel::<()>(NonZeroUsize::try_from(cap)?);

        rt.block_on(
            repeat(rx)
                .take(n)
                .map(Ok)
                .try_for_each_concurrent(None, |rx| {
                    spawn_blocking(move || assert_eq!(rx.recv(), Err(RecvError::Disconnected)))
                }),
        )?;
    }

    // Receivers blocked in `recv` must be released once all `m` senders are dropped;
    // the timeout turns a missed wake-up into a test failure instead of a hang.
    #[cfg(feature = "futures_api")]
    #[proptest]
    fn recv_wakes_on_disconnect(
        #[strategy(1..=10usize)] m: usize,
        #[strategy(1..=10usize)] n: usize,
    ) {
        let rt = runtime::Builder::new_multi_thread().enable_time().build()?;
        let (tx, rx) = ring_channel::<()>(NonZeroUsize::try_from(1)?);

        let producer = repeat(tx)
            .take(m)
            .map(Ok)
            .try_for_each_concurrent(None, |tx| spawn_blocking(move || drop(tx)));

        let consumer = repeat(rx)
            .take(n)
            .map(Ok)
            .try_for_each_concurrent(None, |rx| {
                spawn_blocking(move || assert_eq!(rx.recv(), Err(RecvError::Disconnected)))
            });

        rt.block_on(async move {
            timeout(Duration::from_secs(60), try_join(consumer, producer)).await
        })??;
    }

    // `n` sends must release `n` blocked receivers; the extra sender clone keeps the
    // channel connected while the producers finish.
    #[cfg(feature = "futures_api")]
    #[proptest]
    fn recv_wakes_on_send(#[strategy(1..=10usize)] n: usize) {
        let rt = runtime::Builder::new_multi_thread().enable_time().build()?;
        let (tx, rx) = ring_channel(NonZeroUsize::try_from(n)?);
        let _prevent_disconnection = tx.clone();

        let producer = repeat(tx)
            .take(n)
            .map(Ok)
            .try_for_each_concurrent(None, |tx| {
                spawn_blocking(move || assert!(tx.send(()).is_ok()))
            });

        let consumer = repeat(rx)
            .take(n)
            .map(Ok)
            .try_for_each_concurrent(None, |rx| {
                spawn_blocking(move || assert_eq!(rx.recv(), Ok(())))
            });

        rt.block_on(async move {
            timeout(Duration::from_secs(60), try_join(consumer, producer)).await
        })??;
    }

    // Forwarding a stream into the sender keeps only the newest `cap` messages.
    #[cfg(feature = "futures_api")]
    #[proptest]
    fn sender_implements_sink(
        #[strategy(1..=10usize)] cap: usize,
        #[any(size_range(1..=10).lift())] msgs: Vec<u8>,
    ) {
        let rt = runtime::Builder::new_multi_thread().build()?;
        let (mut tx, rx) = ring_channel(NonZeroUsize::try_from(cap)?);
        let overwritten = msgs.len() - min(msgs.len(), cap);

        assert_eq!(rt.block_on(iter(&msgs).map(Ok).forward(&mut tx)), Ok(()));

        // Hang up before draining what is left in the buffer.
        drop(tx);

        assert_eq!(
            iter::from_fn(|| rx.try_recv().ok().copied()).collect::<Vec<_>>(),
            &msgs[overwritten..]
        );
    }

    // With capacity 1 the second `start_send` overwrites the first message.
    #[cfg(feature = "futures_api")]
    #[proptest]
    fn sender_sets_backoff_to_true_if_sink_overwrites_on_send() {
        let (mut tx, _rx) = ring_channel(NonZeroUsize::try_from(1)?);

        assert!(!tx.backoff);
        assert_eq!(Pin::new(&mut tx).start_send(()), Ok(()));
        assert!(!tx.backoff);
        assert_eq!(Pin::new(&mut tx).start_send(()), Ok(()));
        assert!(tx.backoff);
    }

    // A set backoff flag costs exactly one `Pending` and is cleared by it.
    #[cfg(feature = "futures_api")]
    #[proptest]
    fn sender_yields_once_on_poll_ready_if_backoff_is_true(#[strategy(1..=10usize)] cap: usize) {
        let (mut tx, _) = ring_channel::<()>(NonZeroUsize::try_from(cap)?);
        tx.backoff = true;

        let waker = Arc::new(MockWaker).into();
        let mut ctx = Context::from_waker(&waker);

        assert_eq!(Pin::new(&mut tx).poll_ready(&mut ctx), Poll::Pending);
        assert!(!tx.backoff);
        assert_eq!(Pin::new(&mut tx).poll_ready(&mut ctx), Poll::Ready(Ok(())));
    }

    // Collecting the receiver as a stream yields the newest `cap` messages, then ends.
    #[cfg(feature = "futures_api")]
    #[proptest]
    fn receiver_implements_stream(
        #[strategy(1..=10usize)] cap: usize,
        #[any(size_range(1..=10).lift())] msgs: Vec<u8>,
    ) {
        let rt = runtime::Builder::new_multi_thread().build()?;
        let (tx, rx) = ring_channel(NonZeroUsize::try_from(cap)?);
        let overwritten = msgs.len() - min(msgs.len(), cap);

        for &msg in &msgs {
            assert!(tx.send(msg).is_ok());
        }

        // Hang up so the stream terminates once the buffer is drained.
        drop(tx);

        assert_eq!(rt.block_on(rx.collect::<Vec<_>>()), &msgs[overwritten..]);
    }

    // Polling an empty channel again replaces the previously parked waker.
    // NOTE(review): excluded under miri, presumably because `will_wake` is not
    // reliable there — confirm.
    #[cfg(feature = "futures_api")]
    #[cfg(not(miri))]
    #[proptest]
    fn receiver_stores_most_recent_waker_if_channel_is_empty(#[strategy(1..=10usize)] cap: usize) {
        let (_tx, mut rx) = ring_channel::<()>(NonZeroUsize::try_from(cap)?);

        let a = Arc::new(MockWaker).into();
        let mut ctx = Context::from_waker(&a);

        assert_eq!(Pin::new(&mut rx).poll_next(&mut ctx), Poll::Pending);
        assert!(rx.handle.waitlist.get(rx.slot).unwrap().will_wake(&a));

        let b = Arc::new(MockWaker).into();
        let mut ctx = Context::from_waker(&b);

        assert_eq!(Pin::new(&mut rx).poll_next(&mut ctx), Poll::Pending);
        assert!(!rx.handle.waitlist.get(rx.slot).unwrap().will_wake(&a));
        assert!(rx.handle.waitlist.get(rx.slot).unwrap().will_wake(&b));
    }

    // Once a poll returns a message, the receiver's slot no longer holds a waker.
    // NOTE(review): excluded under miri, presumably because `will_wake` is not
    // reliable there — confirm.
    #[cfg(feature = "futures_api")]
    #[cfg(not(miri))]
    #[proptest]
    fn receiver_withdraws_waker_if_channel_not_empty(#[strategy(1..=10usize)] cap: usize, msg: u8) {
        let (tx, mut rx) = ring_channel(NonZeroUsize::try_from(cap)?);

        let waker = Arc::new(MockWaker).into();
        let mut ctx = Context::from_waker(&waker);

        assert_eq!(Pin::new(&mut rx).poll_next(&mut ctx), Poll::Pending);
        assert!(rx.handle.waitlist.get(rx.slot).unwrap().will_wake(&waker));

        assert_eq!(tx.send(&msg), Ok(None));

        assert_eq!(
            Pin::new(&mut rx).poll_next(&mut ctx),
            Poll::Ready(Some(&msg))
        );

        assert!(rx.handle.waitlist.get(rx.slot).is_none());
    }

    // Async counterpart of `recv_wakes_on_disconnect`: each sender is closed and then
    // dropped at the end of its task, which must end every pending stream.
    #[cfg(feature = "futures_api")]
    #[proptest]
    fn stream_wakes_on_disconnect(
        #[strategy(1..=10usize)] m: usize,
        #[strategy(1..=10usize)] n: usize,
    ) {
        let rt = runtime::Builder::new_multi_thread().enable_time().build()?;
        let (tx, rx) = ring_channel::<()>(NonZeroUsize::try_from(1)?);

        let producer = repeat(tx)
            .take(m)
            .map(Ok)
            .try_for_each_concurrent(None, |mut tx| {
                spawn(async move { assert_eq!(tx.close().await, Ok(())) })
            });

        let consumer = repeat(rx)
            .take(n)
            .map(Ok)
            .try_for_each_concurrent(None, |mut rx| {
                spawn(async move { assert_eq!(rx.next().await, None) })
            });

        rt.block_on(async move {
            timeout(Duration::from_secs(60), try_join(consumer, producer)).await
        })??;
    }

    // Async counterpart of `recv_wakes_on_send`: `n` forwarded items must wake `n`
    // pending streams; the extra sender clone keeps the channel connected.
    #[cfg(feature = "futures_api")]
    #[proptest]
    fn stream_wakes_on_sink(#[strategy(1..=10usize)] n: usize) {
        let rt = runtime::Builder::new_multi_thread().enable_time().build()?;
        let (tx, rx) = ring_channel(NonZeroUsize::try_from(n)?);
        let _prevent_disconnection = tx.clone();

        let producer = repeat(tx)
            .take(n)
            .map(Ok)
            .try_for_each_concurrent(None, |tx| {
                spawn(async move { assert_eq!(iter(Some(Ok(()))).forward(tx).await, Ok(())) })
            });

        let consumer = repeat(rx)
            .take(n)
            .map(Ok)
            .try_for_each_concurrent(None, |mut rx| {
                spawn(async move { assert_eq!(rx.next().await, Some(())) })
            });

        rt.block_on(async move {
            timeout(Duration::from_secs(60), try_join(consumer, producer)).await
        })??;
    }
}