wasm_streams/writable/
into_sink.rs

1use core::pin::Pin;
2use core::task::{Context, Poll};
3
4use futures_util::Sink;
5use futures_util::{ready, FutureExt};
6use wasm_bindgen::prelude::*;
7use wasm_bindgen_futures::JsFuture;
8
9use super::WritableStreamDefaultWriter;
10
11/// A [`Sink`] for the [`into_sink`](super::WritableStream::into_sink) method.
12///
13/// This sink holds a writer, and therefore locks the [`WritableStream`](super::WritableStream).
14/// When this sink is dropped, it also drops its writer which in turn
15/// [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
16///
17/// [`Sink`]: https://docs.rs/futures/0.3.30/futures/sink/trait.Sink.html
18#[must_use = "sinks do nothing unless polled"]
19#[derive(Debug)]
20pub struct IntoSink<'writer> {
21    writer: Option<WritableStreamDefaultWriter<'writer>>,
22    /// If an error occurred, this holds the error to return on subsequent operations.
23    error: Option<JsValue>,
24    ready_fut: Option<JsFuture>,
25    write_fut: Option<JsFuture>,
26    close_fut: Option<JsFuture>,
27}
28
29impl<'writer> IntoSink<'writer> {
30    #[inline]
31    pub(super) fn new(writer: WritableStreamDefaultWriter) -> IntoSink {
32        IntoSink {
33            writer: Some(writer),
34            error: None,
35            ready_fut: None,
36            write_fut: None,
37            close_fut: None,
38        }
39    }
40
41    /// [Aborts](https://streams.spec.whatwg.org/#abort-a-writable-stream) the stream,
42    /// signaling that the producer can no longer successfully write to the stream.
43    pub async fn abort(mut self) -> Result<(), JsValue> {
44        match self.writer.take() {
45            Some(mut writer) => writer.abort().await,
46            None => Ok(()),
47        }
48    }
49
50    /// [Aborts](https://streams.spec.whatwg.org/#abort-a-writable-stream) the stream,
51    /// signaling that the producer can no longer successfully write to the stream.
52    pub async fn abort_with_reason(mut self, reason: &JsValue) -> Result<(), JsValue> {
53        match self.writer.take() {
54            Some(mut writer) => writer.abort_with_reason(reason).await,
55            None => Ok(()),
56        }
57    }
58
59    /// Returns the stored error, or a default "sink is closed" error.
60    fn get_error(&self) -> JsValue {
61        self.error
62            .clone()
63            .unwrap_or_else(|| JsValue::from_str("WritableStream sink is already closed"))
64    }
65}
66
67impl<'writer> Sink<JsValue> for IntoSink<'writer> {
68    type Error = JsValue;
69
70    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
71        let ready_fut = match self.ready_fut.as_mut() {
72            Some(fut) => fut,
73            None => match &self.writer {
74                Some(writer) => {
75                    // No pending ready future yet, create one from ready promise
76                    let fut = JsFuture::from(writer.as_raw().ready());
77                    self.ready_fut.insert(fut)
78                }
79                None => {
80                    // Writer was already dropped due to error or close
81                    return Poll::Ready(Err(self.get_error()));
82                }
83            },
84        };
85
86        // Poll the ready future
87        let js_result = ready!(ready_fut.poll_unpin(cx));
88        self.ready_fut = None;
89
90        // Ready future completed
91        Poll::Ready(match js_result {
92            Ok(js_value) => {
93                debug_assert!(js_value.is_undefined());
94                Ok(())
95            }
96            Err(js_value) => {
97                // Error, store it and drop writer
98                self.error = Some(js_value.clone());
99                self.writer = None;
100                Err(js_value)
101            }
102        })
103    }
104
105    fn start_send(mut self: Pin<&mut Self>, item: JsValue) -> Result<(), Self::Error> {
106        match &self.writer {
107            Some(writer) => {
108                let fut = JsFuture::from(writer.as_raw().write_with_chunk(&item));
109                // Set or replace the pending write future
110                self.write_fut = Some(fut);
111                Ok(())
112            }
113            None => {
114                // Writer was already dropped due to error or close
115                Err(self.get_error())
116            }
117        }
118    }
119
120    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
121        let write_fut = match self.write_fut.as_mut() {
122            Some(fut) => fut,
123            None => {
124                // If we're not writing, then there's nothing to flush
125                return Poll::Ready(Ok(()));
126            }
127        };
128
129        // Poll the write future
130        let js_result = ready!(write_fut.poll_unpin(cx));
131        self.write_fut = None;
132
133        // Write future completed
134        Poll::Ready(match js_result {
135            Ok(js_value) => {
136                debug_assert!(js_value.is_undefined());
137                Ok(())
138            }
139            Err(js_value) => {
140                // Error, store it and drop writer
141                self.error = Some(js_value.clone());
142                self.writer = None;
143                Err(js_value)
144            }
145        })
146    }
147
148    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
149        let close_fut = match self.close_fut.as_mut() {
150            Some(fut) => fut,
151            None => match &self.writer {
152                Some(writer) => {
153                    // No pending close future
154                    // Start closing the stream and create future from close promise
155                    let fut = JsFuture::from(writer.as_raw().close());
156                    self.close_fut.insert(fut)
157                }
158                None => {
159                    // Writer was already dropped due to error or close
160                    return Poll::Ready(Err(self.get_error()));
161                }
162            },
163        };
164
165        // Poll the close future
166        let js_result = ready!(close_fut.poll_unpin(cx));
167        self.close_fut = None;
168
169        // Close future completed
170        self.writer = None;
171        Poll::Ready(match js_result {
172            Ok(js_value) => {
173                debug_assert!(js_value.is_undefined());
174                Ok(())
175            }
176            Err(js_value) => {
177                self.error = Some(js_value.clone());
178                Err(js_value)
179            }
180        })
181    }
182}