1use std::marker::PhantomData;
2
3use js_sys::Reflect;
4use wasm_bindgen::prelude::*;
5use wasm_bindgen_futures::JsFuture;
6use web_sys::{js_sys, ReadableStream, ReadableStreamDefaultReader, ReadableStreamReadResult};
7
8use crate::{Error, PromiseExt};
9
10pub struct Reader<T: JsCast> {
12 inner: ReadableStreamDefaultReader,
13
14 read: Option<js_sys::Promise>,
16
17 _phantom: PhantomData<T>,
18}
19
20impl<T: JsCast> Reader<T> {
21 pub fn new(stream: &ReadableStream) -> Result<Self, Error> {
23 let inner = stream.get_reader().unchecked_into();
24 Ok(Self {
25 inner,
26 read: None,
27 _phantom: PhantomData,
28 })
29 }
30
31 pub async fn read(&mut self) -> Result<Option<T>, Error> {
33 if self.read.is_none() {
34 self.read = Some(self.inner.read());
35 }
36
37 let result: ReadableStreamReadResult = JsFuture::from(self.read.as_ref().unwrap().clone()).await?.into();
38 self.read.take(); if Reflect::get(&result, &"done".into())?.is_truthy() {
41 return Ok(None);
42 }
43
44 let res = Reflect::get(&result, &"value".into())?.unchecked_into();
45
46 Ok(Some(res))
47 }
48
49 pub fn abort(&mut self, reason: &str) {
51 let str = JsValue::from_str(reason);
52 self.inner.cancel_with_reason(&str).ignore();
53 }
54
55 pub async fn closed(&self) -> Result<(), Error> {
56 JsFuture::from(self.inner.closed()).await?;
57 Ok(())
58 }
59}
60
61impl<T: JsCast> Drop for Reader<T> {
62 fn drop(&mut self) {
64 self.inner.release_lock();
65 }
66}