ring_channel/
channel.rs

1use crate::{control::ControlBlockRef, error::*};
2use core::{mem::ManuallyDrop, num::NonZeroUsize, sync::atomic::*};
3use derivative::Derivative;
4
5#[cfg(feature = "futures_api")]
6use crate::waitlist::Slot;
7
8#[cfg(feature = "futures_api")]
9use futures::{task::*, Sink, Stream};
10
11#[cfg(feature = "futures_api")]
12use core::{mem, pin::Pin};
13
14/// The sending end of a [`ring_channel`].
15#[derive(Derivative)]
16#[derivative(Debug(bound = ""), Eq(bound = ""), PartialEq(bound = ""))]
17pub struct RingSender<T> {
18    handle: ManuallyDrop<ControlBlockRef<T>>,
19
20    #[cfg(feature = "futures_api")]
21    #[derivative(PartialEq = "ignore")]
22    backoff: bool,
23}
24
25unsafe impl<T: Send> Send for RingSender<T> {}
26unsafe impl<T: Send> Sync for RingSender<T> {}
27
28impl<T> RingSender<T> {
29    fn new(handle: ManuallyDrop<ControlBlockRef<T>>) -> Self {
30        Self {
31            handle,
32
33            #[cfg(feature = "futures_api")]
34            backoff: false,
35        }
36    }
37
38    /// Sends a message through the channel without blocking.
39    ///
40    /// * If the channel is not disconnected, the message is pushed into the internal ring buffer.
41    ///     * If the internal ring buffer is full, the oldest pending message is overwritten
42    ///       and returned as `Ok(Some(_))`, otherwise `Ok(None)` is returned.
43    /// * If the channel is disconnected, [`SendError::Disconnected`] is returned.
44    pub fn send(&self, message: T) -> Result<Option<T>, SendError<T>> {
45        if self.handle.receivers.load(Ordering::Acquire) > 0 {
46            let overwritten = self.handle.buffer.push(message);
47
48            #[cfg(feature = "futures_api")]
49            if overwritten.is_none() {
50                // A full memory barrier is necessary to ensure that storing the message
51                // happens before waking receivers.
52                fence(Ordering::SeqCst);
53
54                if let Some(waker) = self.handle.waitlist.pop() {
55                    waker.wake();
56                }
57            }
58
59            Ok(overwritten)
60        } else {
61            Err(SendError::Disconnected(message))
62        }
63    }
64}
65
66impl<T> Clone for RingSender<T> {
67    fn clone(&self) -> Self {
68        self.handle.senders.fetch_add(1, Ordering::Relaxed);
69        RingSender::new(self.handle.clone())
70    }
71}
72
73impl<T> Drop for RingSender<T> {
74    fn drop(&mut self) {
75        // Synchronizes with other senders.
76        if self.handle.senders.fetch_sub(1, Ordering::AcqRel) == 1 {
77            #[cfg(feature = "futures_api")]
78            {
79                // A full memory barrier is necessary to ensure that updating senders
80                // happens before waking receivers.
81                fence(Ordering::SeqCst);
82
83                while let Some(waker) = self.handle.waitlist.pop() {
84                    waker.wake();
85                }
86            }
87
88            // Synchronizes the last sender and receiver with each other.
89            if !self.handle.connected.swap(false, Ordering::AcqRel) {
90                unsafe { ManuallyDrop::drop(&mut self.handle) }
91            }
92        }
93    }
94}
95
/// Requires [feature] `"futures_api"`.
///
/// [feature]: index.html#optional-features
#[cfg(feature = "futures_api")]
impl<T> Sink<T> for RingSender<T> {
    type Error = SendError<T>;

    /// Readiness follows the same one-shot backoff as [`Sink::poll_flush`].
    #[inline]
    fn poll_ready(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        <Self as Sink<T>>::poll_flush(self, ctx)
    }

    /// Sends `item` right away and remembers whether it overwrote a pending message.
    #[inline]
    fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        let overwrote_pending = self.send(item)?.is_some();
        self.backoff = overwrote_pending;
        Ok(())
    }

    /// Yields exactly once (re-scheduling the task immediately) if the previous send
    /// overwrote a pending message; otherwise reports completion.
    #[inline]
    fn poll_flush(
        mut self: Pin<&mut Self>,
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        // Reading the flag also clears it, so the following poll completes.
        let must_yield = mem::take(&mut self.backoff);

        if !must_yield {
            return Poll::Ready(Ok(()));
        }

        ctx.waker().wake_by_ref();
        Poll::Pending
    }

    /// Closing requires no work and completes immediately.
    #[inline]
    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}
132
133/// The receiving end of a [`ring_channel`].
134#[derive(Derivative)]
135#[derivative(Debug(bound = ""), Eq(bound = ""), PartialEq(bound = ""))]
136pub struct RingReceiver<T> {
137    handle: ManuallyDrop<ControlBlockRef<T>>,
138
139    #[cfg(feature = "futures_api")]
140    #[derivative(PartialEq = "ignore")]
141    slot: Slot,
142}
143
144unsafe impl<T: Send> Send for RingReceiver<T> {}
145unsafe impl<T: Send> Sync for RingReceiver<T> {}
146
147impl<T> RingReceiver<T> {
148    fn new(handle: ManuallyDrop<ControlBlockRef<T>>) -> Self {
149        Self {
150            #[cfg(feature = "futures_api")]
151            slot: handle.waitlist.register(),
152
153            handle,
154        }
155    }
156
157    /// Receives a message through the channel without blocking.
158    ///
159    /// * If the internal ring buffer isn't empty, the oldest pending message is returned.
160    /// * If the internal ring buffer is empty, [`TryRecvError::Empty`] is returned.
161    /// * If the channel is disconnected and the internal ring buffer is empty,
162    /// [`TryRecvError::Disconnected`] is returned.
163    pub fn try_recv(&self) -> Result<T, TryRecvError> {
164        // We must check whether the channel is connected using acquire ordering before we look at
165        // the buffer, in order to ensure that the loads associated with popping from the buffer
166        // happen after the stores associated with a push into the buffer that may have happened
167        // immediately before the channel was disconnected.
168        if self.handle.senders.load(Ordering::Acquire) > 0 {
169            self.handle.buffer.pop().ok_or(TryRecvError::Empty)
170        } else {
171            self.handle.buffer.pop().ok_or(TryRecvError::Disconnected)
172        }
173    }
174
175    /// Receives a message through the channel (requires [feature] `"futures_api"`).
176    ///
177    /// * If the internal ring buffer isn't empty, the oldest pending message is returned.
178    /// * If the internal ring buffer is empty, the call blocks until a message is sent
179    /// or the channel disconnects.
180    /// * If the channel is disconnected and the internal ring buffer is empty,
181    /// [`RecvError::Disconnected`] is returned.
182    ///
183    /// [feature]: index.html#optional-features
184    #[cfg(feature = "futures_api")]
185    pub fn recv(&self) -> Result<T, RecvError> {
186        futures::executor::block_on(futures::future::poll_fn(|ctx| self.poll(ctx)))
187            .ok_or(RecvError::Disconnected)
188    }
189
190    #[cfg(feature = "futures_api")]
191    fn poll(&self, ctx: &mut Context<'_>) -> Poll<Option<T>> {
192        match self.try_recv() {
193            result @ Ok(_) | result @ Err(TryRecvError::Disconnected) => {
194                self.handle.waitlist.remove(self.slot);
195                Poll::Ready(result.ok())
196            }
197
198            Err(TryRecvError::Empty) => {
199                self.handle.waitlist.insert(self.slot, ctx.waker().clone());
200
201                // A full memory barrier is necessary to ensure that storing the waker
202                // happens before attempting to retrieve a message from the buffer.
203                fence(Ordering::SeqCst);
204
205                // Look at the buffer again in case a new message has been sent in the meantime.
206                match self.try_recv() {
207                    result @ Ok(_) | result @ Err(TryRecvError::Disconnected) => {
208                        self.handle.waitlist.remove(self.slot);
209                        Poll::Ready(result.ok())
210                    }
211
212                    Err(TryRecvError::Empty) => Poll::Pending,
213                }
214            }
215        }
216    }
217}
218
219impl<T> Clone for RingReceiver<T> {
220    fn clone(&self) -> Self {
221        self.handle.receivers.fetch_add(1, Ordering::Relaxed);
222        RingReceiver::new(self.handle.clone())
223    }
224}
225
226impl<T> Drop for RingReceiver<T> {
227    fn drop(&mut self) {
228        #[cfg(feature = "futures_api")]
229        if self.handle.waitlist.deregister(self.slot).is_none() {
230            // Wake some other receiver in case we have been woken in the meantime.
231            if let Some(waker) = self.handle.waitlist.pop() {
232                waker.wake();
233            }
234        }
235
236        // Synchronizes with other receivers.
237        if self.handle.receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
238            // Synchronizes the last sender and receiver with each other.
239            if !self.handle.connected.swap(false, Ordering::AcqRel) {
240                unsafe { ManuallyDrop::drop(&mut self.handle) }
241            }
242        }
243    }
244}
245
/// Requires [feature] `"futures_api"`.
///
/// [feature]: index.html#optional-features
#[cfg(feature = "futures_api")]
impl<T> Stream for RingReceiver<T> {
    type Item = T;

    /// Yields the next pending message, or `None` once the channel is disconnected
    /// and drained.
    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Polling only needs shared access to the receiver.
        let receiver = Pin::get_ref(self.as_ref());
        receiver.poll(ctx)
    }
}
257
258/// Opens a multi-producer multi-consumer channel backed by a ring buffer.
259///
260/// The associated ring buffer can contain up to `capacity` pending messages.
261///
262/// Sending and receiving messages through this channel _never blocks_, however
263/// pending messages may be overwritten if the internal ring buffer overflows.
264///
265/// # Panics
266///
267/// Panics if the `capacity` is `0`.
268///
269/// # Examples
270///
271/// ```no_run
272/// # fn doctest() -> Result<(), Box<dyn std::error::Error>> {
273/// use ring_channel::*;
274/// use std::{num::NonZeroUsize, thread};
275/// use std::time::{Duration, Instant};
276///
277/// // Open a channel to transmit the time elapsed since the beginning of the countdown.
278/// // We only need a buffer of size 1, since we're only interested in the current value.
279/// let (tx, rx) = ring_channel(NonZeroUsize::try_from(1)?);
280///
281/// thread::spawn(move || {
282///     let countdown = Instant::now() + Duration::from_secs(10);
283///
284///     // Update the channel with the time elapsed so far.
285///     while let Ok(_) = tx.send(countdown - Instant::now()) {
286///
287///         // We only need millisecond precision.
288///         thread::sleep(Duration::from_millis(1));
289///
290///         if Instant::now() > countdown {
291///             break;
292///         }
293///     }
294/// });
295///
296/// loop {
297///     match rx.try_recv() {
298///         // Print the current time elapsed.
299///         Ok(timer) => {
300///             print!("\r{:02}.{:03}", timer.as_secs(), timer.as_millis() % 1000);
301///
302///             if timer <= Duration::from_millis(6600) {
303///                 print!(" - Main engine start                           ");
304///             } else {
305///                 print!(" - Activate main engine hydrogen burnoff system");
306///             }
307///         }
308///         Err(TryRecvError::Disconnected) => break,
309///         Err(TryRecvError::Empty) => thread::yield_now(),
310///     }
311/// }
312///
313/// println!("\r00.0000 - Solid rocket booster ignition and liftoff!");
314/// # Ok(())
315/// # }
316/// ```
317pub fn ring_channel<T>(capacity: NonZeroUsize) -> (RingSender<T>, RingReceiver<T>) {
318    let handle = ManuallyDrop::new(ControlBlockRef::new(capacity.get()));
319    (RingSender::new(handle.clone()), RingReceiver::new(handle))
320}
321
322#[cfg(test)]
323mod tests {
324    use super::*;
325    use crate::Void;
326    use alloc::vec::Vec;
327    use core::{cmp::min, iter};
328    use futures::stream::{iter, repeat};
329    use futures::{future::try_join_all, prelude::*};
330    use proptest::collection::size_range;
331    use test_strategy::proptest;
332    use tokio::runtime;
333    use tokio::task::spawn_blocking;
334
335    #[cfg(feature = "futures_api")]
336    use core::time::Duration;
337
338    #[cfg(feature = "futures_api")]
339    use alloc::{sync::Arc, task::Wake};
340
341    #[cfg(feature = "futures_api")]
342    use futures::future::try_join;
343
344    #[cfg(feature = "futures_api")]
345    use tokio::{task::spawn, time::timeout};
346
347    #[cfg(feature = "futures_api")]
348    #[derive(Debug, Copy, Clone, Eq, PartialEq)]
349    struct MockWaker;
350
351    #[cfg(feature = "futures_api")]
352    impl Wake for MockWaker {
353        fn wake(self: Arc<Self>) {}
354    }
355
356    #[proptest]
357    fn ring_channel_is_associated_with_a_single_control_block() {
358        let (tx, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
359        assert_eq!(tx.handle, rx.handle);
360    }
361
362    #[proptest]
363    fn senders_are_equal_if_they_are_associated_with_the_same_ring_channel() {
364        let (s1, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
365        let (s2, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
366
367        assert_eq!(s1, s1.clone());
368        assert_eq!(s2, s2.clone());
369        assert_ne!(s1, s2);
370    }
371
372    #[cfg(feature = "futures_api")]
373    #[proptest]
374    fn senders_are_equal_even_if_backoff_is_different() {
375        let (mut tx, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
376        tx.backoff = true;
377        assert_eq!(tx, tx.clone());
378    }
379
380    #[proptest]
381    fn receivers_are_equal_if_they_are_associated_with_the_same_ring_channel() {
382        let (_, r1) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
383        let (_, r2) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
384
385        assert_eq!(r1, r1.clone());
386        assert_eq!(r2, r2.clone());
387        assert_ne!(r1, r2);
388    }
389
390    #[proptest]
391    fn cloning_sender_increments_senders() {
392        let (tx, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
393        #[allow(clippy::redundant_clone)]
394        let tx = tx.clone();
395        assert_eq!(tx.handle.senders.load(Ordering::SeqCst), 2);
396        assert_eq!(rx.handle.receivers.load(Ordering::SeqCst), 1);
397    }
398
399    #[cfg(feature = "futures_api")]
400    #[proptest]
401    fn cloning_sender_resets_backoff_flag() {
402        let (mut tx, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
403        tx.backoff = true;
404        assert_ne!(tx.clone().backoff, tx.backoff);
405    }
406
407    #[proptest]
408    fn cloning_receiver_increments_receivers_counter() {
409        let (tx, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
410        #[allow(clippy::redundant_clone)]
411        let rx = rx.clone();
412        assert_eq!(tx.handle.senders.load(Ordering::SeqCst), 1);
413        assert_eq!(rx.handle.receivers.load(Ordering::SeqCst), 2);
414    }
415
416    #[cfg(feature = "futures_api")]
417    #[proptest]
418    fn cloning_receiver_registers_waitlist_slot() {
419        let (_, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
420        assert_ne!(rx.clone().slot, rx.slot);
421    }
422
423    #[proptest]
424    fn dropping_sender_decrements_senders_counter() {
425        let (_, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
426        assert_eq!(rx.handle.senders.load(Ordering::SeqCst), 0);
427        assert_eq!(rx.handle.receivers.load(Ordering::SeqCst), 1);
428    }
429
430    #[proptest]
431    fn dropping_receiver_decrements_receivers_counter() {
432        let (tx, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
433        assert_eq!(tx.handle.senders.load(Ordering::SeqCst), 1);
434        assert_eq!(tx.handle.receivers.load(Ordering::SeqCst), 0);
435    }
436
437    #[cfg(feature = "futures_api")]
438    #[proptest]
439    fn dropping_receiver_deregisters_waitlist_slot() {
440        let (tx, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
441        assert_eq!(tx.handle.waitlist.len(), 1);
442        drop(rx);
443        assert_eq!(tx.handle.waitlist.len(), 0);
444    }
445
446    #[proptest]
447    fn channel_is_disconnected_if_there_are_no_senders() {
448        let (_, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
449        assert_eq!(rx.handle.senders.load(Ordering::SeqCst), 0);
450        assert!(!rx.handle.connected.load(Ordering::SeqCst));
451    }
452
453    #[proptest]
454    fn channel_is_disconnected_if_there_are_no_receivers() {
455        let (tx, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
456        assert_eq!(tx.handle.receivers.load(Ordering::SeqCst), 0);
457        assert!(!tx.handle.connected.load(Ordering::SeqCst));
458    }
459
460    #[proptest]
461    fn endpoints_are_safe_to_send_across_threads() {
462        fn must_be_send(_: impl Send) {}
463        must_be_send(ring_channel::<Void>(NonZeroUsize::try_from(1)?));
464    }
465
466    #[proptest]
467    fn endpoints_are_safe_to_share_across_threads() {
468        fn must_be_sync(_: impl Sync) {}
469        must_be_sync(ring_channel::<Void>(NonZeroUsize::try_from(1)?));
470    }
471
472    #[proptest]
473    fn send_succeeds_on_connected_channel(
474        #[strategy(1..=10usize)] cap: usize,
475        #[any(size_range(1..=10).lift())] msgs: Vec<u8>,
476    ) {
477        let rt = runtime::Builder::new_multi_thread().build()?;
478        let (tx, _rx) = ring_channel(NonZeroUsize::try_from(cap)?);
479
480        rt.block_on(iter(msgs).map(Ok).try_for_each_concurrent(None, |msg| {
481            let tx = tx.clone();
482            spawn_blocking(move || assert!(tx.send(msg).is_ok()))
483        }))?;
484    }
485
486    #[proptest]
487    fn send_fails_on_disconnected_channel(
488        #[strategy(1..=10usize)] cap: usize,
489        #[any(size_range(1..=10).lift())] msgs: Vec<u8>,
490    ) {
491        let rt = runtime::Builder::new_multi_thread().build()?;
492        let (tx, _) = ring_channel(NonZeroUsize::try_from(cap)?);
493
494        rt.block_on(iter(msgs).map(Ok).try_for_each_concurrent(None, |msg| {
495            let tx = tx.clone();
496            spawn_blocking(move || assert_eq!(tx.send(msg), Err(SendError::Disconnected(msg))))
497        }))?;
498    }
499
500    #[proptest]
501    fn send_overwrites_old_messages(
502        #[strategy(1..=10usize)] cap: usize,
503        #[any(size_range(#cap..=10).lift())] msgs: Vec<u8>,
504    ) {
505        let (tx, rx) = ring_channel(NonZeroUsize::try_from(cap)?);
506        let overwritten = msgs.len() - min(msgs.len(), cap);
507
508        for &msg in &msgs[..cap] {
509            assert_eq!(tx.send(msg), Ok(None));
510        }
511
512        for (&prev, &msg) in msgs.iter().zip(&msgs[cap..]) {
513            assert_eq!(tx.send(msg), Ok(Some(prev)));
514        }
515
516        assert_eq!(
517            iter::from_fn(|| rx.handle.buffer.pop()).collect::<Vec<_>>(),
518            &msgs[overwritten..]
519        );
520    }
521
522    #[proptest]
523    fn try_recv_succeeds_on_non_empty_connected_channel(
524        #[any(size_range(1..=10).lift())] msgs: Vec<u8>,
525    ) {
526        let rt = runtime::Builder::new_multi_thread().build()?;
527        let (tx, rx) = ring_channel(NonZeroUsize::try_from(msgs.len())?);
528
529        for msg in msgs.iter().cloned().enumerate() {
530            tx.handle.buffer.push(msg);
531        }
532
533        let mut received = rt.block_on(async {
534            try_join_all(
535                iter::repeat(rx)
536                    .take(msgs.len())
537                    .map(|rx| spawn_blocking(move || rx.try_recv().unwrap())),
538            )
539            .await
540        })?;
541
542        received.sort_by_key(|(k, _)| *k);
543        assert_eq!(received, msgs.into_iter().enumerate().collect::<Vec<_>>());
544    }
545
546    #[proptest]
547    fn try_recv_succeeds_on_non_empty_disconnected_channel(
548        #[any(size_range(1..=10).lift())] msgs: Vec<u8>,
549    ) {
550        let rt = runtime::Builder::new_multi_thread().build()?;
551        let (_, rx) = ring_channel(NonZeroUsize::try_from(msgs.len())?);
552
553        for msg in msgs.iter().cloned().enumerate() {
554            rx.handle.buffer.push(msg);
555        }
556
557        let mut received = rt.block_on(async {
558            try_join_all(
559                iter::repeat(rx)
560                    .take(msgs.len())
561                    .map(|rx| spawn_blocking(move || rx.try_recv().unwrap())),
562            )
563            .await
564        })?;
565
566        received.sort_by_key(|(k, _)| *k);
567        assert_eq!(received, msgs.into_iter().enumerate().collect::<Vec<_>>());
568    }
569
570    #[proptest]
571    fn try_recv_fails_on_empty_connected_channel(
572        #[strategy(1..=10usize)] cap: usize,
573        #[strategy(1..=10usize)] n: usize,
574    ) {
575        let rt = runtime::Builder::new_multi_thread().build()?;
576        let (_tx, rx) = ring_channel::<()>(NonZeroUsize::try_from(cap)?);
577
578        rt.block_on(
579            repeat(rx)
580                .take(n)
581                .map(Ok)
582                .try_for_each_concurrent(None, |rx| {
583                    spawn_blocking(move || assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)))
584                }),
585        )?;
586    }
587
588    #[proptest]
589    fn try_recv_fails_on_empty_disconnected_channel(
590        #[strategy(1..=10usize)] cap: usize,
591        #[strategy(1..=10usize)] n: usize,
592    ) {
593        let rt = runtime::Builder::new_multi_thread().build()?;
594        let (_, rx) = ring_channel::<()>(NonZeroUsize::try_from(cap)?);
595
596        rt.block_on(
597            repeat(rx)
598                .take(n)
599                .map(Ok)
600                .try_for_each_concurrent(None, |rx| {
601                    spawn_blocking(move || {
602                        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected))
603                    })
604                }),
605        )?;
606    }
607
608    #[cfg(feature = "futures_api")]
609    #[proptest]
610    fn recv_succeeds_on_non_empty_connected_channel(
611        #[any(size_range(1..=10).lift())] msgs: Vec<u8>,
612    ) {
613        let rt = runtime::Builder::new_multi_thread().build()?;
614        let (tx, rx) = ring_channel(NonZeroUsize::try_from(msgs.len())?);
615
616        for msg in msgs.iter().cloned().enumerate() {
617            tx.handle.buffer.push(msg);
618        }
619
620        let mut received = rt.block_on(async {
621            try_join_all(
622                iter::repeat(rx)
623                    .take(msgs.len())
624                    .map(|rx| spawn_blocking(move || rx.recv().unwrap())),
625            )
626            .await
627        })?;
628
629        received.sort_by_key(|(k, _)| *k);
630        assert_eq!(received, msgs.into_iter().enumerate().collect::<Vec<_>>());
631    }
632
633    #[cfg(feature = "futures_api")]
634    #[proptest]
635    fn recv_succeeds_on_non_empty_disconnected_channel(
636        #[any(size_range(1..=10).lift())] msgs: Vec<u8>,
637    ) {
638        let rt = runtime::Builder::new_multi_thread().build()?;
639        let (_, rx) = ring_channel(NonZeroUsize::try_from(msgs.len())?);
640
641        for msg in msgs.iter().cloned().enumerate() {
642            rx.handle.buffer.push(msg);
643        }
644
645        let mut received = rt.block_on(async {
646            try_join_all(
647                iter::repeat(rx)
648                    .take(msgs.len())
649                    .map(|rx| spawn_blocking(move || rx.recv().unwrap())),
650            )
651            .await
652        })?;
653
654        received.sort_by_key(|(k, _)| *k);
655        assert_eq!(received, msgs.into_iter().enumerate().collect::<Vec<_>>());
656    }
657
658    #[cfg(feature = "futures_api")]
659    #[proptest]
660    fn recv_fails_on_empty_disconnected_channel(
661        #[strategy(1..=10usize)] cap: usize,
662        #[strategy(1..=10usize)] n: usize,
663    ) {
664        let rt = runtime::Builder::new_multi_thread().build()?;
665        let (_, rx) = ring_channel::<()>(NonZeroUsize::try_from(cap)?);
666
667        rt.block_on(
668            repeat(rx)
669                .take(n)
670                .map(Ok)
671                .try_for_each_concurrent(None, |rx| {
672                    spawn_blocking(move || assert_eq!(rx.recv(), Err(RecvError::Disconnected)))
673                }),
674        )?;
675    }
676
677    #[cfg(feature = "futures_api")]
678    #[proptest]
679    fn recv_wakes_on_disconnect(
680        #[strategy(1..=10usize)] m: usize,
681        #[strategy(1..=10usize)] n: usize,
682    ) {
683        let rt = runtime::Builder::new_multi_thread().enable_time().build()?;
684        let (tx, rx) = ring_channel::<()>(NonZeroUsize::try_from(1)?);
685
686        let producer = repeat(tx)
687            .take(m)
688            .map(Ok)
689            .try_for_each_concurrent(None, |tx| spawn_blocking(move || drop(tx)));
690
691        let consumer = repeat(rx)
692            .take(n)
693            .map(Ok)
694            .try_for_each_concurrent(None, |rx| {
695                spawn_blocking(move || assert_eq!(rx.recv(), Err(RecvError::Disconnected)))
696            });
697
698        rt.block_on(async move {
699            timeout(Duration::from_secs(60), try_join(consumer, producer)).await
700        })??;
701    }
702
703    #[cfg(feature = "futures_api")]
704    #[proptest]
705    fn recv_wakes_on_send(#[strategy(1..=10usize)] n: usize) {
706        let rt = runtime::Builder::new_multi_thread().enable_time().build()?;
707        let (tx, rx) = ring_channel(NonZeroUsize::try_from(n)?);
708        let _prevent_disconnection = tx.clone();
709
710        let producer = repeat(tx)
711            .take(n)
712            .map(Ok)
713            .try_for_each_concurrent(None, |tx| {
714                spawn_blocking(move || assert!(tx.send(()).is_ok()))
715            });
716
717        let consumer = repeat(rx)
718            .take(n)
719            .map(Ok)
720            .try_for_each_concurrent(None, |rx| {
721                spawn_blocking(move || assert_eq!(rx.recv(), Ok(())))
722            });
723
724        rt.block_on(async move {
725            timeout(Duration::from_secs(60), try_join(consumer, producer)).await
726        })??;
727    }
728
729    #[cfg(feature = "futures_api")]
730    #[proptest]
731    fn sender_implements_sink(
732        #[strategy(1..=10usize)] cap: usize,
733        #[any(size_range(1..=10).lift())] msgs: Vec<u8>,
734    ) {
735        let rt = runtime::Builder::new_multi_thread().build()?;
736        let (mut tx, rx) = ring_channel(NonZeroUsize::try_from(cap)?);
737        let overwritten = msgs.len() - min(msgs.len(), cap);
738
739        assert_eq!(rt.block_on(iter(&msgs).map(Ok).forward(&mut tx)), Ok(()));
740
741        drop(tx); // hang-up
742
743        assert_eq!(
744            iter::from_fn(|| rx.try_recv().ok().copied()).collect::<Vec<_>>(),
745            &msgs[overwritten..]
746        );
747    }
748
749    #[cfg(feature = "futures_api")]
750    #[proptest]
751    fn sender_sets_backoff_to_true_if_sink_overwrites_on_send() {
752        let (mut tx, _rx) = ring_channel(NonZeroUsize::try_from(1)?);
753
754        assert!(!tx.backoff);
755        assert_eq!(Pin::new(&mut tx).start_send(()), Ok(()));
756        assert!(!tx.backoff);
757        assert_eq!(Pin::new(&mut tx).start_send(()), Ok(()));
758        assert!(tx.backoff);
759    }
760
761    #[cfg(feature = "futures_api")]
762    #[proptest]
763    fn sender_yields_once_on_poll_ready_if_backoff_is_true(#[strategy(1..=10usize)] cap: usize) {
764        let (mut tx, _) = ring_channel::<()>(NonZeroUsize::try_from(cap)?);
765        tx.backoff = true;
766
767        let waker = Arc::new(MockWaker).into();
768        let mut ctx = Context::from_waker(&waker);
769
770        assert_eq!(Pin::new(&mut tx).poll_ready(&mut ctx), Poll::Pending);
771        assert!(!tx.backoff);
772        assert_eq!(Pin::new(&mut tx).poll_ready(&mut ctx), Poll::Ready(Ok(())));
773    }
774
775    #[cfg(feature = "futures_api")]
776    #[proptest]
777    fn receiver_implements_stream(
778        #[strategy(1..=10usize)] cap: usize,
779        #[any(size_range(1..=10).lift())] msgs: Vec<u8>,
780    ) {
781        let rt = runtime::Builder::new_multi_thread().build()?;
782        let (tx, rx) = ring_channel(NonZeroUsize::try_from(cap)?);
783        let overwritten = msgs.len() - min(msgs.len(), cap);
784
785        for &msg in &msgs {
786            assert!(tx.send(msg).is_ok());
787        }
788
789        drop(tx); // hang-up
790
791        assert_eq!(rt.block_on(rx.collect::<Vec<_>>()), &msgs[overwritten..]);
792    }
793
794    #[cfg(feature = "futures_api")]
795    #[cfg(not(miri))] // will_wake sometimes returns false under miri
796    #[proptest]
797    fn receiver_stores_most_recent_waker_if_channel_is_empty(#[strategy(1..=10usize)] cap: usize) {
798        let (_tx, mut rx) = ring_channel::<()>(NonZeroUsize::try_from(cap)?);
799
800        let a = Arc::new(MockWaker).into();
801        let mut ctx = Context::from_waker(&a);
802
803        assert_eq!(Pin::new(&mut rx).poll_next(&mut ctx), Poll::Pending);
804        assert!(rx.handle.waitlist.get(rx.slot).unwrap().will_wake(&a));
805
806        let b = Arc::new(MockWaker).into();
807        let mut ctx = Context::from_waker(&b);
808
809        assert_eq!(Pin::new(&mut rx).poll_next(&mut ctx), Poll::Pending);
810        assert!(!rx.handle.waitlist.get(rx.slot).unwrap().will_wake(&a));
811        assert!(rx.handle.waitlist.get(rx.slot).unwrap().will_wake(&b));
812    }
813
814    #[cfg(feature = "futures_api")]
815    #[cfg(not(miri))] // will_wake sometimes returns false under miri
816    #[proptest]
817    fn receiver_withdraws_waker_if_channel_not_empty(#[strategy(1..=10usize)] cap: usize, msg: u8) {
818        let (tx, mut rx) = ring_channel(NonZeroUsize::try_from(cap)?);
819
820        let waker = Arc::new(MockWaker).into();
821        let mut ctx = Context::from_waker(&waker);
822
823        assert_eq!(Pin::new(&mut rx).poll_next(&mut ctx), Poll::Pending);
824        assert!(rx.handle.waitlist.get(rx.slot).unwrap().will_wake(&waker));
825
826        assert_eq!(tx.send(&msg), Ok(None));
827
828        assert_eq!(
829            Pin::new(&mut rx).poll_next(&mut ctx),
830            Poll::Ready(Some(&msg))
831        );
832
833        assert!(rx.handle.waitlist.get(rx.slot).is_none());
834    }
835
836    #[cfg(feature = "futures_api")]
837    #[proptest]
838    fn stream_wakes_on_disconnect(
839        #[strategy(1..=10usize)] m: usize,
840        #[strategy(1..=10usize)] n: usize,
841    ) {
842        let rt = runtime::Builder::new_multi_thread().enable_time().build()?;
843        let (tx, rx) = ring_channel::<()>(NonZeroUsize::try_from(1)?);
844
845        let producer = repeat(tx)
846            .take(m)
847            .map(Ok)
848            .try_for_each_concurrent(None, |mut tx| {
849                spawn(async move { assert_eq!(tx.close().await, Ok(())) })
850            });
851
852        let consumer = repeat(rx)
853            .take(n)
854            .map(Ok)
855            .try_for_each_concurrent(None, |mut rx| {
856                spawn(async move { assert_eq!(rx.next().await, None) })
857            });
858
859        rt.block_on(async move {
860            timeout(Duration::from_secs(60), try_join(consumer, producer)).await
861        })??;
862    }
863
864    #[cfg(feature = "futures_api")]
865    #[proptest]
866    fn stream_wakes_on_sink(#[strategy(1..=10usize)] n: usize) {
867        let rt = runtime::Builder::new_multi_thread().enable_time().build()?;
868        let (tx, rx) = ring_channel(NonZeroUsize::try_from(n)?);
869        let _prevent_disconnection = tx.clone();
870
871        let producer = repeat(tx)
872            .take(n)
873            .map(Ok)
874            .try_for_each_concurrent(None, |tx| {
875                spawn(async move { assert_eq!(iter(Some(Ok(()))).forward(tx).await, Ok(())) })
876            });
877
878        let consumer = repeat(rx)
879            .take(n)
880            .map(Ok)
881            .try_for_each_concurrent(None, |mut rx| {
882                spawn(async move { assert_eq!(rx.next().await, Some(())) })
883            });
884
885        rt.block_on(async move {
886            timeout(Duration::from_secs(60), try_join(consumer, producer)).await
887        })??;
888    }
889}