1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
use std::marker::PhantomData; use wasm_bindgen::{throw_val, JsValue}; use wasm_bindgen_futures::JsFuture; use crate::util::promise_to_void_future; use super::{sys, IntoStream, ReadableStream}; /// A [`ReadableStreamDefaultReader`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader) /// that can be used to read chunks from a [`ReadableStream`](ReadableStream). /// /// This is returned by the [`get_reader`](ReadableStream::get_reader) method. /// /// When the reader is dropped, it automatically [releases its lock](https://streams.spec.whatwg.org/#release-a-lock). #[derive(Debug)] pub struct ReadableStreamDefaultReader<'stream> { raw: sys::ReadableStreamDefaultReader, _stream: PhantomData<&'stream mut ReadableStream>, } impl<'stream> ReadableStreamDefaultReader<'stream> { pub(crate) fn new(stream: &mut ReadableStream) -> Result<Self, js_sys::Error> { Ok(Self { raw: stream.as_raw().get_reader()?, _stream: PhantomData, }) } /// Acquires a reference to the underlying [JavaScript reader](sys::ReadableStreamDefaultReader). #[inline] pub fn as_raw(&self) -> &sys::ReadableStreamDefaultReader { &self.raw } /// Waits for the stream to become closed. /// /// This returns an error if the stream ever errors, or if the reader's lock is /// [released](https://streams.spec.whatwg.org/#release-a-lock) before the stream finishes /// closing. pub async fn closed(&self) -> Result<(), JsValue> { promise_to_void_future(self.as_raw().closed()).await } /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream, /// signaling a loss of interest in the stream by a consumer. /// /// Equivalent to [`ReadableStream.cancel`](ReadableStream::cancel). pub async fn cancel(&mut self) -> Result<(), JsValue> { promise_to_void_future(self.as_raw().cancel()).await } /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream, /// signaling a loss of interest in the stream by a consumer. /// /// Equivalent to [`ReadableStream.cancel_with_reason`](ReadableStream::cancel_with_reason). pub async fn cancel_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> { promise_to_void_future(self.as_raw().cancel_with_reason(reason)).await } /// Reads the next chunk from the stream's internal queue. /// /// * If a next `chunk` becomes available, this returns `Ok(Some(chunk))`. /// * If the stream closes and no more chunks are available, this returns `Ok(None)`. /// * If the stream encounters an `error`, this returns `Err(error)`. pub async fn read(&mut self) -> Result<Option<JsValue>, JsValue> { let promise = self.as_raw().read(); let js_value = JsFuture::from(promise).await?; let result = sys::ReadableStreamDefaultReadResult::from(js_value); if result.is_done() { Ok(None) } else { Ok(Some(result.value())) } } /// [Releases](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the /// corresponding stream. /// /// **Panics** if the reader still has a pending read request, i.e. if a future returned /// by [`read`](Self::read) is not yet ready. For a non-panicking variant, /// use [`try_release_lock`](Self::try_release_lock). #[inline] pub fn release_lock(mut self) { self.release_lock_mut() } fn release_lock_mut(&mut self) { self.as_raw() .release_lock() .unwrap_or_else(|error| throw_val(error.into())) } /// Try to [release](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the /// corresponding stream. /// /// The lock cannot be released while the reader still has a pending read request, i.e. /// if a future returned by [`read`](Self::read) is not yet ready. Attempting to do so will /// return an error and leave the reader locked to the stream. #[inline] pub fn try_release_lock(self) -> Result<(), (js_sys::Error, Self)> { self.as_raw().release_lock().map_err(|error| (error, self)) } /// Converts this `ReadableStreamDefaultReader` into a [`Stream`](futures::stream::Stream). /// /// This is similar to [`ReadableStream.into_stream`](ReadableStream::into_stream), /// except that after the returned `Stream` is dropped, the original `ReadableStream` is still /// usable. This allows reading only a few chunks from the `Stream`, while still allowing /// another reader to read the remaining chunks later on. #[inline] pub fn into_stream(self) -> IntoStream<'stream> { IntoStream::new(self) } } impl Drop for ReadableStreamDefaultReader<'_> { fn drop(&mut self) { self.release_lock_mut(); } }