wasm_streams/readable/
byob_reader.rs

1use std::marker::PhantomData;
2
3use js_sys::{Object, Uint8Array};
4use wasm_bindgen::{JsCast, JsValue};
5use wasm_bindgen_futures::JsFuture;
6
7use crate::util::{checked_cast_to_usize, clamp_to_u32, promise_to_void_future};
8
9use super::{sys, IntoAsyncRead, ReadableStream};
10
11/// A [`ReadableStreamBYOBReader`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamBYOBReader)
12/// that can be used to read chunks from a [`ReadableStream`](ReadableStream).
13///
14/// This is returned by the [`get_byob_reader`](ReadableStream::get_byob_reader) method.
15///
16/// When the reader is dropped, it automatically [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
17#[derive(Debug)]
18pub struct ReadableStreamBYOBReader<'stream> {
19    raw: sys::ReadableStreamBYOBReader,
20    _stream: PhantomData<&'stream mut ReadableStream>,
21}
22
23impl<'stream> ReadableStreamBYOBReader<'stream> {
24    pub(crate) fn new(stream: &mut ReadableStream) -> Result<Self, js_sys::Error> {
25        let reader_options = sys::ReadableStreamGetReaderOptions::new();
26        reader_options.set_mode(sys::ReadableStreamReaderMode::Byob);
27        Ok(Self {
28            raw: stream
29                .as_raw()
30                .unchecked_ref::<sys::ReadableStreamExt>()
31                .try_get_reader_with_options(&reader_options)?
32                .unchecked_into(),
33            _stream: PhantomData,
34        })
35    }
36
37    /// Acquires a reference to the underlying [JavaScript reader](sys::ReadableStreamBYOBReader).
38    #[inline]
39    pub fn as_raw(&self) -> &sys::ReadableStreamBYOBReader {
40        &self.raw
41    }
42
43    /// Waits for the stream to become closed.
44    ///
45    /// This returns an error if the stream ever errors, or if the reader's lock is
46    /// [released](https://streams.spec.whatwg.org/#release-a-lock) before the stream finishes
47    /// closing.
48    pub async fn closed(&self) -> Result<(), JsValue> {
49        promise_to_void_future(self.as_raw().closed()).await
50    }
51
52    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
53    /// signaling a loss of interest in the stream by a consumer.
54    ///
55    /// Equivalent to [`ReadableStream.cancel`](ReadableStream::cancel).
56    pub async fn cancel(&mut self) -> Result<(), JsValue> {
57        promise_to_void_future(self.as_raw().cancel()).await
58    }
59
60    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
61    /// signaling a loss of interest in the stream by a consumer.
62    ///
63    /// Equivalent to [`ReadableStream.cancel_with_reason`](ReadableStream::cancel_with_reason).
64    pub async fn cancel_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
65        promise_to_void_future(self.as_raw().cancel_with_reason(reason)).await
66    }
67
68    /// Reads the next chunk from the stream's internal queue into `dst`,
69    /// and returns the number of bytes read.
70    ///
71    /// * If some bytes were read into `dst`, this returns `Ok(bytes_read)`.
72    /// * If the stream closes and no more bytes are available, this returns `Ok(0)`.
73    /// * If the stream cancels, this returns `Ok(0)`.
74    /// * If the stream encounters an `error`, this returns `Err(error)`.
75    ///
76    /// This always allocated a new temporary `Uint8Array` with the same size as `dst` to hold
77    /// the result before copying to `dst`. We cannot pass a view on the backing WebAssembly memory
78    /// directly, because:
79    /// * `reader.read(view)` needs to transfer `view.buffer`, but `WebAssembly.Memory` buffers
80    ///   are non-transferable.
81    /// * `view.buffer` can be invalidated if the WebAssembly memory grows while `read(view)`
82    ///   is still in progress.
83    ///
84    /// Therefore, it is necessary to use a separate buffer living in the JavaScript heap.
85    /// To avoid repeated allocations for repeated reads,
86    /// use [`read_with_buffer`](Self::read_with_buffer).
87    pub async fn read(&mut self, dst: &mut [u8]) -> Result<usize, JsValue> {
88        let buffer = Uint8Array::new_with_length(clamp_to_u32(dst.len()));
89        let (bytes_read, _) = self.read_with_buffer(dst, buffer).await?;
90        Ok(bytes_read)
91    }
92
93    /// Reads the next chunk from the stream's internal queue into `dst`,
94    /// and returns the number of bytes read.
95    ///
96    /// The given `buffer` is used to store the bytes before they are copied to `dst`.
97    /// This buffer is returned back together with the result, so it can be re-used for subsequent
98    /// reads without extra allocations. Note that the underlying `ArrayBuffer` is transferred
99    /// in the process, so any other views on the original buffer will become unusable.
100    ///
101    /// * If some bytes were read into `dst`, this returns `Ok((bytes_read, Some(buffer)))`.
102    /// * If the stream closes and no more bytes are available, this returns `Ok((0, Some(buffer)))`.
103    /// * If the stream cancels, this returns `Ok((0, None))`. In this case, the given buffer is
104    ///   not returned.
105    /// * If the stream encounters an `error`, this returns `Err(error)`.
106    pub async fn read_with_buffer(
107        &mut self,
108        dst: &mut [u8],
109        buffer: Uint8Array,
110    ) -> Result<(usize, Option<Uint8Array>), JsValue> {
111        // Save the original buffer's byte offset and length.
112        let buffer_offset = buffer.byte_offset();
113        let buffer_len = buffer.byte_length();
114        // Limit view to destination slice's length.
115        let dst_len = clamp_to_u32(dst.len());
116        let view = buffer.subarray(0, dst_len).unchecked_into::<Object>();
117        // Read into view. This transfers `buffer.buffer()`.
118        let promise = self.as_raw().read_with_array_buffer_view(&view);
119        let js_result = JsFuture::from(promise).await?;
120        let result = sys::ReadableStreamReadResult::from(js_result);
121        let js_value = result.get_value();
122        let filled_view = if js_value.is_undefined() {
123            // No new view was returned. The stream must have been canceled.
124            assert!(result.get_done().unwrap_or_default());
125            return Ok((0, None));
126        } else {
127            js_value.unchecked_into::<Uint8Array>()
128        };
129        let filled_len = checked_cast_to_usize(filled_view.byte_length());
130        debug_assert!(filled_len <= dst.len());
131        // Re-construct the original Uint8Array with the new ArrayBuffer.
132        let new_buffer = Uint8Array::new_with_byte_offset_and_length(
133            &filled_view.buffer(),
134            buffer_offset,
135            buffer_len,
136        );
137        if result.get_done().unwrap_or_default() {
138            debug_assert_eq!(filled_len, 0);
139        } else {
140            filled_view.copy_to(&mut dst[0..filled_len]);
141        }
142        Ok((filled_len, Some(new_buffer)))
143    }
144
145    /// [Releases](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
146    /// corresponding stream.
147    ///
148    /// [As of January 2022](https://github.com/whatwg/streams/commit/d5f92d9f17306d31ba6b27424d23d58e89bf64a5),
149    /// the Streams standard allows the lock to be released even when there are still pending read
150    /// requests. Such requests will automatically become rejected, and this function will always
151    /// succeed.
152    ///
153    /// However, if the Streams implementation is not yet up-to-date with this change, then
154    /// releasing the lock while there are pending read requests will **panic**. For a non-panicking
155    /// variant, use [`try_release_lock`](Self::try_release_lock).
156    #[inline]
157    pub fn release_lock(mut self) {
158        self.release_lock_mut()
159    }
160
161    fn release_lock_mut(&mut self) {
162        self.as_raw().release_lock()
163    }
164
165    /// Try to [release](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
166    /// corresponding stream.
167    ///
168    /// [As of January 2022](https://github.com/whatwg/streams/commit/d5f92d9f17306d31ba6b27424d23d58e89bf64a5),
169    /// the Streams standard allows the lock to be released even when there are still pending read
170    /// requests. Such requests will automatically become rejected, and this function will always
171    /// return `Ok(())`.
172    ///
173    /// However, if the Streams implementation is not yet up-to-date with this change, then
174    /// the lock cannot be released while there are pending read requests. Attempting to do so will
175    /// return an error and leave the reader locked to the stream.
176    #[inline]
177    pub fn try_release_lock(self) -> Result<(), (js_sys::Error, Self)> {
178        self.as_raw()
179            .unchecked_ref::<sys::ReadableStreamReaderExt>()
180            .try_release_lock()
181            .map_err(|err| (err, self))
182    }
183
184    /// Converts this `ReadableStreamBYOBReader` into an [`AsyncRead`].
185    ///
186    /// This is similar to [`ReadableStream.into_async_read`](ReadableStream::into_async_read),
187    /// except that after the returned `AsyncRead` is dropped, the original `ReadableStream` is
188    /// still usable. This allows reading only a few bytes from the `AsyncRead`, while still
189    /// allowing another reader to read the remaining bytes later on.
190    ///
191    /// [`AsyncRead`]: https://docs.rs/futures/0.3.30/futures/io/trait.AsyncRead.html
192    #[inline]
193    pub fn into_async_read(self) -> IntoAsyncRead<'stream> {
194        IntoAsyncRead::new(self, false)
195    }
196}
197
198impl Drop for ReadableStreamBYOBReader<'_> {
199    fn drop(&mut self) {
200        self.release_lock_mut();
201    }
202}