wasm_streams/readable/
into_underlying_source.rs1use std::cell::RefCell;
2use std::panic::AssertUnwindSafe;
3use std::pin::Pin;
4use std::rc::Rc;
5
6use futures_util::future::{abortable, AbortHandle, TryFutureExt};
7use futures_util::stream::{Stream, TryStreamExt};
8use js_sys::Promise;
9use wasm_bindgen::prelude::*;
10use wasm_bindgen_futures::future_to_promise;
11
12use super::sys;
13
/// A dynamically-typed stream whose items are either a JavaScript value
/// (`Ok`) or a JavaScript error value (`Err`).
type JsValueStream = dyn Stream<Item = Result<JsValue, JsValue>>;
15
16#[wasm_bindgen]
17pub(crate) struct IntoUnderlyingSource {
18 inner: Rc<RefCell<Inner>>,
19 pull_handle: Option<AbortHandle>,
20}
21
22impl IntoUnderlyingSource {
23 pub fn new(stream: Box<JsValueStream>) -> Self {
24 IntoUnderlyingSource {
25 inner: Rc::new(RefCell::new(Inner::new(stream))),
26 pull_handle: None,
27 }
28 }
29}
30
31#[allow(clippy::await_holding_refcell_ref)]
32#[wasm_bindgen]
33impl IntoUnderlyingSource {
34 pub fn pull(&mut self, controller: sys::ReadableStreamDefaultController) -> Promise {
35 let inner = self.inner.clone();
36 let fut = async move {
37 let mut inner = inner.try_borrow_mut().unwrap_throw();
40 inner.pull(controller).await
41 };
42
43 let (fut, handle) = abortable(fut);
45 let fut = fut.unwrap_or_else(|_| Ok(JsValue::undefined()));
47
48 self.pull_handle = Some(handle);
49 future_to_promise(AssertUnwindSafe(fut))
54 }
55
56 pub fn cancel(self) {
57 drop(self);
59 }
60}
61
62impl Drop for IntoUnderlyingSource {
63 fn drop(&mut self) {
64 if let Some(handle) = self.pull_handle.take() {
66 handle.abort();
67 }
68 }
69}
70
71struct Inner {
72 stream: Option<Pin<Box<JsValueStream>>>,
73}
74
75impl Inner {
76 fn new(stream: Box<JsValueStream>) -> Self {
77 Inner {
78 stream: Some(stream.into()),
79 }
80 }
81
82 async fn pull(
83 &mut self,
84 controller: sys::ReadableStreamDefaultController,
85 ) -> Result<JsValue, JsValue> {
86 let mut stream = self.stream.take().unwrap_throw();
90
91 match stream.try_next().await {
92 Ok(Some(chunk)) => {
93 self.stream = Some(stream);
95 controller.enqueue_with_chunk(&chunk)?;
96 }
97 Ok(None) => {
98 controller.close()?;
100 }
101 Err(err) => {
102 return Err(err);
104 }
105 };
106 Ok(JsValue::undefined())
108 }
109}