wasm_streams/readable/
into_async_read.rs

1use core::pin::Pin;
2use core::task::{Context, Poll};
3
4use futures_util::io::{AsyncRead, Error};
5use futures_util::ready;
6use futures_util::FutureExt;
7use js_sys::{Object, Uint8Array};
8use wasm_bindgen::prelude::*;
9use wasm_bindgen::JsCast;
10use wasm_bindgen_futures::JsFuture;
11
12use crate::util::{checked_cast_to_usize, clamp_to_u32, js_to_io_error};
13
14use super::sys::ReadableStreamReadResult;
15use super::ReadableStreamBYOBReader;
16
17/// An [`AsyncRead`] for the [`into_async_read`](super::ReadableStream::into_async_read) method.
18///
19/// This `AsyncRead` holds a reader, and therefore locks the [`ReadableStream`](super::ReadableStream).
20/// When this `AsyncRead` is dropped, it also drops its reader which in turn
21/// [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
22///
23/// [`AsyncRead`]: https://docs.rs/futures/0.3.30/futures/io/trait.AsyncRead.html
24#[must_use = "readers do nothing unless polled"]
25#[derive(Debug)]
26pub struct IntoAsyncRead<'reader> {
27    reader: Option<ReadableStreamBYOBReader<'reader>>,
28    buffer: Option<Uint8Array>,
29    fut: Option<JsFuture>,
30    cancel_on_drop: bool,
31}
32
33impl<'reader> IntoAsyncRead<'reader> {
34    #[inline]
35    pub(super) fn new(reader: ReadableStreamBYOBReader, cancel_on_drop: bool) -> IntoAsyncRead {
36        IntoAsyncRead {
37            reader: Some(reader),
38            buffer: None,
39            fut: None,
40            cancel_on_drop,
41        }
42    }
43
44    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
45    /// signaling a loss of interest in the stream by a consumer.
46    pub async fn cancel(mut self) -> Result<(), JsValue> {
47        match self.reader.take() {
48            Some(mut reader) => reader.cancel().await,
49            None => Ok(()),
50        }
51    }
52
53    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
54    /// signaling a loss of interest in the stream by a consumer.
55    pub async fn cancel_with_reason(mut self, reason: &JsValue) -> Result<(), JsValue> {
56        match self.reader.take() {
57            Some(mut reader) => reader.cancel_with_reason(reason).await,
58            None => Ok(()),
59        }
60    }
61
62    #[inline]
63    fn discard_reader(mut self: Pin<&mut Self>) {
64        self.reader = None;
65        self.buffer = None;
66    }
67}
68
69impl<'reader> AsyncRead for IntoAsyncRead<'reader> {
70    fn poll_read(
71        mut self: Pin<&mut Self>,
72        cx: &mut Context<'_>,
73        buf: &mut [u8],
74    ) -> Poll<Result<usize, Error>> {
75        let read_fut = match self.fut.as_mut() {
76            Some(fut) => fut,
77            None => {
78                // No pending read, start reading the next bytes
79                let buf_len = clamp_to_u32(buf.len());
80                let buffer = match self.buffer.take() {
81                    // Re-use the internal buffer if it is large enough,
82                    // otherwise allocate a new one
83                    Some(buffer) if buffer.byte_length() >= buf_len => buffer,
84                    _ => Uint8Array::new_with_length(buf_len),
85                };
86                // Limit to output buffer size
87                let buffer = buffer.subarray(0, buf_len).unchecked_into::<Object>();
88                match &self.reader {
89                    Some(reader) => {
90                        // Read into internal buffer and store its future
91                        let fut =
92                            JsFuture::from(reader.as_raw().read_with_array_buffer_view(&buffer));
93                        self.fut.insert(fut)
94                    }
95                    None => {
96                        // Reader was already dropped
97                        return Poll::Ready(Ok(0));
98                    }
99                }
100            }
101        };
102
103        // Poll the future for the pending read
104        let js_result = ready!(read_fut.poll_unpin(cx));
105        self.fut = None;
106
107        // Read completed
108        Poll::Ready(match js_result {
109            Ok(js_value) => {
110                let result = ReadableStreamReadResult::from(js_value);
111                if result.get_done().unwrap_or_default() {
112                    // End of stream
113                    self.discard_reader();
114                    Ok(0)
115                } else {
116                    // Cannot be canceled, so view must exist
117                    let filled_view = result.get_value().unchecked_into::<Uint8Array>();
118                    // Copy bytes to output buffer
119                    let filled_len = checked_cast_to_usize(filled_view.byte_length());
120                    debug_assert!(filled_len <= buf.len());
121                    filled_view.copy_to(&mut buf[0..filled_len]);
122                    // Re-construct internal buffer with the new ArrayBuffer
123                    self.buffer = Some(Uint8Array::new(&filled_view.buffer()));
124                    Ok(filled_len)
125                }
126            }
127            Err(js_value) => {
128                // Error
129                self.discard_reader();
130                Err(js_to_io_error(js_value))
131            }
132        })
133    }
134}
135
136impl<'reader> Drop for IntoAsyncRead<'reader> {
137    fn drop(&mut self) {
138        if self.cancel_on_drop {
139            if let Some(reader) = self.reader.take() {
140                let on_rejected = Closure::once(|_| {});
141                let _ = reader.as_raw().cancel().catch(&on_rejected);
142                on_rejected.forget();
143            }
144        }
145    }
146}