wasm_streams/readable/
into_stream.rs

1use core::pin::Pin;
2use core::task::{Context, Poll};
3
4use futures_util::ready;
5use futures_util::stream::{FusedStream, Stream};
6use futures_util::FutureExt;
7use wasm_bindgen::prelude::*;
8use wasm_bindgen_futures::JsFuture;
9
10use super::sys::ReadableStreamReadResult;
11use super::ReadableStreamDefaultReader;
12
13/// A [`Stream`] for the [`into_stream`](super::ReadableStream::into_stream) method.
14///
15/// This `Stream` holds a reader, and therefore locks the [`ReadableStream`](super::ReadableStream).
16/// When this `Stream` is dropped, it also drops its reader which in turn
17/// [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
18///
19/// [`Stream`]: https://docs.rs/futures/0.3.30/futures/stream/trait.Stream.html
20#[must_use = "streams do nothing unless polled"]
21#[derive(Debug)]
22pub struct IntoStream<'reader> {
23    reader: Option<ReadableStreamDefaultReader<'reader>>,
24    fut: Option<JsFuture>,
25    cancel_on_drop: bool,
26}
27
28impl<'reader> IntoStream<'reader> {
29    #[inline]
30    pub(super) fn new(reader: ReadableStreamDefaultReader, cancel_on_drop: bool) -> IntoStream {
31        IntoStream {
32            reader: Some(reader),
33            fut: None,
34            cancel_on_drop,
35        }
36    }
37
38    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
39    /// signaling a loss of interest in the stream by a consumer.
40    pub async fn cancel(mut self) -> Result<(), JsValue> {
41        match self.reader.take() {
42            Some(mut reader) => reader.cancel().await,
43            None => Ok(()),
44        }
45    }
46
47    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
48    /// signaling a loss of interest in the stream by a consumer.
49    pub async fn cancel_with_reason(mut self, reason: &JsValue) -> Result<(), JsValue> {
50        match self.reader.take() {
51            Some(mut reader) => reader.cancel_with_reason(reason).await,
52            None => Ok(()),
53        }
54    }
55}
56
57impl FusedStream for IntoStream<'_> {
58    fn is_terminated(&self) -> bool {
59        self.reader.is_none() && self.fut.is_none()
60    }
61}
62
63impl<'reader> Stream for IntoStream<'reader> {
64    type Item = Result<JsValue, JsValue>;
65
66    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67        let read_fut = match self.fut.as_mut() {
68            Some(fut) => fut,
69            None => match &self.reader {
70                Some(reader) => {
71                    // No pending read
72                    // Start reading the next chunk and create future from read promise
73                    let fut = JsFuture::from(reader.as_raw().read());
74                    self.fut.insert(fut)
75                }
76                None => {
77                    // Reader was already dropped
78                    return Poll::Ready(None);
79                }
80            },
81        };
82
83        // Poll the future for the pending read
84        let js_result = ready!(read_fut.poll_unpin(cx));
85        self.fut = None;
86
87        // Read completed
88        Poll::Ready(match js_result {
89            Ok(js_value) => {
90                let result = ReadableStreamReadResult::from(js_value);
91                if result.get_done().unwrap_or_default() {
92                    // End of stream, drop reader
93                    self.reader = None;
94                    None
95                } else {
96                    Some(Ok(result.get_value()))
97                }
98            }
99            Err(js_value) => {
100                // Error, drop reader
101                self.reader = None;
102                Some(Err(js_value))
103            }
104        })
105    }
106}
107
108impl<'reader> Drop for IntoStream<'reader> {
109    fn drop(&mut self) {
110        if self.cancel_on_drop {
111            if let Some(reader) = self.reader.take() {
112                let on_rejected = Closure::once(|_| {});
113                let _ = reader.as_raw().cancel().catch(&on_rejected);
114                on_rejected.forget();
115            }
116        }
117    }
118}