wasm_streams/readable/
default_reader.rs

1use std::marker::PhantomData;
2
3use wasm_bindgen::JsCast;
4use wasm_bindgen::JsValue;
5use wasm_bindgen_futures::JsFuture;
6
7use crate::util::promise_to_void_future;
8
9use super::{sys, IntoStream, ReadableStream};
10
11/// A [`ReadableStreamDefaultReader`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader)
12/// that can be used to read chunks from a [`ReadableStream`](ReadableStream).
13///
14/// This is returned by the [`get_reader`](ReadableStream::get_reader) method.
15///
16/// When the reader is dropped, it automatically [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
17#[derive(Debug)]
18pub struct ReadableStreamDefaultReader<'stream> {
19    raw: sys::ReadableStreamDefaultReader,
20    _stream: PhantomData<&'stream mut ReadableStream>,
21}
22
23impl<'stream> ReadableStreamDefaultReader<'stream> {
24    pub(crate) fn new(stream: &mut ReadableStream) -> Result<Self, js_sys::Error> {
25        Ok(Self {
26            raw: stream
27                .as_raw()
28                .unchecked_ref::<sys::ReadableStreamExt>()
29                .try_get_reader()?
30                .unchecked_into(),
31            _stream: PhantomData,
32        })
33    }
34
35    /// Acquires a reference to the underlying [JavaScript reader](sys::ReadableStreamDefaultReader).
36    #[inline]
37    pub fn as_raw(&self) -> &sys::ReadableStreamDefaultReader {
38        &self.raw
39    }
40
41    /// Waits for the stream to become closed.
42    ///
43    /// This returns an error if the stream ever errors, or if the reader's lock is
44    /// [released](https://streams.spec.whatwg.org/#release-a-lock) before the stream finishes
45    /// closing.
46    pub async fn closed(&self) -> Result<(), JsValue> {
47        promise_to_void_future(self.as_raw().closed()).await
48    }
49
50    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
51    /// signaling a loss of interest in the stream by a consumer.
52    ///
53    /// Equivalent to [`ReadableStream.cancel`](ReadableStream::cancel).
54    pub async fn cancel(&mut self) -> Result<(), JsValue> {
55        promise_to_void_future(self.as_raw().cancel()).await
56    }
57
58    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
59    /// signaling a loss of interest in the stream by a consumer.
60    ///
61    /// Equivalent to [`ReadableStream.cancel_with_reason`](ReadableStream::cancel_with_reason).
62    pub async fn cancel_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
63        promise_to_void_future(self.as_raw().cancel_with_reason(reason)).await
64    }
65
66    /// Reads the next chunk from the stream's internal queue.
67    ///
68    /// * If a next `chunk` becomes available, this returns `Ok(Some(chunk))`.
69    /// * If the stream closes and no more chunks are available, this returns `Ok(None)`.
70    /// * If the stream encounters an `error`, this returns `Err(error)`.
71    pub async fn read(&mut self) -> Result<Option<JsValue>, JsValue> {
72        let promise = self.as_raw().read();
73        let js_result = JsFuture::from(promise).await?;
74        let result = sys::ReadableStreamReadResult::from(js_result);
75        if result.get_done().unwrap_or_default() {
76            Ok(None)
77        } else {
78            Ok(Some(result.get_value()))
79        }
80    }
81
82    /// [Releases](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
83    /// corresponding stream.
84    ///
85    /// [As of January 2022](https://github.com/whatwg/streams/commit/d5f92d9f17306d31ba6b27424d23d58e89bf64a5),
86    /// the Streams standard allows the lock to be released even when there are still pending read
87    /// requests. Such requests will automatically become rejected, and this function will always
88    /// succeed.
89    ///
90    /// However, if the Streams implementation is not yet up-to-date with this change, then
91    /// releasing the lock while there are pending read requests will **panic**. For a non-panicking
92    /// variant, use [`try_release_lock`](Self::try_release_lock).
93    #[inline]
94    pub fn release_lock(mut self) {
95        self.release_lock_mut()
96    }
97
98    fn release_lock_mut(&mut self) {
99        self.as_raw().release_lock()
100    }
101
102    /// Try to [release](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
103    /// corresponding stream.
104    ///
105    /// [As of January 2022](https://github.com/whatwg/streams/commit/d5f92d9f17306d31ba6b27424d23d58e89bf64a5),
106    /// the Streams standard allows the lock to be released even when there are still pending read
107    /// requests. Such requests will automatically become rejected, and this function will always
108    /// return `Ok(())`.
109    ///
110    /// However, if the Streams implementation is not yet up-to-date with this change, then
111    /// the lock cannot be released while there are pending read requests. Attempting to do so will
112    /// return an error and leave the reader locked to the stream.
113    #[inline]
114    pub fn try_release_lock(self) -> Result<(), (js_sys::Error, Self)> {
115        self.as_raw()
116            .unchecked_ref::<sys::ReadableStreamReaderExt>()
117            .try_release_lock()
118            .map_err(|err| (err, self))
119    }
120
121    /// Converts this `ReadableStreamDefaultReader` into a [`Stream`].
122    ///
123    /// This is similar to [`ReadableStream.into_stream`](ReadableStream::into_stream),
124    /// except that after the returned `Stream` is dropped, the original `ReadableStream` is still
125    /// usable. This allows reading only a few chunks from the `Stream`, while still allowing
126    /// another reader to read the remaining chunks later on.
127    ///
128    /// [`Stream`]: https://docs.rs/futures/0.3.30/futures/stream/trait.Stream.html
129    #[inline]
130    pub fn into_stream(self) -> IntoStream<'stream> {
131        IntoStream::new(self, false)
132    }
133}
134
135impl Drop for ReadableStreamDefaultReader<'_> {
136    fn drop(&mut self) {
137        self.release_lock_mut();
138    }
139}