wasm_streams/writable/
into_underlying_sink.rs

1use std::cell::RefCell;
2use std::panic::AssertUnwindSafe;
3use std::pin::Pin;
4use std::rc::Rc;
5
6use futures_util::{Sink, SinkExt};
7use js_sys::Promise;
8use wasm_bindgen::prelude::*;
9use wasm_bindgen_futures::future_to_promise;
10
11#[wasm_bindgen]
12pub(crate) struct IntoUnderlyingSink {
13    inner: Rc<RefCell<Inner>>,
14}
15
16impl IntoUnderlyingSink {
17    pub fn new(sink: Box<dyn Sink<JsValue, Error = JsValue>>) -> Self {
18        IntoUnderlyingSink {
19            inner: Rc::new(RefCell::new(Inner::new(sink))),
20        }
21    }
22}
23
24#[allow(clippy::await_holding_refcell_ref)]
25#[wasm_bindgen]
26impl IntoUnderlyingSink {
27    pub fn write(&mut self, chunk: JsValue) -> Promise {
28        let inner = self.inner.clone();
29        // SAFETY: We use the take-and-replace pattern in Inner::write() to ensure
30        // that if a panic occurs, the sink is already taken out of the Option,
31        // leaving it in a clean None state. This prevents use of corrupted state
32        // after a panic is caught.
33        future_to_promise(AssertUnwindSafe(async move {
34            // This mutable borrow can never panic, since the WritableStream always queues
35            // each operation on the underlying sink.
36            let mut inner = inner.try_borrow_mut().unwrap_throw();
37            inner.write(chunk).await.map(|_| JsValue::undefined())
38        }))
39    }
40
41    pub fn close(self) -> Promise {
42        // SAFETY: Inner::close() takes the sink before the fallible operation.
43        future_to_promise(AssertUnwindSafe(async move {
44            let mut inner = self.inner.try_borrow_mut().unwrap_throw();
45            inner.close().await.map(|_| JsValue::undefined())
46        }))
47    }
48
49    pub fn abort(self, reason: JsValue) -> Promise {
50        // SAFETY: Inner::abort() just sets sink to None, no fallible operation.
51        future_to_promise(AssertUnwindSafe(async move {
52            let mut inner = self.inner.try_borrow_mut().unwrap_throw();
53            inner.abort(reason).await.map(|_| JsValue::undefined())
54        }))
55    }
56}
57
58struct Inner {
59    sink: Option<Pin<Box<dyn Sink<JsValue, Error = JsValue>>>>,
60}
61
62impl Inner {
63    fn new(sink: Box<dyn Sink<JsValue, Error = JsValue>>) -> Self {
64        Inner {
65            sink: Some(sink.into()),
66        }
67    }
68
69    async fn write(&mut self, chunk: JsValue) -> Result<(), JsValue> {
70        // Take the sink out before the fallible/panickable operation.
71        // This ensures that if a panic occurs, self.sink is already None,
72        // so any subsequent call will fail cleanly instead of using corrupted state.
73        let mut sink = self.sink.take().unwrap_throw();
74
75        match sink.send(chunk).await {
76            Ok(()) => {
77                // Success: put the sink back for reuse
78                self.sink = Some(sink);
79                Ok(())
80            }
81            Err(err) => {
82                // Error: the sink is dropped, self.sink remains None
83                Err(err)
84            }
85        }
86        // Panic: sink is dropped during unwind, self.sink remains None
87    }
88
89    async fn close(&mut self) -> Result<(), JsValue> {
90        // Take ownership and close - sink is dropped after close completes
91        self.sink.take().unwrap_throw().close().await
92    }
93
94    async fn abort(&mut self, _reason: JsValue) -> Result<(), JsValue> {
95        // Take and drop the sink immediately
96        self.sink = None;
97        Ok(())
98    }
99}