wasm_streams/readable/byob_reader.rs
1use std::marker::PhantomData;
2
3use js_sys::{Object, Uint8Array};
4use wasm_bindgen::{JsCast, JsValue};
5use wasm_bindgen_futures::JsFuture;
6
7use crate::util::{checked_cast_to_usize, clamp_to_u32, promise_to_void_future};
8
9use super::{sys, IntoAsyncRead, ReadableStream};
10
11/// A [`ReadableStreamBYOBReader`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamBYOBReader)
12/// that can be used to read chunks from a [`ReadableStream`](ReadableStream).
13///
14/// This is returned by the [`get_byob_reader`](ReadableStream::get_byob_reader) method.
15///
16/// When the reader is dropped, it automatically [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
17#[derive(Debug)]
18pub struct ReadableStreamBYOBReader<'stream> {
19 raw: sys::ReadableStreamBYOBReader,
20 _stream: PhantomData<&'stream mut ReadableStream>,
21}
22
23impl<'stream> ReadableStreamBYOBReader<'stream> {
24 pub(crate) fn new(stream: &mut ReadableStream) -> Result<Self, js_sys::Error> {
25 let reader_options = sys::ReadableStreamGetReaderOptions::new();
26 reader_options.set_mode(sys::ReadableStreamReaderMode::Byob);
27 Ok(Self {
28 raw: stream
29 .as_raw()
30 .unchecked_ref::<sys::ReadableStreamExt>()
31 .try_get_reader_with_options(&reader_options)?
32 .unchecked_into(),
33 _stream: PhantomData,
34 })
35 }
36
37 /// Acquires a reference to the underlying [JavaScript reader](sys::ReadableStreamBYOBReader).
38 #[inline]
39 pub fn as_raw(&self) -> &sys::ReadableStreamBYOBReader {
40 &self.raw
41 }
42
43 /// Waits for the stream to become closed.
44 ///
45 /// This returns an error if the stream ever errors, or if the reader's lock is
46 /// [released](https://streams.spec.whatwg.org/#release-a-lock) before the stream finishes
47 /// closing.
48 pub async fn closed(&self) -> Result<(), JsValue> {
49 promise_to_void_future(self.as_raw().closed()).await
50 }
51
52 /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
53 /// signaling a loss of interest in the stream by a consumer.
54 ///
55 /// Equivalent to [`ReadableStream.cancel`](ReadableStream::cancel).
56 pub async fn cancel(&mut self) -> Result<(), JsValue> {
57 promise_to_void_future(self.as_raw().cancel()).await
58 }
59
60 /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
61 /// signaling a loss of interest in the stream by a consumer.
62 ///
63 /// Equivalent to [`ReadableStream.cancel_with_reason`](ReadableStream::cancel_with_reason).
64 pub async fn cancel_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
65 promise_to_void_future(self.as_raw().cancel_with_reason(reason)).await
66 }
67
68 /// Reads the next chunk from the stream's internal queue into `dst`,
69 /// and returns the number of bytes read.
70 ///
71 /// * If some bytes were read into `dst`, this returns `Ok(bytes_read)`.
72 /// * If the stream closes and no more bytes are available, this returns `Ok(0)`.
73 /// * If the stream cancels, this returns `Ok(0)`.
74 /// * If the stream encounters an `error`, this returns `Err(error)`.
75 ///
76 /// This always allocated a new temporary `Uint8Array` with the same size as `dst` to hold
77 /// the result before copying to `dst`. We cannot pass a view on the backing WebAssembly memory
78 /// directly, because:
79 /// * `reader.read(view)` needs to transfer `view.buffer`, but `WebAssembly.Memory` buffers
80 /// are non-transferable.
81 /// * `view.buffer` can be invalidated if the WebAssembly memory grows while `read(view)`
82 /// is still in progress.
83 ///
84 /// Therefore, it is necessary to use a separate buffer living in the JavaScript heap.
85 /// To avoid repeated allocations for repeated reads,
86 /// use [`read_with_buffer`](Self::read_with_buffer).
87 pub async fn read(&mut self, dst: &mut [u8]) -> Result<usize, JsValue> {
88 let buffer = Uint8Array::new_with_length(clamp_to_u32(dst.len()));
89 let (bytes_read, _) = self.read_with_buffer(dst, buffer).await?;
90 Ok(bytes_read)
91 }
92
93 /// Reads the next chunk from the stream's internal queue into `dst`,
94 /// and returns the number of bytes read.
95 ///
96 /// The given `buffer` is used to store the bytes before they are copied to `dst`.
97 /// This buffer is returned back together with the result, so it can be re-used for subsequent
98 /// reads without extra allocations. Note that the underlying `ArrayBuffer` is transferred
99 /// in the process, so any other views on the original buffer will become unusable.
100 ///
101 /// * If some bytes were read into `dst`, this returns `Ok((bytes_read, Some(buffer)))`.
102 /// * If the stream closes and no more bytes are available, this returns `Ok((0, Some(buffer)))`.
103 /// * If the stream cancels, this returns `Ok((0, None))`. In this case, the given buffer is
104 /// not returned.
105 /// * If the stream encounters an `error`, this returns `Err(error)`.
106 pub async fn read_with_buffer(
107 &mut self,
108 dst: &mut [u8],
109 buffer: Uint8Array,
110 ) -> Result<(usize, Option<Uint8Array>), JsValue> {
111 // Save the original buffer's byte offset and length.
112 let buffer_offset = buffer.byte_offset();
113 let buffer_len = buffer.byte_length();
114 // Limit view to destination slice's length.
115 let dst_len = clamp_to_u32(dst.len());
116 let view = buffer.subarray(0, dst_len).unchecked_into::<Object>();
117 // Read into view. This transfers `buffer.buffer()`.
118 let promise = self.as_raw().read_with_array_buffer_view(&view);
119 let js_result = JsFuture::from(promise).await?;
120 let result = sys::ReadableStreamReadResult::from(js_result);
121 let js_value = result.get_value();
122 let filled_view = if js_value.is_undefined() {
123 // No new view was returned. The stream must have been canceled.
124 assert!(result.get_done().unwrap_or_default());
125 return Ok((0, None));
126 } else {
127 js_value.unchecked_into::<Uint8Array>()
128 };
129 let filled_len = checked_cast_to_usize(filled_view.byte_length());
130 debug_assert!(filled_len <= dst.len());
131 // Re-construct the original Uint8Array with the new ArrayBuffer.
132 let new_buffer = Uint8Array::new_with_byte_offset_and_length(
133 &filled_view.buffer(),
134 buffer_offset,
135 buffer_len,
136 );
137 if result.get_done().unwrap_or_default() {
138 debug_assert_eq!(filled_len, 0);
139 } else {
140 filled_view.copy_to(&mut dst[0..filled_len]);
141 }
142 Ok((filled_len, Some(new_buffer)))
143 }
144
145 /// [Releases](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
146 /// corresponding stream.
147 ///
148 /// [As of January 2022](https://github.com/whatwg/streams/commit/d5f92d9f17306d31ba6b27424d23d58e89bf64a5),
149 /// the Streams standard allows the lock to be released even when there are still pending read
150 /// requests. Such requests will automatically become rejected, and this function will always
151 /// succeed.
152 ///
153 /// However, if the Streams implementation is not yet up-to-date with this change, then
154 /// releasing the lock while there are pending read requests will **panic**. For a non-panicking
155 /// variant, use [`try_release_lock`](Self::try_release_lock).
156 #[inline]
157 pub fn release_lock(mut self) {
158 self.release_lock_mut()
159 }
160
161 fn release_lock_mut(&mut self) {
162 self.as_raw().release_lock()
163 }
164
165 /// Try to [release](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
166 /// corresponding stream.
167 ///
168 /// [As of January 2022](https://github.com/whatwg/streams/commit/d5f92d9f17306d31ba6b27424d23d58e89bf64a5),
169 /// the Streams standard allows the lock to be released even when there are still pending read
170 /// requests. Such requests will automatically become rejected, and this function will always
171 /// return `Ok(())`.
172 ///
173 /// However, if the Streams implementation is not yet up-to-date with this change, then
174 /// the lock cannot be released while there are pending read requests. Attempting to do so will
175 /// return an error and leave the reader locked to the stream.
176 #[inline]
177 pub fn try_release_lock(self) -> Result<(), (js_sys::Error, Self)> {
178 self.as_raw()
179 .unchecked_ref::<sys::ReadableStreamReaderExt>()
180 .try_release_lock()
181 .map_err(|err| (err, self))
182 }
183
184 /// Converts this `ReadableStreamBYOBReader` into an [`AsyncRead`].
185 ///
186 /// This is similar to [`ReadableStream.into_async_read`](ReadableStream::into_async_read),
187 /// except that after the returned `AsyncRead` is dropped, the original `ReadableStream` is
188 /// still usable. This allows reading only a few bytes from the `AsyncRead`, while still
189 /// allowing another reader to read the remaining bytes later on.
190 ///
191 /// [`AsyncRead`]: https://docs.rs/futures/0.3.30/futures/io/trait.AsyncRead.html
192 #[inline]
193 pub fn into_async_read(self) -> IntoAsyncRead<'stream> {
194 IntoAsyncRead::new(self, false)
195 }
196}
197
198impl Drop for ReadableStreamBYOBReader<'_> {
199 fn drop(&mut self) {
200 self.release_lock_mut();
201 }
202}