wasm_streams/readable/
into_underlying_byte_source.rs

1use std::cell::RefCell;
2use std::panic::AssertUnwindSafe;
3use std::pin::Pin;
4use std::rc::Rc;
5
6use futures_util::future::{abortable, AbortHandle, TryFutureExt};
7use futures_util::io::{AsyncRead, AsyncReadExt};
8use js_sys::{Error as JsError, Promise, Uint8Array};
9use wasm_bindgen::prelude::*;
10use wasm_bindgen_futures::future_to_promise;
11
12use crate::util::{checked_cast_to_u32, clamp_to_usize};
13
14use super::sys;
15
16#[wasm_bindgen]
17pub(crate) struct IntoUnderlyingByteSource {
18    inner: Rc<RefCell<Inner>>,
19    default_buffer_len: usize,
20    controller: Option<sys::ReadableByteStreamController>,
21    pull_handle: Option<AbortHandle>,
22}
23
24impl IntoUnderlyingByteSource {
25    pub fn new(async_read: Box<dyn AsyncRead>, default_buffer_len: usize) -> Self {
26        IntoUnderlyingByteSource {
27            inner: Rc::new(RefCell::new(Inner::new(async_read))),
28            default_buffer_len,
29            controller: None,
30            pull_handle: None,
31        }
32    }
33}
34
35#[allow(clippy::await_holding_refcell_ref)]
36#[wasm_bindgen]
37impl IntoUnderlyingByteSource {
38    #[wasm_bindgen(getter, js_name = type)]
39    pub fn type_(&self) -> sys::ReadableStreamType {
40        sys::ReadableStreamType::Bytes
41    }
42
43    #[wasm_bindgen(getter, js_name = autoAllocateChunkSize)]
44    pub fn auto_allocate_chunk_size(&self) -> usize {
45        self.default_buffer_len
46    }
47
48    pub fn start(&mut self, controller: sys::ReadableByteStreamController) {
49        self.controller = Some(controller);
50    }
51
52    pub fn pull(&mut self, controller: sys::ReadableByteStreamController) -> Promise {
53        let inner = self.inner.clone();
54        let fut = async move {
55            // This mutable borrow can never panic, since the ReadableStream always queues
56            // each operation on the underlying source.
57            let mut inner = inner.try_borrow_mut().unwrap_throw();
58            inner.pull(controller).await
59        };
60
61        // Allow aborting the future from cancel().
62        let (fut, handle) = abortable(fut);
63        // Ignore errors from aborting the future.
64        let fut = fut.unwrap_or_else(|_| Ok(JsValue::undefined()));
65
66        self.pull_handle = Some(handle);
67        // SAFETY: We use the take-and-replace pattern in Inner::pull() to ensure
68        // that if a panic occurs, the async_read is already taken out of the Option,
69        // leaving it in a clean None state. This prevents use of corrupted state
70        // after a panic is caught.
71        future_to_promise(AssertUnwindSafe(fut))
72    }
73
74    pub fn cancel(self) {
75        // The stream has been canceled, drop everything.
76        drop(self);
77    }
78}
79
80impl Drop for IntoUnderlyingByteSource {
81    fn drop(&mut self) {
82        // Abort the pending pull, if any.
83        if let Some(handle) = self.pull_handle.take() {
84            handle.abort();
85        }
86    }
87}
88
89struct Inner {
90    async_read: Option<Pin<Box<dyn AsyncRead>>>,
91    buffer: Vec<u8>,
92}
93
94impl Inner {
95    fn new(async_read: Box<dyn AsyncRead>) -> Self {
96        Inner {
97            async_read: Some(async_read.into()),
98            buffer: Vec::new(),
99        }
100    }
101
102    async fn pull(
103        &mut self,
104        controller: sys::ReadableByteStreamController,
105    ) -> Result<JsValue, JsValue> {
106        // We set autoAllocateChunkSize, so there should always be a BYOB request.
107        let request = controller.byob_request().unwrap_throw();
108        // Resize the buffer to fit the BYOB request.
109        let request_view = request.view().unwrap_throw().unchecked_into::<Uint8Array>();
110        let request_len = clamp_to_usize(request_view.byte_length());
111        if self.buffer.len() < request_len {
112            self.buffer.resize(request_len, 0);
113        }
114
115        // Take the async_read out before the fallible/panickable operation.
116        // This ensures that if a panic occurs, self.async_read is already None,
117        // so any subsequent call will fail cleanly instead of using corrupted state.
118        let mut async_read = self.async_read.take().unwrap_throw();
119
120        match async_read.read(&mut self.buffer[0..request_len]).await {
121            Ok(0) => {
122                // Stream closed: don't put it back, clear buffer, close controller
123                self.buffer = Vec::new();
124                controller.close()?;
125                request.respond_with_u32(0)?;
126            }
127            Ok(bytes_read) => {
128                // Success: put the async_read back for reuse
129                self.async_read = Some(async_read);
130                // Copy read bytes from buffer to BYOB request view
131                debug_assert!(bytes_read <= request_len);
132                let bytes_read_u32 = checked_cast_to_u32(bytes_read);
133                let dest = Uint8Array::new_with_byte_offset_and_length(
134                    &request_view.buffer(),
135                    request_view.byte_offset(),
136                    bytes_read_u32,
137                );
138                dest.copy_from(&self.buffer[0..bytes_read]);
139                // Respond to BYOB request
140                request.respond_with_u32(bytes_read_u32)?;
141            }
142            Err(err) => {
143                // Error: don't put it back, clear buffer, return error
144                self.buffer = Vec::new();
145                return Err(JsError::new(&err.to_string()).into());
146            }
147        };
148        // Panic: async_read is dropped during unwind, self.async_read remains None
149        Ok(JsValue::undefined())
150    }
151}