wasm_streams/readable/
into_underlying_byte_source.rs1use std::cell::RefCell;
2use std::panic::AssertUnwindSafe;
3use std::pin::Pin;
4use std::rc::Rc;
5
6use futures_util::future::{abortable, AbortHandle, TryFutureExt};
7use futures_util::io::{AsyncRead, AsyncReadExt};
8use js_sys::{Error as JsError, Promise, Uint8Array};
9use wasm_bindgen::prelude::*;
10use wasm_bindgen_futures::future_to_promise;
11
12use crate::util::{checked_cast_to_u32, clamp_to_usize};
13
14use super::sys;
15
16#[wasm_bindgen]
17pub(crate) struct IntoUnderlyingByteSource {
18 inner: Rc<RefCell<Inner>>,
19 default_buffer_len: usize,
20 controller: Option<sys::ReadableByteStreamController>,
21 pull_handle: Option<AbortHandle>,
22}
23
24impl IntoUnderlyingByteSource {
25 pub fn new(async_read: Box<dyn AsyncRead>, default_buffer_len: usize) -> Self {
26 IntoUnderlyingByteSource {
27 inner: Rc::new(RefCell::new(Inner::new(async_read))),
28 default_buffer_len,
29 controller: None,
30 pull_handle: None,
31 }
32 }
33}
34
35#[allow(clippy::await_holding_refcell_ref)]
36#[wasm_bindgen]
37impl IntoUnderlyingByteSource {
38 #[wasm_bindgen(getter, js_name = type)]
39 pub fn type_(&self) -> sys::ReadableStreamType {
40 sys::ReadableStreamType::Bytes
41 }
42
43 #[wasm_bindgen(getter, js_name = autoAllocateChunkSize)]
44 pub fn auto_allocate_chunk_size(&self) -> usize {
45 self.default_buffer_len
46 }
47
48 pub fn start(&mut self, controller: sys::ReadableByteStreamController) {
49 self.controller = Some(controller);
50 }
51
52 pub fn pull(&mut self, controller: sys::ReadableByteStreamController) -> Promise {
53 let inner = self.inner.clone();
54 let fut = async move {
55 let mut inner = inner.try_borrow_mut().unwrap_throw();
58 inner.pull(controller).await
59 };
60
61 let (fut, handle) = abortable(fut);
63 let fut = fut.unwrap_or_else(|_| Ok(JsValue::undefined()));
65
66 self.pull_handle = Some(handle);
67 future_to_promise(AssertUnwindSafe(fut))
72 }
73
74 pub fn cancel(self) {
75 drop(self);
77 }
78}
79
80impl Drop for IntoUnderlyingByteSource {
81 fn drop(&mut self) {
82 if let Some(handle) = self.pull_handle.take() {
84 handle.abort();
85 }
86 }
87}
88
89struct Inner {
90 async_read: Option<Pin<Box<dyn AsyncRead>>>,
91 buffer: Vec<u8>,
92}
93
94impl Inner {
95 fn new(async_read: Box<dyn AsyncRead>) -> Self {
96 Inner {
97 async_read: Some(async_read.into()),
98 buffer: Vec::new(),
99 }
100 }
101
102 async fn pull(
103 &mut self,
104 controller: sys::ReadableByteStreamController,
105 ) -> Result<JsValue, JsValue> {
106 let request = controller.byob_request().unwrap_throw();
108 let request_view = request.view().unwrap_throw().unchecked_into::<Uint8Array>();
110 let request_len = clamp_to_usize(request_view.byte_length());
111 if self.buffer.len() < request_len {
112 self.buffer.resize(request_len, 0);
113 }
114
115 let mut async_read = self.async_read.take().unwrap_throw();
119
120 match async_read.read(&mut self.buffer[0..request_len]).await {
121 Ok(0) => {
122 self.buffer = Vec::new();
124 controller.close()?;
125 request.respond_with_u32(0)?;
126 }
127 Ok(bytes_read) => {
128 self.async_read = Some(async_read);
130 debug_assert!(bytes_read <= request_len);
132 let bytes_read_u32 = checked_cast_to_u32(bytes_read);
133 let dest = Uint8Array::new_with_byte_offset_and_length(
134 &request_view.buffer(),
135 request_view.byte_offset(),
136 bytes_read_u32,
137 );
138 dest.copy_from(&self.buffer[0..bytes_read]);
139 request.respond_with_u32(bytes_read_u32)?;
141 }
142 Err(err) => {
143 self.buffer = Vec::new();
145 return Err(JsError::new(&err.to_string()).into());
146 }
147 };
148 Ok(JsValue::undefined())
150 }
151}