use crate::{control::ControlBlockRef, error::*};
use core::{mem::ManuallyDrop, num::NonZeroUsize, sync::atomic::*};
use derivative::Derivative;
#[cfg(feature = "futures_api")]
use crate::waitlist::Slot;
#[cfg(feature = "futures_api")]
use futures::{task::*, Sink, Stream};
#[cfg(feature = "futures_api")]
use core::{mem, pin::Pin};
/// The sending endpoint of a ring channel, as returned by [`ring_channel`].
///
/// Equality is derived from the control block handle only (the `backoff` flag is ignored),
/// and no bounds are placed on `T` for `Debug`, `Eq` or `PartialEq`.
#[derive(Derivative)]
#[derivative(Debug(bound = ""), Eq(bound = ""), PartialEq(bound = ""))]
pub struct RingSender<T> {
// Handle to the control block shared with the other endpoints. It is wrapped in
// `ManuallyDrop` because the `Drop` impl decides explicitly when it gets released.
handle: ManuallyDrop<ControlBlockRef<T>>,
// Set by `Sink::start_send` when the send overwrote a pending message; the next
// `poll_flush`/`poll_ready` then clears it and returns `Pending` once.
#[cfg(feature = "futures_api")]
#[derivative(PartialEq = "ignore")]
backoff: bool,
}
// SAFETY: NOTE(review) - `ControlBlockRef` is defined outside this file. These impls presumably
// rely on the shared buffer, counters and waitlist being safe to access concurrently, with
// messages only ever moving from one thread to another (hence the `T: Send` bound).
// TODO confirm against the control block implementation.
unsafe impl<T: Send> Send for RingSender<T> {}
unsafe impl<T: Send> Sync for RingSender<T> {}
impl<T> RingSender<T> {
fn new(handle: ManuallyDrop<ControlBlockRef<T>>) -> Self {
Self {
handle,
#[cfg(feature = "futures_api")]
backoff: false,
}
}
pub fn send(&mut self, message: T) -> Result<Option<T>, SendError<T>> {
if self.handle.receivers.load(Ordering::Acquire) > 0 {
let overwritten = self.handle.buffer.push(message);
#[cfg(feature = "futures_api")]
if overwritten.is_none() {
fence(Ordering::SeqCst);
if let Some(waker) = self.handle.waitlist.pop() {
waker.wake();
}
}
Ok(overwritten)
} else {
Err(SendError::Disconnected(message))
}
}
}
impl<T> Clone for RingSender<T> {
fn clone(&self) -> Self {
self.handle.senders.fetch_add(1, Ordering::Relaxed);
RingSender::new(self.handle.clone())
}
}
impl<T> Drop for RingSender<T> {
/// Gives up this sender's share of the channel.
///
/// When the last sender goes away, every parked receiver is woken (with `futures_api`)
/// and the sending side is marked as disconnected.
fn drop(&mut self) {
// `fetch_sub` returns the previous count, so `== 1` means this was the last sender.
if self.handle.senders.fetch_sub(1, Ordering::AcqRel) == 1 {
#[cfg(feature = "futures_api")]
{
// Full fence between the counter update above and draining the waitlist below.
fence(Ordering::SeqCst);
// Wake every parked receiver so that it can observe the disconnection.
while let Some(waker) = self.handle.waitlist.pop() {
waker.wake();
}
}
// `swap` returns the previous flag. `true` means this side is the first to disconnect
// and the handle must stay alive for the other side; `false` means the other side
// already disconnected, so the handle is released here.
if !self.handle.connected.swap(false, Ordering::AcqRel) {
// SAFETY: NOTE(review) - presumably sound because `connected` starts out `true` and
// only one side can ever observe `false` here, so the handle is dropped at most once
// and never used afterwards. TODO confirm against the control block implementation.
unsafe { ManuallyDrop::drop(&mut self.handle) }
}
}
}
}
#[cfg(feature = "futures_api")]
impl<T> Sink<T> for RingSender<T> {
    type Error = SendError<T>;

    /// Readiness shares the back-off logic of [`Sink::poll_flush`].
    #[inline]
    fn poll_ready(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.poll_flush(ctx)
    }

    /// Sends `item` and records whether doing so overwrote a pending message.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `send`.
    #[inline]
    fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        let displaced = self.send(item)?;
        self.backoff = displaced.is_some();
        Ok(())
    }

    /// Yields exactly once after a send that overwrote a message.
    ///
    /// The back-off flag is cleared on every call. If it was set, the task is woken
    /// immediately and `Pending` is returned; otherwise the sink reports `Ready`.
    #[inline]
    fn poll_flush(
        mut self: Pin<&mut Self>,
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let must_yield = mem::replace(&mut self.backoff, false);
        if !must_yield {
            return Poll::Ready(Ok(()));
        }

        // Reschedule the task right away: this is a cooperative yield, not a wait.
        ctx.waker().wake_by_ref();
        Poll::Pending
    }

    /// Closing requires no work; always ready.
    #[inline]
    fn poll_close(self: Pin<&mut Self>, _ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}
/// The receiving endpoint of a ring channel, as returned by [`ring_channel`].
///
/// Equality is derived from the control block handle only (the `slot` is ignored),
/// and no bounds are placed on `T` for `Debug`, `Eq` or `PartialEq`.
#[derive(Derivative)]
#[derivative(Debug(bound = ""), Eq(bound = ""), PartialEq(bound = ""))]
pub struct RingReceiver<T> {
// Handle to the control block shared with the other endpoints. It is wrapped in
// `ManuallyDrop` because the `Drop` impl decides explicitly when it gets released.
handle: ManuallyDrop<ControlBlockRef<T>>,
// Waitlist slot obtained from `register()` in `new` and given back with `deregister()`
// on drop; `poll_next` stores and removes this receiver's waker under it.
#[cfg(feature = "futures_api")]
#[derivative(PartialEq = "ignore")]
slot: Slot,
}
// SAFETY: NOTE(review) - `ControlBlockRef` is defined outside this file. These impls presumably
// rely on the shared buffer, counters and waitlist being safe to access concurrently, with
// messages only ever moving from one thread to another (hence the `T: Send` bound).
// TODO confirm against the control block implementation.
unsafe impl<T: Send> Send for RingReceiver<T> {}
unsafe impl<T: Send> Sync for RingReceiver<T> {}
impl<T> RingReceiver<T> {
fn new(handle: ManuallyDrop<ControlBlockRef<T>>) -> Self {
Self {
#[cfg(feature = "futures_api")]
slot: handle.waitlist.register(),
handle,
}
}
#[cfg(feature = "futures_api")]
pub fn recv(&mut self) -> Result<T, RecvError> {
futures::executor::block_on(futures::StreamExt::next(self)).ok_or(RecvError::Disconnected)
}
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
if self.handle.senders.load(Ordering::Acquire) > 0 {
self.handle.buffer.pop().ok_or(TryRecvError::Empty)
} else {
self.handle.buffer.pop().ok_or(TryRecvError::Disconnected)
}
}
}
impl<T> Clone for RingReceiver<T> {
fn clone(&self) -> Self {
self.handle.receivers.fetch_add(1, Ordering::Relaxed);
RingReceiver::new(self.handle.clone())
}
}
impl<T> Drop for RingReceiver<T> {
/// Gives up this receiver's share of the channel.
///
/// With `futures_api`, the waitlist slot is given back first; when the last receiver
/// goes away, the receiving side is marked as disconnected.
fn drop(&mut self) {
// NOTE(review): presumably `deregister` returns the waker stored under this slot, if
// any. `None` would then mean this receiver's waker was already taken (it may have been
// woken), so the wake-up is passed on to another receiver - TODO confirm.
#[cfg(feature = "futures_api")]
if self.handle.waitlist.deregister(self.slot).is_none() {
if let Some(waker) = self.handle.waitlist.pop() {
waker.wake();
}
}
// `fetch_sub` returns the previous count, so `== 1` means this was the last receiver.
if self.handle.receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
// `swap` returns the previous flag. `true` means this side is the first to disconnect
// and the handle must stay alive for the other side; `false` means the other side
// already disconnected, so the handle is released here.
if !self.handle.connected.swap(false, Ordering::AcqRel) {
// SAFETY: NOTE(review) - presumably sound because `connected` starts out `true` and
// only one side can ever observe `false` here, so the handle is dropped at most once
// and never used afterwards. TODO confirm against the control block implementation.
unsafe { ManuallyDrop::drop(&mut self.handle) }
}
}
}
}
#[cfg(feature = "futures_api")]
impl<T> Stream for RingReceiver<T> {
    type Item = T;

    /// Polls the channel for the next message.
    ///
    /// Yields `Ready(Some(_))` when a message is available, `Ready(None)` once the
    /// channel is disconnected and drained, and `Pending` (with the task's waker stored
    /// under this receiver's slot) while the channel is empty but still connected.
    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let first_try = self.try_recv();

        if let Err(TryRecvError::Empty) = first_try {
            // Park the waker first, then look again: a message sent in between would
            // otherwise go unnoticed.
            self.handle.waitlist.insert(self.slot, ctx.waker().clone());

            // Full fence between storing the waker and re-checking the buffer; `send`
            // issues the matching fence between its push and its waitlist lookup.
            fence(Ordering::SeqCst);

            return match self.try_recv() {
                Err(TryRecvError::Empty) => Poll::Pending,
                settled => {
                    // A result is being returned, so the parked waker is withdrawn.
                    self.handle.waitlist.remove(self.slot);
                    Poll::Ready(settled.ok())
                }
            };
        }

        // Either a message or a disconnection: withdraw any waker left from an earlier poll.
        self.handle.waitlist.remove(self.slot);
        Poll::Ready(first_try.ok())
    }
}
/// Opens a ring channel whose control block is created with the given `capacity`.
///
/// Returns the sending and receiving endpoints, both attached to the same control block.
pub fn ring_channel<T>(capacity: NonZeroUsize) -> (RingSender<T>, RingReceiver<T>) {
    let control = ControlBlockRef::new(capacity.get());
    let handle = ManuallyDrop::new(control);

    // The sender gets a duplicate of the handle; the receiver takes the original.
    let sender = RingSender::new(handle.clone());
    let receiver = RingReceiver::new(handle);

    (sender, receiver)
}
// Property tests for the channel endpoints. Several tests reach into the private `handle`
// (counters, buffer, waitlist) to check the bookkeeping directly.
#[cfg(test)]
mod tests {
use super::*;
use crate::Void;
use alloc::vec::Vec;
use core::{cmp::min, iter};
use futures::stream::{iter, repeat};
use futures::{future::try_join_all, prelude::*};
use proptest::collection::size_range;
use test_strategy::proptest;
use tokio::runtime;
use tokio::task::spawn_blocking;
#[cfg(feature = "futures_api")]
use core::time::Duration;
#[cfg(feature = "futures_api")]
use alloc::{sync::Arc, task::Wake};
#[cfg(feature = "futures_api")]
use futures::future::try_join;
#[cfg(feature = "futures_api")]
use tokio::{task::spawn, time::timeout};
// Waker whose `wake` does nothing; used to build a `Context` when polling by hand.
#[cfg(feature = "futures_api")]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
struct MockWaker;
#[cfg(feature = "futures_api")]
impl Wake for MockWaker {
fn wake(self: Arc<Self>) {}
}
// Both endpoints returned by `ring_channel` hold equal handles.
#[proptest]
fn ring_channel_is_associated_with_a_single_control_block() {
let (tx, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
assert_eq!(tx.handle, rx.handle);
}
// Senders compare equal to their clones and unequal to senders of another channel.
#[proptest]
fn senders_are_equal_if_they_are_associated_with_the_same_ring_channel() {
let (s1, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
let (s2, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
assert_eq!(s1, s1.clone());
assert_eq!(s2, s2.clone());
assert_ne!(s1, s2);
}
// The clone has `backoff == false`, yet still compares equal to the original.
#[cfg(feature = "futures_api")]
#[proptest]
fn senders_are_equal_even_if_backoff_is_different() {
let (mut tx, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
tx.backoff = true;
assert_eq!(tx, tx.clone());
}
// Receivers compare equal to their clones and unequal to receivers of another channel.
#[proptest]
fn receivers_are_equal_if_they_are_associated_with_the_same_ring_channel() {
let (_, r1) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
let (_, r2) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
assert_eq!(r1, r1.clone());
assert_eq!(r2, r2.clone());
assert_ne!(r1, r2);
}
// The original sender stays alive (it is only shadowed), so the counter reads 2.
#[proptest]
fn cloning_sender_increments_senders() {
let (tx, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
#[allow(clippy::redundant_clone)]
let tx = tx.clone();
assert_eq!(tx.handle.senders.load(Ordering::SeqCst), 2);
assert_eq!(rx.handle.receivers.load(Ordering::SeqCst), 1);
}
#[cfg(feature = "futures_api")]
#[proptest]
fn cloning_sender_resets_backoff_flag() {
let (mut tx, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
tx.backoff = true;
assert_ne!(tx.clone().backoff, tx.backoff);
}
// The original receiver stays alive (it is only shadowed), so the counter reads 2.
#[proptest]
fn cloning_receiver_increments_receivers_counter() {
let (tx, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
#[allow(clippy::redundant_clone)]
let rx = rx.clone();
assert_eq!(tx.handle.senders.load(Ordering::SeqCst), 1);
assert_eq!(rx.handle.receivers.load(Ordering::SeqCst), 2);
}
#[cfg(feature = "futures_api")]
#[proptest]
fn cloning_receiver_registers_waitlist_slot() {
let (_, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
assert_ne!(rx.clone().slot, rx.slot);
}
// The `_` pattern does not bind the sender, so it is dropped right away.
#[proptest]
fn dropping_sender_decrements_senders_counter() {
let (_, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
assert_eq!(rx.handle.senders.load(Ordering::SeqCst), 0);
assert_eq!(rx.handle.receivers.load(Ordering::SeqCst), 1);
}
// The `_` pattern does not bind the receiver, so it is dropped right away.
#[proptest]
fn dropping_receiver_decrements_receivers_counter() {
let (tx, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
assert_eq!(tx.handle.senders.load(Ordering::SeqCst), 1);
assert_eq!(tx.handle.receivers.load(Ordering::SeqCst), 0);
}
#[cfg(feature = "futures_api")]
#[proptest]
fn dropping_receiver_deregisters_waitlist_slot() {
let (tx, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
assert_eq!(tx.handle.waitlist.len(), 1);
drop(rx);
assert_eq!(tx.handle.waitlist.len(), 0);
}
#[proptest]
fn channel_is_disconnected_if_there_are_no_senders() {
let (_, rx) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
assert_eq!(rx.handle.senders.load(Ordering::SeqCst), 0);
assert!(!rx.handle.connected.load(Ordering::SeqCst));
}
#[proptest]
fn channel_is_disconnected_if_there_are_no_receivers() {
let (tx, _) = ring_channel::<Void>(NonZeroUsize::try_from(1)?);
assert_eq!(tx.handle.receivers.load(Ordering::SeqCst), 0);
assert!(!tx.handle.connected.load(Ordering::SeqCst));
}
// Compile-time checks: the endpoint pair must satisfy `Send` and `Sync`.
#[proptest]
fn endpoints_are_safe_to_send_across_threads() {
fn must_be_send(_: impl Send) {}
must_be_send(ring_channel::<Void>(NonZeroUsize::try_from(1)?));
}
#[proptest]
fn endpoints_are_safe_to_share_across_threads() {
fn must_be_sync(_: impl Sync) {}
must_be_sync(ring_channel::<Void>(NonZeroUsize::try_from(1)?));
}
// Each message is sent from its own blocking task through a clone of the sender;
// `_rx` keeps the receiving side alive.
#[proptest]
fn send_succeeds_on_connected_channel(
#[strategy(1..=10usize)] cap: usize,
#[any(size_range(1..=10).lift())] msgs: Vec<u8>,
) {
let rt = runtime::Builder::new_multi_thread().build()?;
let (tx, _rx) = ring_channel(NonZeroUsize::try_from(cap)?);
rt.block_on(iter(msgs).map(Ok).try_for_each_concurrent(None, |msg| {
let mut tx = tx.clone();
spawn_blocking(move || assert!(tx.send(msg).is_ok()))
}))?;
}
// With the receiver dropped, every send hands the message back in `Disconnected`.
#[proptest]
fn send_fails_on_disconnected_channel(
#[strategy(1..=10usize)] cap: usize,
#[any(size_range(1..=10).lift())] msgs: Vec<u8>,
) {
let rt = runtime::Builder::new_multi_thread().build()?;
let (tx, _) = ring_channel(NonZeroUsize::try_from(cap)?);
rt.block_on(iter(msgs).map(Ok).try_for_each_concurrent(None, |msg| {
let mut tx = tx.clone();
spawn_blocking(move || assert_eq!(tx.send(msg), Err(SendError::Disconnected(msg))))
}))?;
}
// `msgs` holds at least `cap` messages: the first `cap` sends fill the buffer, and every
// later send returns the oldest message still buffered.
#[proptest]
fn send_overwrites_old_messages(
#[strategy(1..=10usize)] cap: usize,
#[any(size_range(#cap..=10).lift())] msgs: Vec<u8>,
) {
let (mut tx, rx) = ring_channel(NonZeroUsize::try_from(cap)?);
let overwritten = msgs.len() - min(msgs.len(), cap);
for &msg in &msgs[..cap] {
assert_eq!(tx.send(msg), Ok(None));
}
for (&prev, &msg) in msgs.iter().zip(&msgs[cap..]) {
assert_eq!(tx.send(msg), Ok(Some(prev)));
}
assert_eq!(
iter::from_fn(|| rx.handle.buffer.pop()).collect::<Vec<_>>(),
&msgs[overwritten..]
);
}
// Messages are pushed straight into the buffer, tagged with their index so that the
// concurrently received values can be sorted back into order.
#[proptest]
fn try_recv_succeeds_on_non_empty_connected_channel(
#[any(size_range(1..=10).lift())] msgs: Vec<u8>,
) {
let rt = runtime::Builder::new_multi_thread().build()?;
let (tx, rx) = ring_channel(NonZeroUsize::try_from(msgs.len())?);
for msg in msgs.iter().cloned().enumerate() {
tx.handle.buffer.push(msg);
}
let mut received = rt.block_on(async {
try_join_all(
iter::repeat(rx)
.take(msgs.len())
.map(|mut rx| spawn_blocking(move || rx.try_recv().unwrap())),
)
.await
})?;
received.sort_by_key(|(k, _)| *k);
assert_eq!(received, msgs.into_iter().enumerate().collect::<Vec<_>>());
}
// Same as above, but with the sender already dropped: buffered messages stay receivable.
#[proptest]
fn try_recv_succeeds_on_non_empty_disconnected_channel(
#[any(size_range(1..=10).lift())] msgs: Vec<u8>,
) {
let rt = runtime::Builder::new_multi_thread().build()?;
let (_, rx) = ring_channel(NonZeroUsize::try_from(msgs.len())?);
for msg in msgs.iter().cloned().enumerate() {
rx.handle.buffer.push(msg);
}
let mut received = rt.block_on(async {
try_join_all(
iter::repeat(rx)
.take(msgs.len())
.map(|mut rx| spawn_blocking(move || rx.try_recv().unwrap())),
)
.await
})?;
received.sort_by_key(|(k, _)| *k);
assert_eq!(received, msgs.into_iter().enumerate().collect::<Vec<_>>());
}
// `_tx` keeps the sending side alive, so an empty buffer reports `Empty`.
#[proptest]
fn try_recv_fails_on_empty_connected_channel(
#[strategy(1..=10usize)] cap: usize,
#[strategy(1..=10usize)] n: usize,
) {
let rt = runtime::Builder::new_multi_thread().build()?;
let (_tx, rx) = ring_channel::<()>(NonZeroUsize::try_from(cap)?);
rt.block_on(
repeat(rx)
.take(n)
.map(Ok)
.try_for_each_concurrent(None, |mut rx| {
spawn_blocking(move || assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)))
}),
)?;
}
// With the sender dropped, an empty buffer reports `Disconnected`.
#[proptest]
fn try_recv_fails_on_empty_disconnected_channel(
#[strategy(1..=10usize)] cap: usize,
#[strategy(1..=10usize)] n: usize,
) {
let rt = runtime::Builder::new_multi_thread().build()?;
let (_, rx) = ring_channel::<()>(NonZeroUsize::try_from(cap)?);
rt.block_on(
repeat(rx)
.take(n)
.map(Ok)
.try_for_each_concurrent(None, |mut rx| {
spawn_blocking(move || {
assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected))
})
}),
)?;
}
// Blocking `recv` counterparts of the `try_recv` tests above.
#[cfg(feature = "futures_api")]
#[proptest]
fn recv_succeeds_on_non_empty_connected_channel(
#[any(size_range(1..=10).lift())] msgs: Vec<u8>,
) {
let rt = runtime::Builder::new_multi_thread().build()?;
let (tx, rx) = ring_channel(NonZeroUsize::try_from(msgs.len())?);
for msg in msgs.iter().cloned().enumerate() {
tx.handle.buffer.push(msg);
}
let mut received = rt.block_on(async {
try_join_all(
iter::repeat(rx)
.take(msgs.len())
.map(|mut rx| spawn_blocking(move || rx.recv().unwrap())),
)
.await
})?;
received.sort_by_key(|(k, _)| *k);
assert_eq!(received, msgs.into_iter().enumerate().collect::<Vec<_>>());
}
#[cfg(feature = "futures_api")]
#[proptest]
fn recv_succeeds_on_non_empty_disconnected_channel(
#[any(size_range(1..=10).lift())] msgs: Vec<u8>,
) {
let rt = runtime::Builder::new_multi_thread().build()?;
let (_, rx) = ring_channel(NonZeroUsize::try_from(msgs.len())?);
for msg in msgs.iter().cloned().enumerate() {
rx.handle.buffer.push(msg);
}
let mut received = rt.block_on(async {
try_join_all(
iter::repeat(rx)
.take(msgs.len())
.map(|mut rx| spawn_blocking(move || rx.recv().unwrap())),
)
.await
})?;
received.sort_by_key(|(k, _)| *k);
assert_eq!(received, msgs.into_iter().enumerate().collect::<Vec<_>>());
}
#[cfg(feature = "futures_api")]
#[proptest]
fn recv_fails_on_empty_disconnected_channel(
#[strategy(1..=10usize)] cap: usize,
#[strategy(1..=10usize)] n: usize,
) {
let rt = runtime::Builder::new_multi_thread().build()?;
let (_, rx) = ring_channel::<()>(NonZeroUsize::try_from(cap)?);
rt.block_on(
repeat(rx)
.take(n)
.map(Ok)
.try_for_each_concurrent(None, |mut rx| {
spawn_blocking(move || assert_eq!(rx.recv(), Err(RecvError::Disconnected)))
}),
)?;
}
// `m` senders are dropped while `n` receivers block in `recv`; all receivers must return
// `Disconnected` before the 60 second timeout.
#[cfg(feature = "futures_api")]
#[proptest]
fn recv_wakes_on_disconnect(
#[strategy(1..=10usize)] m: usize,
#[strategy(1..=10usize)] n: usize,
) {
let rt = runtime::Builder::new_multi_thread().enable_time().build()?;
let (tx, rx) = ring_channel::<()>(NonZeroUsize::try_from(1)?);
let producer = repeat(tx)
.take(m)
.map(Ok)
.try_for_each_concurrent(None, |tx| spawn_blocking(move || drop(tx)));
let consumer = repeat(rx)
.take(n)
.map(Ok)
.try_for_each_concurrent(None, |mut rx| {
spawn_blocking(move || assert_eq!(rx.recv(), Err(RecvError::Disconnected)))
});
rt.block_on(async move {
timeout(Duration::from_secs(60), try_join(consumer, producer)).await
})??;
}
// `n` senders each send one message to `n` blocking receivers; the extra sender clone
// keeps the channel connected until the test ends.
#[cfg(feature = "futures_api")]
#[proptest]
fn recv_wakes_on_send(#[strategy(1..=10usize)] n: usize) {
let rt = runtime::Builder::new_multi_thread().enable_time().build()?;
let (tx, rx) = ring_channel(NonZeroUsize::try_from(n)?);
let _prevent_disconnection = tx.clone();
let producer = repeat(tx)
.take(n)
.map(Ok)
.try_for_each_concurrent(None, |mut tx| {
spawn_blocking(move || assert!(tx.send(()).is_ok()))
});
let consumer = repeat(rx)
.take(n)
.map(Ok)
.try_for_each_concurrent(None, |mut rx| {
spawn_blocking(move || assert_eq!(rx.recv(), Ok(())))
});
rt.block_on(async move {
timeout(Duration::from_secs(60), try_join(consumer, producer)).await
})??;
}
// Forwarding a stream into the sender keeps only the last `cap` messages.
#[cfg(feature = "futures_api")]
#[proptest]
fn sender_implements_sink(
#[strategy(1..=10usize)] cap: usize,
#[any(size_range(1..=10).lift())] msgs: Vec<u8>,
) {
let rt = runtime::Builder::new_multi_thread().build()?;
let (mut tx, mut rx) = ring_channel(NonZeroUsize::try_from(cap)?);
let overwritten = msgs.len() - min(msgs.len(), cap);
assert_eq!(rt.block_on(iter(&msgs).map(Ok).forward(&mut tx)), Ok(()));
drop(tx);
assert_eq!(
iter::from_fn(|| rx.try_recv().ok().copied()).collect::<Vec<_>>(),
&msgs[overwritten..]
);
}
// Capacity is 1: the first send fits, the second one overwrites and sets `backoff`.
#[cfg(feature = "futures_api")]
#[proptest]
fn sender_sets_backoff_to_true_if_sink_overwrites_on_send() {
let (mut tx, _rx) = ring_channel(NonZeroUsize::try_from(1)?);
assert!(!tx.backoff);
assert_eq!(Pin::new(&mut tx).start_send(()), Ok(()));
assert!(!tx.backoff);
assert_eq!(Pin::new(&mut tx).start_send(()), Ok(()));
assert!(tx.backoff);
}
#[cfg(feature = "futures_api")]
#[proptest]
fn sender_yields_once_on_poll_ready_if_backoff_is_true(#[strategy(1..=10usize)] cap: usize) {
let (mut tx, _) = ring_channel::<()>(NonZeroUsize::try_from(cap)?);
tx.backoff = true;
let waker = Arc::new(MockWaker).into();
let mut ctx = Context::from_waker(&waker);
assert_eq!(Pin::new(&mut tx).poll_ready(&mut ctx), Poll::Pending);
assert!(!tx.backoff);
assert_eq!(Pin::new(&mut tx).poll_ready(&mut ctx), Poll::Ready(Ok(())));
}
// After the sender is dropped, the stream yields the last `cap` messages and then ends.
#[cfg(feature = "futures_api")]
#[proptest]
fn receiver_implements_stream(
#[strategy(1..=10usize)] cap: usize,
#[any(size_range(1..=10).lift())] msgs: Vec<u8>,
) {
let rt = runtime::Builder::new_multi_thread().build()?;
let (mut tx, rx) = ring_channel(NonZeroUsize::try_from(cap)?);
let overwritten = msgs.len() - min(msgs.len(), cap);
for &msg in &msgs {
assert!(tx.send(msg).is_ok());
}
drop(tx);
assert_eq!(rt.block_on(rx.collect::<Vec<_>>()), &msgs[overwritten..]);
}
// Polling twice with different wakers leaves only the second one in the slot.
#[cfg(feature = "futures_api")]
#[cfg(not(miri))] #[proptest]
fn receiver_stores_most_recent_waker_if_channel_is_empty(#[strategy(1..=10usize)] cap: usize) {
let (_tx, mut rx) = ring_channel::<()>(NonZeroUsize::try_from(cap)?);
let a = Arc::new(MockWaker).into();
let mut ctx = Context::from_waker(&a);
assert_eq!(Pin::new(&mut rx).poll_next(&mut ctx), Poll::Pending);
assert!(rx.handle.waitlist.get(rx.slot).unwrap().will_wake(&a));
let b = Arc::new(MockWaker).into();
let mut ctx = Context::from_waker(&b);
assert_eq!(Pin::new(&mut rx).poll_next(&mut ctx), Poll::Pending);
assert!(!rx.handle.waitlist.get(rx.slot).unwrap().will_wake(&a));
assert!(rx.handle.waitlist.get(rx.slot).unwrap().will_wake(&b));
}
// A poll that returns a message leaves no waker behind in the slot.
#[cfg(feature = "futures_api")]
#[cfg(not(miri))] #[proptest]
fn receiver_withdraws_waker_if_channel_not_empty(#[strategy(1..=10usize)] cap: usize, msg: u8) {
let (mut tx, mut rx) = ring_channel(NonZeroUsize::try_from(cap)?);
let waker = Arc::new(MockWaker).into();
let mut ctx = Context::from_waker(&waker);
assert_eq!(Pin::new(&mut rx).poll_next(&mut ctx), Poll::Pending);
assert!(rx.handle.waitlist.get(rx.slot).unwrap().will_wake(&waker));
assert_eq!(tx.send(&msg), Ok(None));
assert_eq!(
Pin::new(&mut rx).poll_next(&mut ctx),
Poll::Ready(Some(&msg))
);
assert!(rx.handle.waitlist.get(rx.slot).is_none());
}
// Async counterpart of `recv_wakes_on_disconnect`: senders are closed inside spawned
// tasks and every stream must end before the timeout.
#[cfg(feature = "futures_api")]
#[proptest]
fn stream_wakes_on_disconnect(
#[strategy(1..=10usize)] m: usize,
#[strategy(1..=10usize)] n: usize,
) {
let rt = runtime::Builder::new_multi_thread().enable_time().build()?;
let (tx, rx) = ring_channel::<()>(NonZeroUsize::try_from(1)?);
let producer = repeat(tx)
.take(m)
.map(Ok)
.try_for_each_concurrent(None, |mut tx| {
spawn(async move { assert_eq!(tx.close().await, Ok(())) })
});
let consumer = repeat(rx)
.take(n)
.map(Ok)
.try_for_each_concurrent(None, |mut rx| {
spawn(async move { assert_eq!(rx.next().await, None) })
});
rt.block_on(async move {
timeout(Duration::from_secs(60), try_join(consumer, producer)).await
})??;
}
// Async counterpart of `recv_wakes_on_send`: each of the `n` streams must receive one
// message forwarded through the sink before the timeout.
#[cfg(feature = "futures_api")]
#[proptest]
fn stream_wakes_on_sink(#[strategy(1..=10usize)] n: usize) {
let rt = runtime::Builder::new_multi_thread().enable_time().build()?;
let (tx, rx) = ring_channel(NonZeroUsize::try_from(n)?);
let _prevent_disconnection = tx.clone();
let producer = repeat(tx)
.take(n)
.map(Ok)
.try_for_each_concurrent(None, |tx| {
spawn(async move { assert_eq!(iter(Some(Ok(()))).forward(tx).await, Ok(())) })
});
let consumer = repeat(rx)
.take(n)
.map(Ok)
.try_for_each_concurrent(None, |mut rx| {
spawn(async move { assert_eq!(rx.next().await, Some(())) })
});
rt.block_on(async move {
timeout(Duration::from_secs(60), try_join(consumer, producer)).await
})??;
}
}