wasm_streams/readable/
into_async_read.rs1use core::pin::Pin;
2use core::task::{Context, Poll};
3
4use futures_util::io::{AsyncRead, Error};
5use futures_util::ready;
6use futures_util::FutureExt;
7use js_sys::{Object, Uint8Array};
8use wasm_bindgen::prelude::*;
9use wasm_bindgen::JsCast;
10use wasm_bindgen_futures::JsFuture;
11
12use crate::util::{checked_cast_to_usize, clamp_to_u32, js_to_io_error};
13
14use super::sys::ReadableStreamReadResult;
15use super::ReadableStreamBYOBReader;
16
17#[must_use = "readers do nothing unless polled"]
25#[derive(Debug)]
26pub struct IntoAsyncRead<'reader> {
27 reader: Option<ReadableStreamBYOBReader<'reader>>,
28 buffer: Option<Uint8Array>,
29 fut: Option<JsFuture>,
30 cancel_on_drop: bool,
31}
32
33impl<'reader> IntoAsyncRead<'reader> {
34 #[inline]
35 pub(super) fn new(reader: ReadableStreamBYOBReader, cancel_on_drop: bool) -> IntoAsyncRead {
36 IntoAsyncRead {
37 reader: Some(reader),
38 buffer: None,
39 fut: None,
40 cancel_on_drop,
41 }
42 }
43
44 pub async fn cancel(mut self) -> Result<(), JsValue> {
47 match self.reader.take() {
48 Some(mut reader) => reader.cancel().await,
49 None => Ok(()),
50 }
51 }
52
53 pub async fn cancel_with_reason(mut self, reason: &JsValue) -> Result<(), JsValue> {
56 match self.reader.take() {
57 Some(mut reader) => reader.cancel_with_reason(reason).await,
58 None => Ok(()),
59 }
60 }
61
62 #[inline]
63 fn discard_reader(mut self: Pin<&mut Self>) {
64 self.reader = None;
65 self.buffer = None;
66 }
67}
68
69impl<'reader> AsyncRead for IntoAsyncRead<'reader> {
70 fn poll_read(
71 mut self: Pin<&mut Self>,
72 cx: &mut Context<'_>,
73 buf: &mut [u8],
74 ) -> Poll<Result<usize, Error>> {
75 let read_fut = match self.fut.as_mut() {
76 Some(fut) => fut,
77 None => {
78 let buf_len = clamp_to_u32(buf.len());
80 let buffer = match self.buffer.take() {
81 Some(buffer) if buffer.byte_length() >= buf_len => buffer,
84 _ => Uint8Array::new_with_length(buf_len),
85 };
86 let buffer = buffer.subarray(0, buf_len).unchecked_into::<Object>();
88 match &self.reader {
89 Some(reader) => {
90 let fut =
92 JsFuture::from(reader.as_raw().read_with_array_buffer_view(&buffer));
93 self.fut.insert(fut)
94 }
95 None => {
96 return Poll::Ready(Ok(0));
98 }
99 }
100 }
101 };
102
103 let js_result = ready!(read_fut.poll_unpin(cx));
105 self.fut = None;
106
107 Poll::Ready(match js_result {
109 Ok(js_value) => {
110 let result = ReadableStreamReadResult::from(js_value);
111 if result.get_done().unwrap_or_default() {
112 self.discard_reader();
114 Ok(0)
115 } else {
116 let filled_view = result.get_value().unchecked_into::<Uint8Array>();
118 let filled_len = checked_cast_to_usize(filled_view.byte_length());
120 debug_assert!(filled_len <= buf.len());
121 filled_view.copy_to(&mut buf[0..filled_len]);
122 self.buffer = Some(Uint8Array::new(&filled_view.buffer()));
124 Ok(filled_len)
125 }
126 }
127 Err(js_value) => {
128 self.discard_reader();
130 Err(js_to_io_error(js_value))
131 }
132 })
133 }
134}
135
136impl<'reader> Drop for IntoAsyncRead<'reader> {
137 fn drop(&mut self) {
138 if self.cancel_on_drop {
139 if let Some(reader) = self.reader.take() {
140 let on_rejected = Closure::once(|_| {});
141 let _ = reader.as_raw().cancel().catch(&on_rejected);
142 on_rejected.forget();
143 }
144 }
145 }
146}