asupersync 0.3.4

Spec-first, cancel-correct, capability-secure async runtime for Rust.
Documentation
//! Stream adapters for channel receivers.
//!
//! These adapters provide a `Stream` view over channel receivers while
//! preserving Asupersync's explicit-capability model. A `Cx` is required
//! to perform receive operations.
//!
//! Phase 0 note: channel receive operations are currently blocking. These
//! adapters therefore block inside `poll_next` until a message arrives or
//! the channel closes. This will be replaced by non-blocking waker-based
//! integration in a later phase.

use crate::channel::mpsc;
use crate::channel::mpsc::RecvError;
use crate::cx::Cx;
use crate::stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Stream wrapper for `mpsc::Receiver`.
#[derive(Debug)]
pub struct ReceiverStream<T> {
    inner: mpsc::Receiver<T>,
    cx: Cx,
    terminated: bool,
}

impl<T> ReceiverStream<T> {
    /// Creates a new stream wrapper with an explicit capability context.
    #[inline]
    #[must_use]
    pub fn new(cx: Cx, inner: mpsc::Receiver<T>) -> Self {
        cx.trace("stream::ReceiverStream created");
        Self {
            inner,
            cx,
            terminated: false,
        }
    }

    /// Returns a reference to the inner receiver.
    #[inline]
    #[must_use]
    pub fn get_ref(&self) -> &mpsc::Receiver<T> {
        &self.inner
    }

    /// Returns a mutable reference to the inner receiver.
    #[inline]
    pub fn get_mut(&mut self) -> &mut mpsc::Receiver<T> {
        &mut self.inner
    }

    /// Returns a reference to the capability context.
    #[inline]
    #[must_use]
    pub fn cx(&self) -> &Cx {
        &self.cx
    }

    /// Unwraps the stream into the inner receiver.
    #[inline]
    #[must_use]
    pub fn into_inner(mut self) -> mpsc::Receiver<T> {
        self.inner.clear_recv_waker();
        self.inner
    }
}

impl<T> Stream for ReceiverStream<T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, poll_cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.terminated {
            return Poll::Ready(None);
        }

        match this.inner.poll_recv(&this.cx, poll_cx) {
            Poll::Ready(Ok(item)) => {
                this.cx.trace("stream::ReceiverStream yielded item");
                Poll::Ready(Some(item))
            }
            Poll::Ready(Err(RecvError::Disconnected | RecvError::Cancelled)) => {
                this.terminated = true;
                Poll::Ready(None)
            }
            Poll::Ready(Err(RecvError::Empty)) | Poll::Pending => Poll::Pending,
        }
    }
}

#[cfg(test)]
mod tests {
    #![allow(
        clippy::pedantic,
        clippy::nursery,
        clippy::expect_fun_call,
        clippy::map_unwrap_or,
        clippy::cast_possible_wrap,
        clippy::future_not_send
    )]
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::task::Waker;

    fn noop_waker() -> Waker {
        Waker::noop().clone()
    }

    struct CountWaker(Arc<AtomicUsize>);

    use std::task::Wake;
    impl Wake for CountWaker {
        fn wake(self: Arc<Self>) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }

        fn wake_by_ref(self: &Arc<Self>) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    fn counting_waker(counter: Arc<AtomicUsize>) -> Waker {
        Waker::from(Arc::new(CountWaker(counter)))
    }

    fn init_test(name: &str) {
        crate::test_utils::init_test_logging();
        crate::test_phase!(name);
    }

    fn collect_closed_stream<T>(mut stream: ReceiverStream<T>) -> Vec<T> {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let mut out = Vec::new();
        loop {
            match Pin::new(&mut stream).poll_next(&mut cx) {
                Poll::Ready(Some(item)) => out.push(item),
                Poll::Ready(None) => return out,
                Poll::Pending => panic!("closed receiver stream unexpectedly returned Pending"),
            }
        }
    }

    fn collect_prefilled_input(input: &[i32]) -> Vec<i32> {
        let cx_recv: Cx = Cx::for_testing();
        let (tx, rx) = mpsc::channel(input.len().max(1));
        for &item in input {
            tx.try_send(item).expect("prefill send");
        }
        drop(tx);
        collect_closed_stream(ReceiverStream::new(cx_recv, rx))
    }

    #[test]
    fn receiver_stream_reads_messages() {
        init_test("receiver_stream_reads_messages");
        let _cx_send: Cx = Cx::for_testing();
        let cx_recv: Cx = Cx::for_testing();
        let (tx, rx) = mpsc::channel(4);

        tx.try_send(1).expect("send 1");
        tx.try_send(2).expect("send 2");
        tx.try_send(3).expect("send 3");
        drop(tx);

        let mut stream = ReceiverStream::new(cx_recv, rx);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(Some(1)));
        crate::assert_with_log!(ok, "poll 1", "Poll::Ready(Some(1))", poll);
        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(Some(2)));
        crate::assert_with_log!(ok, "poll 2", "Poll::Ready(Some(2))", poll);
        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(Some(3)));
        crate::assert_with_log!(ok, "poll 3", "Poll::Ready(Some(3))", poll);
        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let ok = matches!(poll, Poll::Ready(None));
        crate::assert_with_log!(ok, "poll done", "Poll::Ready(None)", poll);
        crate::test_complete!("receiver_stream_reads_messages");
    }

    #[test]
    fn receiver_stream_none_is_terminal_after_cancel() {
        init_test("receiver_stream_none_is_terminal_after_cancel");
        let cx_recv: Cx = Cx::for_testing();
        cx_recv.set_cancel_requested(true);
        let (tx, rx) = mpsc::channel(2);
        let mut stream = ReceiverStream::new(cx_recv.clone(), rx);

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let first_none = matches!(poll, Poll::Ready(None));
        crate::assert_with_log!(first_none, "first poll none", true, first_none);

        cx_recv.set_cancel_requested(false);
        tx.try_send(7).expect("send after cancel clear");

        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let still_none = matches!(poll, Poll::Ready(None));
        crate::assert_with_log!(still_none, "stream remains terminated", true, still_none);
        crate::test_complete!("receiver_stream_none_is_terminal_after_cancel");
    }

    /// Invariant: poll_next returns Pending when channel is empty but sender
    /// is still alive.
    #[test]
    fn receiver_stream_pending_when_empty() {
        init_test("receiver_stream_pending_when_empty");
        let cx_recv: Cx = Cx::for_testing();
        let (_tx, rx) = mpsc::channel::<i32>(4);
        let mut stream = ReceiverStream::new(cx_recv, rx);

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        // No messages sent, sender alive — should be Pending.
        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let is_pending = poll.is_pending();
        crate::assert_with_log!(is_pending, "empty channel is Pending", true, is_pending);

        crate::test_complete!("receiver_stream_pending_when_empty");
    }

    #[test]
    fn receiver_stream_pending_poll_keeps_waker_registration() {
        init_test("receiver_stream_pending_poll_keeps_waker_registration");
        let cx_recv: Cx = Cx::for_testing();
        let (tx, rx) = mpsc::channel::<i32>(4);
        let mut stream = ReceiverStream::new(cx_recv, rx);

        let wake_count = Arc::new(AtomicUsize::new(0));
        let waker = counting_waker(Arc::clone(&wake_count));
        let mut task_cx = Context::from_waker(&waker);

        let first = Pin::new(&mut stream).poll_next(&mut task_cx);
        let first_pending = matches!(first, Poll::Pending);
        crate::assert_with_log!(first_pending, "first poll pending", true, first_pending);

        tx.try_send(7).expect("send");
        let wake_total = wake_count.load(Ordering::SeqCst);
        crate::assert_with_log!(wake_total == 1, "single wake after send", 1, wake_total);

        let second = Pin::new(&mut stream).poll_next(&mut task_cx);
        let second_ready = matches!(second, Poll::Ready(Some(7)));
        crate::assert_with_log!(second_ready, "second poll has item", true, second_ready);

        crate::test_complete!("receiver_stream_pending_poll_keeps_waker_registration");
    }

    /// Invariant: accessors (get_ref, cx, into_inner) work correctly
    /// and preserve stream state.
    #[test]
    fn receiver_stream_accessors() {
        init_test("receiver_stream_accessors");
        let cx_recv: Cx = Cx::for_testing();
        let (tx, rx) = mpsc::channel::<i32>(4);
        tx.try_send(99).expect("send");

        let mut stream = ReceiverStream::new(cx_recv, rx);

        // get_ref returns reference to inner receiver.
        let _inner_ref = stream.get_ref();

        // get_mut returns mutable reference.
        let _inner_mut = stream.get_mut();

        // cx() returns reference to the Cx.
        let _cx_ref = stream.cx();

        // into_inner consumes stream and returns the receiver.
        let mut recovered = stream.into_inner();
        // The message should still be in the channel.
        let msg = recovered.try_recv();
        let got_99 = matches!(msg, Ok(99));
        crate::assert_with_log!(got_99, "message preserved after into_inner", true, got_99);

        crate::test_complete!("receiver_stream_accessors");
    }

    #[test]
    fn receiver_stream_into_inner_clears_pending_waker_registration() {
        init_test("receiver_stream_into_inner_clears_pending_waker_registration");
        let cx_recv: Cx = Cx::for_testing();
        let (tx, rx) = mpsc::channel::<i32>(4);
        let mut stream = ReceiverStream::new(cx_recv, rx);

        let wake_count = Arc::new(AtomicUsize::new(0));
        let waker = counting_waker(Arc::clone(&wake_count));
        let mut task_cx = Context::from_waker(&waker);

        let pending = Pin::new(&mut stream).poll_next(&mut task_cx);
        crate::assert_with_log!(
            pending.is_pending(),
            "first poll pending",
            true,
            pending.is_pending()
        );

        let mut recovered = stream.into_inner();
        tx.try_send(7).expect("send");

        let wake_total = wake_count.load(Ordering::SeqCst);
        crate::assert_with_log!(
            wake_total == 0,
            "into_inner clears stale stream waker",
            0usize,
            wake_total
        );

        let msg = recovered.try_recv();
        let got_7 = matches!(msg, Ok(7));
        crate::assert_with_log!(got_7, "recovered receiver still receives", true, got_7);

        crate::test_complete!("receiver_stream_into_inner_clears_pending_waker_registration");
    }

    #[test]
    fn mr_receiver_stream_split_sends_match_unsplit_fifo_order() {
        init_test("mr_receiver_stream_split_sends_match_unsplit_fifo_order");
        let input = vec![3, 1, 4, 1, 5, 9, 2, 6];
        let expected = collect_prefilled_input(&input);

        for split in 0..=input.len() {
            let cx_recv: Cx = Cx::for_testing();
            let (tx, rx) = mpsc::channel(input.len().max(1));
            for &item in &input[..split] {
                tx.try_send(item).expect("left split send");
            }
            for &item in &input[split..] {
                tx.try_send(item).expect("right split send");
            }
            drop(tx);

            let actual = collect_closed_stream(ReceiverStream::new(cx_recv, rx));
            crate::assert_with_log!(
                actual == expected,
                format!("split at {split}"),
                expected.clone(),
                actual
            );
        }
        crate::test_complete!("mr_receiver_stream_split_sends_match_unsplit_fifo_order");
    }

    #[test]
    fn mr_receiver_stream_pending_then_send_matches_prefilled_single_item() {
        init_test("mr_receiver_stream_pending_then_send_matches_prefilled_single_item");
        let expected = collect_prefilled_input(&[21]);

        let cx_recv: Cx = Cx::for_testing();
        let (tx, rx) = mpsc::channel::<i32>(1);
        let mut stream = ReceiverStream::new(cx_recv, rx);
        let wake_count = Arc::new(AtomicUsize::new(0));
        let waker = counting_waker(Arc::clone(&wake_count));
        let mut task_cx = Context::from_waker(&waker);

        let first = Pin::new(&mut stream).poll_next(&mut task_cx);
        crate::assert_with_log!(
            first.is_pending(),
            "empty live receiver is pending",
            true,
            first.is_pending()
        );
        crate::assert_with_log!(
            wake_count.load(Ordering::SeqCst) == 0,
            "no wake before send",
            0usize,
            wake_count.load(Ordering::SeqCst)
        );

        tx.try_send(21).expect("send after pending poll");
        drop(tx);
        crate::assert_with_log!(
            wake_count.load(Ordering::SeqCst) == 1,
            "pending waiter wakes exactly once",
            1usize,
            wake_count.load(Ordering::SeqCst)
        );

        let actual = collect_closed_stream(stream);
        crate::assert_with_log!(
            actual == expected,
            "pending-then-send output matches prefilled output",
            expected,
            actual
        );
        crate::test_complete!("mr_receiver_stream_pending_then_send_matches_prefilled_single_item");
    }
}