// wasm_streams/readable/into_stream.rs
use core::pin::Pin;
2use core::task::{Context, Poll};
3
4use futures_util::ready;
5use futures_util::stream::{FusedStream, Stream};
6use futures_util::FutureExt;
7use wasm_bindgen::prelude::*;
8use wasm_bindgen_futures::JsFuture;
9
10use super::sys::ReadableStreamReadResult;
11use super::ReadableStreamDefaultReader;
12
13#[must_use = "streams do nothing unless polled"]
21#[derive(Debug)]
22pub struct IntoStream<'reader> {
23 reader: Option<ReadableStreamDefaultReader<'reader>>,
24 fut: Option<JsFuture>,
25 cancel_on_drop: bool,
26}
27
28impl<'reader> IntoStream<'reader> {
29 #[inline]
30 pub(super) fn new(reader: ReadableStreamDefaultReader, cancel_on_drop: bool) -> IntoStream {
31 IntoStream {
32 reader: Some(reader),
33 fut: None,
34 cancel_on_drop,
35 }
36 }
37
38 pub async fn cancel(mut self) -> Result<(), JsValue> {
41 match self.reader.take() {
42 Some(mut reader) => reader.cancel().await,
43 None => Ok(()),
44 }
45 }
46
47 pub async fn cancel_with_reason(mut self, reason: &JsValue) -> Result<(), JsValue> {
50 match self.reader.take() {
51 Some(mut reader) => reader.cancel_with_reason(reason).await,
52 None => Ok(()),
53 }
54 }
55}
56
57impl FusedStream for IntoStream<'_> {
58 fn is_terminated(&self) -> bool {
59 self.reader.is_none() && self.fut.is_none()
60 }
61}
62
63impl<'reader> Stream for IntoStream<'reader> {
64 type Item = Result<JsValue, JsValue>;
65
66 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67 let read_fut = match self.fut.as_mut() {
68 Some(fut) => fut,
69 None => match &self.reader {
70 Some(reader) => {
71 let fut = JsFuture::from(reader.as_raw().read());
74 self.fut.insert(fut)
75 }
76 None => {
77 return Poll::Ready(None);
79 }
80 },
81 };
82
83 let js_result = ready!(read_fut.poll_unpin(cx));
85 self.fut = None;
86
87 Poll::Ready(match js_result {
89 Ok(js_value) => {
90 let result = ReadableStreamReadResult::from(js_value);
91 if result.get_done().unwrap_or_default() {
92 self.reader = None;
94 None
95 } else {
96 Some(Ok(result.get_value()))
97 }
98 }
99 Err(js_value) => {
100 self.reader = None;
102 Some(Err(js_value))
103 }
104 })
105 }
106}
107
108impl<'reader> Drop for IntoStream<'reader> {
109 fn drop(&mut self) {
110 if self.cancel_on_drop {
111 if let Some(reader) = self.reader.take() {
112 let on_rejected = Closure::once(|_| {});
113 let _ = reader.as_raw().cancel().catch(&on_rejected);
114 on_rejected.forget();
115 }
116 }
117 }
118}