wasm_streams/readable/mod.rs
1//! Bindings and conversions for
2//! [readable streams](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream).
3use futures_util::io::AsyncRead;
4use futures_util::Stream;
5use js_sys::Object;
6use wasm_bindgen::prelude::*;
7use wasm_bindgen::JsCast;
8
9pub use byob_reader::ReadableStreamBYOBReader;
10pub use default_reader::ReadableStreamDefaultReader;
11pub use into_async_read::IntoAsyncRead;
12pub use into_stream::IntoStream;
13use into_underlying_source::IntoUnderlyingSource;
14pub use pipe_options::PipeOptions;
15
16use crate::queuing_strategy::QueuingStrategy;
17use crate::readable::into_underlying_byte_source::IntoUnderlyingByteSource;
18use crate::util::promise_to_void_future;
19use crate::writable::WritableStream;
20
21mod byob_reader;
22mod default_reader;
23mod into_async_read;
24mod into_stream;
25mod into_underlying_byte_source;
26mod into_underlying_source;
27mod pipe_options;
28pub mod sys;
29
30/// A [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream).
31///
32/// `ReadableStream`s can be created from a [raw JavaScript stream](sys::ReadableStream) with
33/// [`from_raw`](Self::from_raw), or from a Rust [`Stream`] with [`from_stream`](Self::from_stream).
34///
35/// They can be converted into a [raw JavaScript stream](sys::ReadableStream) with
36/// [`into_raw`](Self::into_raw), or into a Rust [`Stream`] with [`into_stream`](Self::into_stream).
37///
38/// If the browser supports [readable byte streams](https://streams.spec.whatwg.org/#readable-byte-stream),
39/// then they can be created from a Rust [`AsyncRead`] with [`from_async_read`](Self::from_async_read),
40/// or converted into one with [`into_async_read`](Self::into_async_read).
41///
42/// [`Stream`]: https://docs.rs/futures/0.3.30/futures/stream/trait.Stream.html
43/// [`AsyncRead`]: https://docs.rs/futures/0.3.30/futures/io/trait.AsyncRead.html
44#[derive(Debug)]
45pub struct ReadableStream {
46 raw: sys::ReadableStream,
47}
48
49impl ReadableStream {
50 /// Creates a new `ReadableStream` from a [JavaScript stream](sys::ReadableStream).
51 #[inline]
52 pub fn from_raw(raw: sys::ReadableStream) -> Self {
53 Self { raw }
54 }
55
56 /// Creates a new `ReadableStream` from a [`Stream`].
57 ///
58 /// Items and errors must be represented as raw [`JsValue`]s.
59 /// Use [`map`], [`map_ok`] and/or [`map_err`] to convert a stream's items to a `JsValue`
60 /// before passing it to this function.
61 ///
62 /// [`Stream`]: https://docs.rs/futures/0.3.30/futures/stream/trait.Stream.html
63 /// [`map`]: https://docs.rs/futures/0.3.30/futures/stream/trait.StreamExt.html#method.map
64 /// [`map_ok`]: https://docs.rs/futures/0.3.30/futures/stream/trait.TryStreamExt.html#method.map_ok
65 /// [`map_err`]: https://docs.rs/futures/0.3.30/futures/stream/trait.TryStreamExt.html#method.map_err
66 pub fn from_stream<St>(stream: St) -> Self
67 where
68 St: Stream<Item = Result<JsValue, JsValue>> + 'static,
69 {
70 let source = IntoUnderlyingSource::new(Box::new(stream));
71 // Set HWM to 0 to prevent the JS ReadableStream from buffering chunks in its queue,
72 // since the original Rust stream is better suited to handle that.
73 let strategy = QueuingStrategy::new(0.0);
74 let raw =
75 sys::ReadableStreamExt::new_with_into_underlying_source(source, strategy.into_raw())
76 .unchecked_into();
77 Self::from_raw(raw)
78 }
79
80 /// Creates a new `ReadableStream` from an [`AsyncRead`].
81 ///
82 /// This creates a readable byte stream whose `autoAllocateChunkSize` is `default_buffer_len`.
83 /// Therefore, if a default reader is used to consume the stream, the given `async_read`
84 /// will be [polled][AsyncRead::poll_read] with a buffer of this size. If a BYOB reader is used,
85 /// then it will be polled with a buffer of the same size as the BYOB read request instead.
86 ///
87 /// **Panics** if readable byte streams are not supported by the browser.
88 ///
89 /// [`AsyncRead`]: https://docs.rs/futures/0.3.30/futures/io/trait.AsyncRead.html
90 /// [AsyncRead::poll_read]: https://docs.rs/futures/0.3.30/futures/io/trait.AsyncRead.html#tymethod.poll_read
91 // TODO Non-panicking variant?
92 pub fn from_async_read<R>(async_read: R, default_buffer_len: usize) -> Self
93 where
94 R: AsyncRead + 'static,
95 {
96 let source = IntoUnderlyingByteSource::new(Box::new(async_read), default_buffer_len);
97 let raw = sys::ReadableStreamExt::new_with_into_underlying_byte_source(source)
98 .expect_throw("readable byte streams not supported")
99 .unchecked_into();
100 Self::from_raw(raw)
101 }
102
103 /// Creates a new `ReadableStream` wrapping the provided [iterable] or [async iterable].
104 ///
105 /// This can be used to adapt various kinds of objects into a readable stream,
106 /// such as an [array], an [async generator] or a [Node.js readable stream][Readable].
107 ///
108 /// **Panics** if `ReadableStream.from()` is not supported by the browser,
109 /// or if the given object is not a valid iterable or async iterable.
110 /// For a non-panicking variant, use [`try_from`](Self::try_from).
111 ///
112 /// [iterable]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_iterable_protocol
113 /// [async iterable]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols
114 /// [array]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array
115 /// [async generator]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/AsyncGenerator
116 /// [Readable]: https://nodejs.org/api/stream.html#class-streamreadable
117 pub fn from(async_iterable: Object) -> Self {
118 Self::try_from(async_iterable).unwrap_throw()
119 }
120
121 /// Try to create a new `ReadableStream` wrapping the provided [iterable] or [async iterable].
122 ///
123 /// This can be used to adapt various kinds of objects into a readable stream,
124 /// such as an [array], an [async generator] or a [Node.js readable stream][Readable].
125 ///
126 /// If `ReadableStream.from()` is not supported by the browser,
127 /// or if the given object is not a valid iterable or async iterable,
128 /// then this returns an error.
129 ///
130 /// [iterable]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_iterable_protocol
131 /// [async iterable]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols
132 /// [array]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array
133 /// [async generator]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/AsyncGenerator
134 /// [Readable]: https://nodejs.org/api/stream.html#class-streamreadable
135 pub fn try_from(async_iterable: Object) -> Result<Self, js_sys::Error> {
136 let raw = sys::ReadableStreamExt::from_async_iterable(&async_iterable)?.unchecked_into();
137 Ok(Self::from_raw(raw))
138 }
139
140 /// Acquires a reference to the underlying [JavaScript stream](sys::ReadableStream).
141 #[inline]
142 pub fn as_raw(&self) -> &sys::ReadableStream {
143 &self.raw
144 }
145
146 /// Consumes this `ReadableStream`, returning the underlying [JavaScript stream](sys::ReadableStream).
147 #[inline]
148 pub fn into_raw(self) -> sys::ReadableStream {
149 self.raw
150 }
151
152 /// Returns `true` if the stream is [locked to a reader](https://streams.spec.whatwg.org/#lock).
153 #[inline]
154 pub fn is_locked(&self) -> bool {
155 self.as_raw().locked()
156 }
157
158 /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
159 /// signaling a loss of interest in the stream by a consumer.
160 ///
161 /// If the stream is currently locked to a reader, then this returns an error.
162 pub async fn cancel(&mut self) -> Result<(), JsValue> {
163 promise_to_void_future(self.as_raw().cancel()).await
164 }
165
166 /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
167 /// signaling a loss of interest in the stream by a consumer.
168 ///
169 /// The supplied `reason` will be given to the underlying source, which may or may not use it.
170 ///
171 /// If the stream is currently locked to a reader, then this returns an error.
172 pub async fn cancel_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
173 promise_to_void_future(self.as_raw().cancel_with_reason(reason)).await
174 }
175
176 /// Creates a [default reader](ReadableStreamDefaultReader) and
177 /// [locks](https://streams.spec.whatwg.org/#lock) the stream to the new reader.
178 ///
179 /// While the stream is locked, no other reader can be acquired until this one is released.
180 ///
181 /// **Panics** if the stream is already locked to a reader. For a non-panicking variant,
182 /// use [`try_get_reader`](Self::try_get_reader).
183 #[inline]
184 pub fn get_reader(&mut self) -> ReadableStreamDefaultReader {
185 self.try_get_reader()
186 .expect_throw("already locked to a reader")
187 }
188
189 /// Try to create a [default reader](ReadableStreamDefaultReader) and
190 /// [lock](https://streams.spec.whatwg.org/#lock) the stream to the new reader.
191 ///
192 /// While the stream is locked, no other reader can be acquired until this one is released.
193 ///
194 /// If the stream is already locked to a reader, then this returns an error.
195 pub fn try_get_reader(&mut self) -> Result<ReadableStreamDefaultReader, js_sys::Error> {
196 ReadableStreamDefaultReader::new(self)
197 }
198
199 /// Creates a [BYOB reader](ReadableStreamBYOBReader) and
200 /// [locks](https://streams.spec.whatwg.org/#lock) the stream to the new reader.
201 ///
202 /// While the stream is locked, no other reader can be acquired until this one is released.
203 ///
204 /// **Panics** if the stream is already locked to a reader, or if this stream is not a readable
205 /// byte stream. For a non-panicking variant, use [`try_get_reader`](Self::try_get_reader).
206 pub fn get_byob_reader(&mut self) -> ReadableStreamBYOBReader {
207 self.try_get_byob_reader()
208 .expect_throw("already locked to a reader, or not a readable byte stream")
209 }
210
211 /// Try to create a [BYOB reader](ReadableStreamBYOBReader) and
212 /// [lock](https://streams.spec.whatwg.org/#lock) the stream to the new reader.
213 ///
214 /// While the stream is locked, no other reader can be acquired until this one is released.
215 ///
216 /// If the stream is already locked to a reader, then this returns an error.
217 pub fn try_get_byob_reader(&mut self) -> Result<ReadableStreamBYOBReader, js_sys::Error> {
218 ReadableStreamBYOBReader::new(self)
219 }
220
221 /// [Pipes](https://streams.spec.whatwg.org/#piping) this readable stream to a given
222 /// writable stream.
223 ///
224 /// Piping a stream will [lock](https://streams.spec.whatwg.org/#lock) it for the duration
225 /// of the pipe, preventing any other consumer from acquiring a reader.
226 ///
227 /// This returns `()` if the pipe completes successfully, or `Err(error)` if any `error`
228 /// was encountered during the process.
229 pub async fn pipe_to<'a>(&'a mut self, dest: &'a mut WritableStream) -> Result<(), JsValue> {
230 self.pipe_to_with_options(dest, &PipeOptions::default())
231 .await
232 }
233
234 /// [Pipes](https://streams.spec.whatwg.org/#piping) this readable stream to a given
235 /// writable stream.
236 ///
237 /// Piping a stream will [lock](https://streams.spec.whatwg.org/#lock) it for the duration
238 /// of the pipe, preventing any other consumer from acquiring a reader.
239 ///
240 /// Errors and closures of the source and destination streams propagate as follows:
241 /// * An error in the source readable stream will [abort](https://streams.spec.whatwg.org/#abort-a-writable-stream)
242 /// the destination writable stream, unless [`options.prevent_abort`](PipeOptions::prevent_abort)
243 /// is `true`.
244 /// * An error in the destination writable stream will [cancel](https://streams.spec.whatwg.org/#cancel-a-readable-stream)
245 /// the source readable stream, unless [`options.prevent_cancel`](PipeOptions::prevent_cancel)
246 /// is `true`.
247 /// * When the source readable stream closes, the destination writable stream will be closed,
248 /// unless [`options.prevent_close`](PipeOptions::prevent_close) is `true`.
249 /// * If the destination writable stream starts out closed or closing, the source readable stream
250 /// will be [canceled](https://streams.spec.whatwg.org/#cancel-a-readable-stream),
251 /// unless unless [`options.prevent_cancel`](PipeOptions::prevent_cancel) is `true`.
252 ///
253 /// This returns `()` if the pipe completes successfully, or `Err(error)` if any `error`
254 /// was encountered during the process.
255 pub async fn pipe_to_with_options<'a>(
256 &'a mut self,
257 dest: &'a mut WritableStream,
258 options: &PipeOptions,
259 ) -> Result<(), JsValue> {
260 let promise = self
261 .as_raw()
262 .pipe_to_with_options(dest.as_raw(), &options.clone().into_raw());
263 promise_to_void_future(promise).await
264 }
265
266 /// [Tees](https://streams.spec.whatwg.org/#tee-a-readable-stream) this readable stream,
267 /// returning the two resulting branches as new [`ReadableStream`] instances.
268 ///
269 /// Teeing a stream will [lock](https://streams.spec.whatwg.org/#lock) it, preventing any other
270 /// consumer from acquiring a reader.
271 /// To [cancel](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
272 /// cancel both of the resulting branches; a composite cancellation reason will then be
273 /// propagated to the stream's underlying source.
274 ///
275 /// Note that the chunks seen in each branch will be the same object.
276 /// If the chunks are not immutable, this could allow interference between the two branches.
277 ///
278 /// **Panics** if the stream is already locked to a reader. For a non-panicking variant,
279 /// use [`try_tee`](Self::try_tee).
280 pub fn tee(self) -> (ReadableStream, ReadableStream) {
281 self.try_tee().expect_throw("already locked to a reader")
282 }
283
284 /// Tries to [tee](https://streams.spec.whatwg.org/#tee-a-readable-stream) this readable stream,
285 /// returning the two resulting branches as new [`ReadableStream`] instances.
286 ///
287 /// Teeing a stream will [lock](https://streams.spec.whatwg.org/#lock) it, preventing any other
288 /// consumer from acquiring a reader.
289 /// To [cancel](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
290 /// cancel both of the resulting branches; a composite cancellation reason will then be
291 /// propagated to the stream's underlying source.
292 ///
293 /// Note that the chunks seen in each branch will be the same object.
294 /// If the chunks are not immutable, this could allow interference between the two branches.
295 ///
296 /// If the stream is already locked to a reader, then this returns an error
297 /// along with the original `ReadableStream`.
298 pub fn try_tee(self) -> Result<(ReadableStream, ReadableStream), (js_sys::Error, Self)> {
299 let branches = self
300 .as_raw()
301 .unchecked_ref::<sys::ReadableStreamExt>()
302 .try_tee()
303 .map_err(|err| (err, self))?;
304 debug_assert_eq!(branches.length(), 2);
305 let (left, right) = (branches.get(0), branches.get(1));
306 Ok((
307 Self::from_raw(left.unchecked_into()),
308 Self::from_raw(right.unchecked_into()),
309 ))
310 }
311
312 /// Converts this `ReadableStream` into a [`Stream`].
313 ///
314 /// Items and errors are represented by their raw [`JsValue`].
315 /// Use [`map`], [`map_ok`] and/or [`map_err`] on the returned stream to convert them to a more
316 /// appropriate type.
317 ///
318 /// **Panics** if the stream is already locked to a reader. For a non-panicking variant,
319 /// use [`try_into_stream`](Self::try_into_stream).
320 ///
321 /// [`Stream`]: https://docs.rs/futures/0.3.30/futures/stream/trait.Stream.html
322 /// [`map`]: https://docs.rs/futures/0.3.30/futures/stream/trait.StreamExt.html#method.map
323 /// [`map_ok`]: https://docs.rs/futures/0.3.30/futures/stream/trait.TryStreamExt.html#method.map_ok
324 /// [`map_err`]: https://docs.rs/futures/0.3.30/futures/stream/trait.TryStreamExt.html#method.map_err
325 #[inline]
326 pub fn into_stream(self) -> IntoStream<'static> {
327 self.try_into_stream()
328 .expect_throw("already locked to a reader")
329 }
330
331 /// Try to convert this `ReadableStream` into a [`Stream`].
332 ///
333 /// Items and errors are represented by their raw [`JsValue`].
334 /// Use [`map`], [`map_ok`] and/or [`map_err`] on the returned stream to convert them to a more
335 /// appropriate type.
336 ///
337 /// If the stream is already locked to a reader, then this returns an error
338 /// along with the original `ReadableStream`.
339 ///
340 /// [`Stream`]: https://docs.rs/futures/0.3.30/futures/stream/trait.Stream.html
341 /// [`map`]: https://docs.rs/futures/0.3.30/futures/stream/trait.StreamExt.html#method.map
342 /// [`map_ok`]: https://docs.rs/futures/0.3.30/futures/stream/trait.TryStreamExt.html#method.map_ok
343 /// [`map_err`]: https://docs.rs/futures/0.3.30/futures/stream/trait.TryStreamExt.html#method.map_err
344 pub fn try_into_stream(mut self) -> Result<IntoStream<'static>, (js_sys::Error, Self)> {
345 let reader = ReadableStreamDefaultReader::new(&mut self).map_err(|err| (err, self))?;
346 Ok(IntoStream::new(reader, true))
347 }
348
349 /// Converts this `ReadableStream` into an [`AsyncRead`].
350 ///
351 /// **Panics** if the stream is already locked to a reader, or if this stream is not a readable
352 /// byte stream. For a non-panicking variant, use [`try_into_async_read`](Self::try_into_async_read).
353 ///
354 /// [`AsyncRead`]: https://docs.rs/futures/0.3.30/futures/io/trait.AsyncRead.html
355 #[inline]
356 pub fn into_async_read(self) -> IntoAsyncRead<'static> {
357 self.try_into_async_read()
358 .expect_throw("already locked to a reader, or not a readable byte stream")
359 }
360
361 /// Try to convert this `ReadableStream` into an [`AsyncRead`].
362 ///
363 /// If the stream is already locked to a reader, or if this stream is not a readable byte
364 /// stream, then this returns an error along with the original `ReadableStream`.
365 ///
366 /// [`AsyncRead`]: https://docs.rs/futures/0.3.30/futures/io/trait.AsyncRead.html
367 pub fn try_into_async_read(mut self) -> Result<IntoAsyncRead<'static>, (js_sys::Error, Self)> {
368 let reader = ReadableStreamBYOBReader::new(&mut self).map_err(|err| (err, self))?;
369 Ok(IntoAsyncRead::new(reader, true))
370 }
371}
372
373impl<St> From<St> for ReadableStream
374where
375 St: Stream<Item = Result<JsValue, JsValue>> + 'static,
376{
377 /// Equivalent to [`from_stream`](Self::from_stream).
378 #[inline]
379 fn from(stream: St) -> Self {
380 Self::from_stream(stream)
381 }
382}