web_streams/
reader.rs

1use std::marker::PhantomData;
2
3use js_sys::Reflect;
4use wasm_bindgen::prelude::*;
5use wasm_bindgen_futures::JsFuture;
6use web_sys::{js_sys, ReadableStream, ReadableStreamDefaultReader, ReadableStreamReadResult};
7
8use crate::{Error, PromiseExt};
9
10/// A wrapper around ReadableStream
11pub struct Reader<T: JsCast> {
12	inner: ReadableStreamDefaultReader,
13
14	// Keep the most recent promise to make `read` cancelable
15	read: Option<js_sys::Promise>,
16
17	_phantom: PhantomData<T>,
18}
19
20impl<T: JsCast> Reader<T> {
21	/// Grab a lock on the given readable stream until dropped.
22	pub fn new(stream: &ReadableStream) -> Result<Self, Error> {
23		let inner = stream.get_reader().unchecked_into();
24		Ok(Self {
25			inner,
26			read: None,
27			_phantom: PhantomData,
28		})
29	}
30
31	/// Read the next element from the stream, returning None if the stream is done.
32	pub async fn read(&mut self) -> Result<Option<T>, Error> {
33		if self.read.is_none() {
34			self.read = Some(self.inner.read());
35		}
36
37		let result: ReadableStreamReadResult = JsFuture::from(self.read.as_ref().unwrap().clone()).await?.into();
38		self.read.take(); // Clear the promise on success
39
40		if Reflect::get(&result, &"done".into())?.is_truthy() {
41			return Ok(None);
42		}
43
44		let res = Reflect::get(&result, &"value".into())?.unchecked_into();
45
46		Ok(Some(res))
47	}
48
49	/// Abort the stream early with the given reason.
50	pub fn abort(&mut self, reason: &str) {
51		let str = JsValue::from_str(reason);
52		self.inner.cancel_with_reason(&str).ignore();
53	}
54
55	pub async fn closed(&self) -> Result<(), Error> {
56		JsFuture::from(self.inner.closed()).await?;
57		Ok(())
58	}
59}
60
61impl<T: JsCast> Drop for Reader<T> {
62	/// Release the lock
63	fn drop(&mut self) {
64		self.inner.release_lock();
65	}
66}