wit_bindgen/rt/async_support/
futures_stream.rs1use super::stream_support::{RawStreamReader, StreamOps, StreamVtable};
2use std::boxed::Box;
3use std::{
4 future::Future,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9pub struct RawStreamReaderStream<O: StreamOps + 'static> {
14 state: StreamAdapterState<O>,
15}
16
17impl<O: StreamOps + 'static> Unpin for RawStreamReaderStream<O> {}
20
21pub type StreamReaderStream<T> = RawStreamReaderStream<&'static StreamVtable<T>>;
23
24type ReadNextFut<O> =
25 Pin<Box<dyn Future<Output = (RawStreamReader<O>, Option<<O as StreamOps>::Payload>)>>>;
26
27enum StreamAdapterState<O: StreamOps + 'static> {
28 Idle(RawStreamReader<O>),
30 Reading(ReadNextFut<O>),
32 Complete,
34}
35
36impl<O: StreamOps + 'static> RawStreamReaderStream<O> {
37 pub fn new(reader: RawStreamReader<O>) -> Self {
39 Self {
40 state: StreamAdapterState::Idle(reader),
41 }
42 }
43
44 pub fn into_inner(self) -> Option<RawStreamReader<O>> {
49 match self.state {
50 StreamAdapterState::Idle(reader) => Some(reader),
51 _ => None,
52 }
53 }
54}
55
56impl<O: StreamOps + 'static> futures::stream::Stream for RawStreamReaderStream<O> {
57 type Item = O::Payload;
58
59 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
60 loop {
63 match std::mem::replace(&mut self.state, StreamAdapterState::Complete) {
64 StreamAdapterState::Idle(mut reader) => {
65 let fut: ReadNextFut<O> = Box::pin(async move {
66 let item = reader.next().await;
67 (reader, item)
68 });
69 self.state = StreamAdapterState::Reading(fut);
70 }
72 StreamAdapterState::Reading(mut fut) => match fut.as_mut().poll(cx) {
73 Poll::Pending => {
74 self.state = StreamAdapterState::Reading(fut);
75 return Poll::Pending;
76 }
77 Poll::Ready((reader, Some(item))) => {
78 self.state = StreamAdapterState::Idle(reader);
79 return Poll::Ready(Some(item));
80 }
81 Poll::Ready((_reader, None)) => {
82 self.state = StreamAdapterState::Complete;
83 return Poll::Ready(None);
84 }
85 },
86 StreamAdapterState::Complete => {
87 self.state = StreamAdapterState::Complete;
88 return Poll::Ready(None);
89 }
90 }
91 }
92 }
93}
94
95impl<O: StreamOps + 'static> RawStreamReader<O> {
96 pub fn into_stream(self) -> RawStreamReaderStream<O> {
98 RawStreamReaderStream::new(self)
99 }
100}