Skip to main content

wit_bindgen/rt/async_support/
futures_stream.rs

1use super::stream_support::{RawStreamReader, StreamOps, StreamVtable};
2use std::boxed::Box;
3use std::{
4    future::Future,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9/// A wrapper around [`RawStreamReader`] that implements [`futures::Stream`].
10///
11/// Obtain one via [`RawStreamReader::into_stream`] or
12/// [`RawStreamReaderStream::new`].
13pub struct RawStreamReaderStream<O: StreamOps + 'static> {
14    state: StreamAdapterState<O>,
15}
16
17// SAFETY: No field is structurally pinned. The inner `Pin<Box<dyn Future>>`
18// is itself `Unpin`, and `RawStreamReader` is only stored when idle.
19impl<O: StreamOps + 'static> Unpin for RawStreamReaderStream<O> {}
20
21/// Convenience alias for the common vtable-based case.
22pub type StreamReaderStream<T> = RawStreamReaderStream<&'static StreamVtable<T>>;
23
24type ReadNextFut<O> =
25    Pin<Box<dyn Future<Output = (RawStreamReader<O>, Option<<O as StreamOps>::Payload>)>>>;
26
27enum StreamAdapterState<O: StreamOps + 'static> {
28    /// The reader is idle and ready for the next read.
29    Idle(RawStreamReader<O>),
30    /// A read is in progress.
31    Reading(ReadNextFut<O>),
32    /// The stream has been exhausted.
33    Complete,
34}
35
36impl<O: StreamOps + 'static> RawStreamReaderStream<O> {
37    /// Create a new [`futures::Stream`] wrapper from a [`RawStreamReader`].
38    pub fn new(reader: RawStreamReader<O>) -> Self {
39        Self {
40            state: StreamAdapterState::Idle(reader),
41        }
42    }
43
44    /// Recover the underlying [`RawStreamReader`], if no read is in flight.
45    ///
46    /// Returns `None` when a read is currently in progress or the stream has
47    /// already finished.
48    pub fn into_inner(self) -> Option<RawStreamReader<O>> {
49        match self.state {
50            StreamAdapterState::Idle(reader) => Some(reader),
51            _ => None,
52        }
53    }
54}
55
56impl<O: StreamOps + 'static> futures::stream::Stream for RawStreamReaderStream<O> {
57    type Item = O::Payload;
58
59    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
60        // All variants of `StreamAdapterState` are `Unpin`, so `Pin<&mut Self>`
61        // can be freely projected.
62        loop {
63            match std::mem::replace(&mut self.state, StreamAdapterState::Complete) {
64                StreamAdapterState::Idle(mut reader) => {
65                    let fut: ReadNextFut<O> = Box::pin(async move {
66                        let item = reader.next().await;
67                        (reader, item)
68                    });
69                    self.state = StreamAdapterState::Reading(fut);
70                    // Loop to immediately poll the new future.
71                }
72                StreamAdapterState::Reading(mut fut) => match fut.as_mut().poll(cx) {
73                    Poll::Pending => {
74                        self.state = StreamAdapterState::Reading(fut);
75                        return Poll::Pending;
76                    }
77                    Poll::Ready((reader, Some(item))) => {
78                        self.state = StreamAdapterState::Idle(reader);
79                        return Poll::Ready(Some(item));
80                    }
81                    Poll::Ready((_reader, None)) => {
82                        self.state = StreamAdapterState::Complete;
83                        return Poll::Ready(None);
84                    }
85                },
86                StreamAdapterState::Complete => {
87                    self.state = StreamAdapterState::Complete;
88                    return Poll::Ready(None);
89                }
90            }
91        }
92    }
93}
94
95impl<O: StreamOps + 'static> RawStreamReader<O> {
96    /// Convert this reader into a [`futures::Stream`].
97    pub fn into_stream(self) -> RawStreamReaderStream<O> {
98        RawStreamReaderStream::new(self)
99    }
100}