wasm_streams/readable/
into_underlying_source.rs

1use std::cell::RefCell;
2use std::panic::AssertUnwindSafe;
3use std::pin::Pin;
4use std::rc::Rc;
5
6use futures_util::future::{abortable, AbortHandle, TryFutureExt};
7use futures_util::stream::{Stream, TryStreamExt};
8use js_sys::Promise;
9use wasm_bindgen::prelude::*;
10use wasm_bindgen_futures::future_to_promise;
11
12use super::sys;
13
14type JsValueStream = dyn Stream<Item = Result<JsValue, JsValue>>;
15
16#[wasm_bindgen]
17pub(crate) struct IntoUnderlyingSource {
18    inner: Rc<RefCell<Inner>>,
19    pull_handle: Option<AbortHandle>,
20}
21
22impl IntoUnderlyingSource {
23    pub fn new(stream: Box<JsValueStream>) -> Self {
24        IntoUnderlyingSource {
25            inner: Rc::new(RefCell::new(Inner::new(stream))),
26            pull_handle: None,
27        }
28    }
29}
30
31#[allow(clippy::await_holding_refcell_ref)]
32#[wasm_bindgen]
33impl IntoUnderlyingSource {
34    pub fn pull(&mut self, controller: sys::ReadableStreamDefaultController) -> Promise {
35        let inner = self.inner.clone();
36        let fut = async move {
37            // This mutable borrow can never panic, since the ReadableStream always queues
38            // each operation on the underlying source.
39            let mut inner = inner.try_borrow_mut().unwrap_throw();
40            inner.pull(controller).await
41        };
42
43        // Allow aborting the future from cancel().
44        let (fut, handle) = abortable(fut);
45        // Ignore errors from aborting the future.
46        let fut = fut.unwrap_or_else(|_| Ok(JsValue::undefined()));
47
48        self.pull_handle = Some(handle);
49        // SAFETY: We use the take-and-replace pattern in Inner::pull() to ensure
50        // that if a panic occurs, the stream is already taken out of the Option,
51        // leaving it in a clean None state. This prevents use of corrupted state
52        // after a panic is caught.
53        future_to_promise(AssertUnwindSafe(fut))
54    }
55
56    pub fn cancel(self) {
57        // The stream has been canceled, drop everything.
58        drop(self);
59    }
60}
61
62impl Drop for IntoUnderlyingSource {
63    fn drop(&mut self) {
64        // Abort the pending pull, if any.
65        if let Some(handle) = self.pull_handle.take() {
66            handle.abort();
67        }
68    }
69}
70
71struct Inner {
72    stream: Option<Pin<Box<JsValueStream>>>,
73}
74
75impl Inner {
76    fn new(stream: Box<JsValueStream>) -> Self {
77        Inner {
78            stream: Some(stream.into()),
79        }
80    }
81
82    async fn pull(
83        &mut self,
84        controller: sys::ReadableStreamDefaultController,
85    ) -> Result<JsValue, JsValue> {
86        // Take the stream out before the fallible/panickable operation.
87        // This ensures that if a panic occurs, self.stream is already None,
88        // so any subsequent call will fail cleanly instead of using corrupted state.
89        let mut stream = self.stream.take().unwrap_throw();
90
91        match stream.try_next().await {
92            Ok(Some(chunk)) => {
93                // Success with chunk: put the stream back and enqueue
94                self.stream = Some(stream);
95                controller.enqueue_with_chunk(&chunk)?;
96            }
97            Ok(None) => {
98                // Stream closed: don't put it back (it's exhausted), close controller
99                controller.close()?;
100            }
101            Err(err) => {
102                // Error: don't put it back, return the error
103                return Err(err);
104            }
105        };
106        // Panic: stream is dropped during unwind, self.stream remains None
107        Ok(JsValue::undefined())
108    }
109}