wasm_streams/readable/
mod.rs

1//! Bindings and conversions for
2//! [readable streams](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream).
3use futures_util::io::AsyncRead;
4use futures_util::Stream;
5use js_sys::Object;
6use wasm_bindgen::prelude::*;
7use wasm_bindgen::JsCast;
8
9pub use byob_reader::ReadableStreamBYOBReader;
10pub use default_reader::ReadableStreamDefaultReader;
11pub use into_async_read::IntoAsyncRead;
12pub use into_stream::IntoStream;
13use into_underlying_source::IntoUnderlyingSource;
14pub use pipe_options::PipeOptions;
15
16use crate::queuing_strategy::QueuingStrategy;
17use crate::readable::into_underlying_byte_source::IntoUnderlyingByteSource;
18use crate::util::promise_to_void_future;
19use crate::writable::WritableStream;
20
21mod byob_reader;
22mod default_reader;
23mod into_async_read;
24mod into_stream;
25mod into_underlying_byte_source;
26mod into_underlying_source;
27mod pipe_options;
28pub mod sys;
29
30/// A [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream).
31///
32/// `ReadableStream`s can be created from a [raw JavaScript stream](sys::ReadableStream) with
33/// [`from_raw`](Self::from_raw), or from a Rust [`Stream`] with [`from_stream`](Self::from_stream).
34///
35/// They can be converted into a [raw JavaScript stream](sys::ReadableStream) with
36/// [`into_raw`](Self::into_raw), or into a Rust [`Stream`] with [`into_stream`](Self::into_stream).
37///
38/// If the browser supports [readable byte streams](https://streams.spec.whatwg.org/#readable-byte-stream),
39/// then they can be created from a Rust [`AsyncRead`] with [`from_async_read`](Self::from_async_read),
40/// or converted into one with [`into_async_read`](Self::into_async_read).
41///
42/// [`Stream`]: https://docs.rs/futures/0.3.30/futures/stream/trait.Stream.html
43/// [`AsyncRead`]: https://docs.rs/futures/0.3.30/futures/io/trait.AsyncRead.html
44#[derive(Debug)]
45pub struct ReadableStream {
46    raw: sys::ReadableStream,
47}
48
49impl ReadableStream {
50    /// Creates a new `ReadableStream` from a [JavaScript stream](sys::ReadableStream).
51    #[inline]
52    pub fn from_raw(raw: sys::ReadableStream) -> Self {
53        Self { raw }
54    }
55
56    /// Creates a new `ReadableStream` from a [`Stream`].
57    ///
58    /// Items and errors must be represented as raw [`JsValue`]s.
59    /// Use [`map`], [`map_ok`] and/or [`map_err`] to convert a stream's items to a `JsValue`
60    /// before passing it to this function.
61    ///
62    /// [`Stream`]: https://docs.rs/futures/0.3.30/futures/stream/trait.Stream.html
63    /// [`map`]: https://docs.rs/futures/0.3.30/futures/stream/trait.StreamExt.html#method.map
64    /// [`map_ok`]: https://docs.rs/futures/0.3.30/futures/stream/trait.TryStreamExt.html#method.map_ok
65    /// [`map_err`]: https://docs.rs/futures/0.3.30/futures/stream/trait.TryStreamExt.html#method.map_err
66    pub fn from_stream<St>(stream: St) -> Self
67    where
68        St: Stream<Item = Result<JsValue, JsValue>> + 'static,
69    {
70        let source = IntoUnderlyingSource::new(Box::new(stream));
71        // Set HWM to 0 to prevent the JS ReadableStream from buffering chunks in its queue,
72        // since the original Rust stream is better suited to handle that.
73        let strategy = QueuingStrategy::new(0.0);
74        let raw =
75            sys::ReadableStreamExt::new_with_into_underlying_source(source, strategy.into_raw())
76                .unchecked_into();
77        Self::from_raw(raw)
78    }
79
80    /// Creates a new `ReadableStream` from an [`AsyncRead`].
81    ///
82    /// This creates a readable byte stream whose `autoAllocateChunkSize` is `default_buffer_len`.
83    /// Therefore, if a default reader is used to consume the stream, the given `async_read`
84    /// will be [polled][AsyncRead::poll_read] with a buffer of this size. If a BYOB reader is used,
85    /// then it will be polled with a buffer of the same size as the BYOB read request instead.
86    ///
87    /// **Panics** if readable byte streams are not supported by the browser.
88    ///
89    /// [`AsyncRead`]: https://docs.rs/futures/0.3.30/futures/io/trait.AsyncRead.html
90    /// [AsyncRead::poll_read]: https://docs.rs/futures/0.3.30/futures/io/trait.AsyncRead.html#tymethod.poll_read
91    // TODO Non-panicking variant?
92    pub fn from_async_read<R>(async_read: R, default_buffer_len: usize) -> Self
93    where
94        R: AsyncRead + 'static,
95    {
96        let source = IntoUnderlyingByteSource::new(Box::new(async_read), default_buffer_len);
97        let raw = sys::ReadableStreamExt::new_with_into_underlying_byte_source(source)
98            .expect_throw("readable byte streams not supported")
99            .unchecked_into();
100        Self::from_raw(raw)
101    }
102
103    /// Creates a new `ReadableStream` wrapping the provided [iterable] or [async iterable].
104    ///
105    /// This can be used to adapt various kinds of objects into a readable stream,
106    /// such as an [array], an [async generator] or a [Node.js readable stream][Readable].
107    ///
108    /// **Panics** if `ReadableStream.from()` is not supported by the browser,
109    /// or if the given object is not a valid iterable or async iterable.
110    /// For a non-panicking variant, use [`try_from`](Self::try_from).
111    ///
112    /// [iterable]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_iterable_protocol
113    /// [async iterable]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols
114    /// [array]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array
115    /// [async generator]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/AsyncGenerator
116    /// [Readable]: https://nodejs.org/api/stream.html#class-streamreadable
117    pub fn from(async_iterable: Object) -> Self {
118        Self::try_from(async_iterable).unwrap_throw()
119    }
120
121    /// Try to create a new `ReadableStream` wrapping the provided [iterable] or [async iterable].
122    ///
123    /// This can be used to adapt various kinds of objects into a readable stream,
124    /// such as an [array], an [async generator] or a [Node.js readable stream][Readable].
125    ///
126    /// If `ReadableStream.from()` is not supported by the browser,
127    /// or if the given object is not a valid iterable or async iterable,
128    /// then this returns an error.
129    ///
130    /// [iterable]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_iterable_protocol
131    /// [async iterable]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols
132    /// [array]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array
133    /// [async generator]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/AsyncGenerator
134    /// [Readable]: https://nodejs.org/api/stream.html#class-streamreadable
135    pub fn try_from(async_iterable: Object) -> Result<Self, js_sys::Error> {
136        let raw = sys::ReadableStreamExt::from_async_iterable(&async_iterable)?.unchecked_into();
137        Ok(Self::from_raw(raw))
138    }
139
140    /// Acquires a reference to the underlying [JavaScript stream](sys::ReadableStream).
141    #[inline]
142    pub fn as_raw(&self) -> &sys::ReadableStream {
143        &self.raw
144    }
145
146    /// Consumes this `ReadableStream`, returning the underlying [JavaScript stream](sys::ReadableStream).
147    #[inline]
148    pub fn into_raw(self) -> sys::ReadableStream {
149        self.raw
150    }
151
152    /// Returns `true` if the stream is [locked to a reader](https://streams.spec.whatwg.org/#lock).
153    #[inline]
154    pub fn is_locked(&self) -> bool {
155        self.as_raw().locked()
156    }
157
158    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
159    /// signaling a loss of interest in the stream by a consumer.
160    ///
161    /// If the stream is currently locked to a reader, then this returns an error.
162    pub async fn cancel(&mut self) -> Result<(), JsValue> {
163        promise_to_void_future(self.as_raw().cancel()).await
164    }
165
166    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
167    /// signaling a loss of interest in the stream by a consumer.
168    ///
169    /// The supplied `reason` will be given to the underlying source, which may or may not use it.
170    ///
171    /// If the stream is currently locked to a reader, then this returns an error.
172    pub async fn cancel_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
173        promise_to_void_future(self.as_raw().cancel_with_reason(reason)).await
174    }
175
176    /// Creates a [default reader](ReadableStreamDefaultReader) and
177    /// [locks](https://streams.spec.whatwg.org/#lock) the stream to the new reader.
178    ///
179    /// While the stream is locked, no other reader can be acquired until this one is released.
180    ///
181    /// **Panics** if the stream is already locked to a reader. For a non-panicking variant,
182    /// use [`try_get_reader`](Self::try_get_reader).
183    #[inline]
184    pub fn get_reader(&mut self) -> ReadableStreamDefaultReader {
185        self.try_get_reader()
186            .expect_throw("already locked to a reader")
187    }
188
189    /// Try to create a [default reader](ReadableStreamDefaultReader) and
190    /// [lock](https://streams.spec.whatwg.org/#lock) the stream to the new reader.
191    ///
192    /// While the stream is locked, no other reader can be acquired until this one is released.
193    ///
194    /// If the stream is already locked to a reader, then this returns an error.
195    pub fn try_get_reader(&mut self) -> Result<ReadableStreamDefaultReader, js_sys::Error> {
196        ReadableStreamDefaultReader::new(self)
197    }
198
199    /// Creates a [BYOB reader](ReadableStreamBYOBReader) and
200    /// [locks](https://streams.spec.whatwg.org/#lock) the stream to the new reader.
201    ///
202    /// While the stream is locked, no other reader can be acquired until this one is released.
203    ///
204    /// **Panics** if the stream is already locked to a reader, or if this stream is not a readable
205    /// byte stream. For a non-panicking variant, use [`try_get_reader`](Self::try_get_reader).
206    pub fn get_byob_reader(&mut self) -> ReadableStreamBYOBReader {
207        self.try_get_byob_reader()
208            .expect_throw("already locked to a reader, or not a readable byte stream")
209    }
210
211    /// Try to create a [BYOB reader](ReadableStreamBYOBReader) and
212    /// [lock](https://streams.spec.whatwg.org/#lock) the stream to the new reader.
213    ///
214    /// While the stream is locked, no other reader can be acquired until this one is released.
215    ///
216    /// If the stream is already locked to a reader, then this returns an error.
217    pub fn try_get_byob_reader(&mut self) -> Result<ReadableStreamBYOBReader, js_sys::Error> {
218        ReadableStreamBYOBReader::new(self)
219    }
220
221    /// [Pipes](https://streams.spec.whatwg.org/#piping) this readable stream to a given
222    /// writable stream.
223    ///
224    /// Piping a stream will [lock](https://streams.spec.whatwg.org/#lock) it for the duration
225    /// of the pipe, preventing any other consumer from acquiring a reader.
226    ///
227    /// This returns `()` if the pipe completes successfully, or `Err(error)` if any `error`
228    /// was encountered during the process.
229    pub async fn pipe_to<'a>(&'a mut self, dest: &'a mut WritableStream) -> Result<(), JsValue> {
230        self.pipe_to_with_options(dest, &PipeOptions::default())
231            .await
232    }
233
234    /// [Pipes](https://streams.spec.whatwg.org/#piping) this readable stream to a given
235    /// writable stream.
236    ///
237    /// Piping a stream will [lock](https://streams.spec.whatwg.org/#lock) it for the duration
238    /// of the pipe, preventing any other consumer from acquiring a reader.
239    ///
240    /// Errors and closures of the source and destination streams propagate as follows:
241    /// * An error in the source readable stream will [abort](https://streams.spec.whatwg.org/#abort-a-writable-stream)
242    ///   the destination writable stream, unless [`options.prevent_abort`](PipeOptions::prevent_abort)
243    ///   is `true`.
244    /// * An error in the destination writable stream will [cancel](https://streams.spec.whatwg.org/#cancel-a-readable-stream)
245    ///   the source readable stream, unless [`options.prevent_cancel`](PipeOptions::prevent_cancel)
246    ///   is `true`.
247    /// * When the source readable stream closes, the destination writable stream will be closed,
248    ///   unless [`options.prevent_close`](PipeOptions::prevent_close) is `true`.
249    /// * If the destination writable stream starts out closed or closing, the source readable stream
250    ///   will be [canceled](https://streams.spec.whatwg.org/#cancel-a-readable-stream),
251    ///   unless unless [`options.prevent_cancel`](PipeOptions::prevent_cancel) is `true`.
252    ///
253    /// This returns `()` if the pipe completes successfully, or `Err(error)` if any `error`
254    /// was encountered during the process.
255    pub async fn pipe_to_with_options<'a>(
256        &'a mut self,
257        dest: &'a mut WritableStream,
258        options: &PipeOptions,
259    ) -> Result<(), JsValue> {
260        let promise = self
261            .as_raw()
262            .pipe_to_with_options(dest.as_raw(), &options.clone().into_raw());
263        promise_to_void_future(promise).await
264    }
265
266    /// [Tees](https://streams.spec.whatwg.org/#tee-a-readable-stream) this readable stream,
267    /// returning the two resulting branches as new [`ReadableStream`] instances.
268    ///
269    /// Teeing a stream will [lock](https://streams.spec.whatwg.org/#lock) it, preventing any other
270    /// consumer from acquiring a reader.
271    /// To [cancel](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
272    /// cancel both of the resulting branches; a composite cancellation reason will then be
273    /// propagated to the stream's underlying source.
274    ///
275    /// Note that the chunks seen in each branch will be the same object.
276    /// If the chunks are not immutable, this could allow interference between the two branches.
277    ///
278    /// **Panics** if the stream is already locked to a reader. For a non-panicking variant,
279    /// use [`try_tee`](Self::try_tee).
280    pub fn tee(self) -> (ReadableStream, ReadableStream) {
281        self.try_tee().expect_throw("already locked to a reader")
282    }
283
284    /// Tries to [tee](https://streams.spec.whatwg.org/#tee-a-readable-stream) this readable stream,
285    /// returning the two resulting branches as new [`ReadableStream`] instances.
286    ///
287    /// Teeing a stream will [lock](https://streams.spec.whatwg.org/#lock) it, preventing any other
288    /// consumer from acquiring a reader.
289    /// To [cancel](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
290    /// cancel both of the resulting branches; a composite cancellation reason will then be
291    /// propagated to the stream's underlying source.
292    ///
293    /// Note that the chunks seen in each branch will be the same object.
294    /// If the chunks are not immutable, this could allow interference between the two branches.
295    ///
296    /// If the stream is already locked to a reader, then this returns an error
297    /// along with the original `ReadableStream`.
298    pub fn try_tee(self) -> Result<(ReadableStream, ReadableStream), (js_sys::Error, Self)> {
299        let branches = self
300            .as_raw()
301            .unchecked_ref::<sys::ReadableStreamExt>()
302            .try_tee()
303            .map_err(|err| (err, self))?;
304        debug_assert_eq!(branches.length(), 2);
305        let (left, right) = (branches.get(0), branches.get(1));
306        Ok((
307            Self::from_raw(left.unchecked_into()),
308            Self::from_raw(right.unchecked_into()),
309        ))
310    }
311
312    /// Converts this `ReadableStream` into a [`Stream`].
313    ///
314    /// Items and errors are represented by their raw [`JsValue`].
315    /// Use [`map`], [`map_ok`] and/or [`map_err`] on the returned stream to convert them to a more
316    /// appropriate type.
317    ///
318    /// **Panics** if the stream is already locked to a reader. For a non-panicking variant,
319    /// use [`try_into_stream`](Self::try_into_stream).
320    ///
321    /// [`Stream`]: https://docs.rs/futures/0.3.30/futures/stream/trait.Stream.html
322    /// [`map`]: https://docs.rs/futures/0.3.30/futures/stream/trait.StreamExt.html#method.map
323    /// [`map_ok`]: https://docs.rs/futures/0.3.30/futures/stream/trait.TryStreamExt.html#method.map_ok
324    /// [`map_err`]: https://docs.rs/futures/0.3.30/futures/stream/trait.TryStreamExt.html#method.map_err
325    #[inline]
326    pub fn into_stream(self) -> IntoStream<'static> {
327        self.try_into_stream()
328            .expect_throw("already locked to a reader")
329    }
330
331    /// Try to convert this `ReadableStream` into a [`Stream`].
332    ///
333    /// Items and errors are represented by their raw [`JsValue`].
334    /// Use [`map`], [`map_ok`] and/or [`map_err`] on the returned stream to convert them to a more
335    /// appropriate type.
336    ///
337    /// If the stream is already locked to a reader, then this returns an error
338    /// along with the original `ReadableStream`.
339    ///
340    /// [`Stream`]: https://docs.rs/futures/0.3.30/futures/stream/trait.Stream.html
341    /// [`map`]: https://docs.rs/futures/0.3.30/futures/stream/trait.StreamExt.html#method.map
342    /// [`map_ok`]: https://docs.rs/futures/0.3.30/futures/stream/trait.TryStreamExt.html#method.map_ok
343    /// [`map_err`]: https://docs.rs/futures/0.3.30/futures/stream/trait.TryStreamExt.html#method.map_err
344    pub fn try_into_stream(mut self) -> Result<IntoStream<'static>, (js_sys::Error, Self)> {
345        let reader = ReadableStreamDefaultReader::new(&mut self).map_err(|err| (err, self))?;
346        Ok(IntoStream::new(reader, true))
347    }
348
349    /// Converts this `ReadableStream` into an [`AsyncRead`].
350    ///
351    /// **Panics** if the stream is already locked to a reader, or if this stream is not a readable
352    /// byte stream. For a non-panicking variant, use [`try_into_async_read`](Self::try_into_async_read).
353    ///
354    /// [`AsyncRead`]: https://docs.rs/futures/0.3.30/futures/io/trait.AsyncRead.html
355    #[inline]
356    pub fn into_async_read(self) -> IntoAsyncRead<'static> {
357        self.try_into_async_read()
358            .expect_throw("already locked to a reader, or not a readable byte stream")
359    }
360
361    /// Try to convert this `ReadableStream` into an [`AsyncRead`].
362    ///
363    /// If the stream is already locked to a reader, or if this stream is not a readable byte
364    /// stream, then this returns an error along with the original `ReadableStream`.
365    ///
366    /// [`AsyncRead`]: https://docs.rs/futures/0.3.30/futures/io/trait.AsyncRead.html
367    pub fn try_into_async_read(mut self) -> Result<IntoAsyncRead<'static>, (js_sys::Error, Self)> {
368        let reader = ReadableStreamBYOBReader::new(&mut self).map_err(|err| (err, self))?;
369        Ok(IntoAsyncRead::new(reader, true))
370    }
371}
372
373impl<St> From<St> for ReadableStream
374where
375    St: Stream<Item = Result<JsValue, JsValue>> + 'static,
376{
377    /// Equivalent to [`from_stream`](Self::from_stream).
378    #[inline]
379    fn from(stream: St) -> Self {
380        Self::from_stream(stream)
381    }
382}