wasm_streams/writable/
into_sink.rs1use core::pin::Pin;
2use core::task::{Context, Poll};
3
4use futures_util::Sink;
5use futures_util::{ready, FutureExt};
6use wasm_bindgen::prelude::*;
7use wasm_bindgen_futures::JsFuture;
8
9use super::WritableStreamDefaultWriter;
10
11#[must_use = "sinks do nothing unless polled"]
19#[derive(Debug)]
20pub struct IntoSink<'writer> {
21 writer: Option<WritableStreamDefaultWriter<'writer>>,
22 error: Option<JsValue>,
24 ready_fut: Option<JsFuture>,
25 write_fut: Option<JsFuture>,
26 close_fut: Option<JsFuture>,
27}
28
29impl<'writer> IntoSink<'writer> {
30 #[inline]
31 pub(super) fn new(writer: WritableStreamDefaultWriter) -> IntoSink {
32 IntoSink {
33 writer: Some(writer),
34 error: None,
35 ready_fut: None,
36 write_fut: None,
37 close_fut: None,
38 }
39 }
40
41 pub async fn abort(mut self) -> Result<(), JsValue> {
44 match self.writer.take() {
45 Some(mut writer) => writer.abort().await,
46 None => Ok(()),
47 }
48 }
49
50 pub async fn abort_with_reason(mut self, reason: &JsValue) -> Result<(), JsValue> {
53 match self.writer.take() {
54 Some(mut writer) => writer.abort_with_reason(reason).await,
55 None => Ok(()),
56 }
57 }
58
59 fn get_error(&self) -> JsValue {
61 self.error
62 .clone()
63 .unwrap_or_else(|| JsValue::from_str("WritableStream sink is already closed"))
64 }
65}
66
67impl<'writer> Sink<JsValue> for IntoSink<'writer> {
68 type Error = JsValue;
69
70 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
71 let ready_fut = match self.ready_fut.as_mut() {
72 Some(fut) => fut,
73 None => match &self.writer {
74 Some(writer) => {
75 let fut = JsFuture::from(writer.as_raw().ready());
77 self.ready_fut.insert(fut)
78 }
79 None => {
80 return Poll::Ready(Err(self.get_error()));
82 }
83 },
84 };
85
86 let js_result = ready!(ready_fut.poll_unpin(cx));
88 self.ready_fut = None;
89
90 Poll::Ready(match js_result {
92 Ok(js_value) => {
93 debug_assert!(js_value.is_undefined());
94 Ok(())
95 }
96 Err(js_value) => {
97 self.error = Some(js_value.clone());
99 self.writer = None;
100 Err(js_value)
101 }
102 })
103 }
104
105 fn start_send(mut self: Pin<&mut Self>, item: JsValue) -> Result<(), Self::Error> {
106 match &self.writer {
107 Some(writer) => {
108 let fut = JsFuture::from(writer.as_raw().write_with_chunk(&item));
109 self.write_fut = Some(fut);
111 Ok(())
112 }
113 None => {
114 Err(self.get_error())
116 }
117 }
118 }
119
120 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
121 let write_fut = match self.write_fut.as_mut() {
122 Some(fut) => fut,
123 None => {
124 return Poll::Ready(Ok(()));
126 }
127 };
128
129 let js_result = ready!(write_fut.poll_unpin(cx));
131 self.write_fut = None;
132
133 Poll::Ready(match js_result {
135 Ok(js_value) => {
136 debug_assert!(js_value.is_undefined());
137 Ok(())
138 }
139 Err(js_value) => {
140 self.error = Some(js_value.clone());
142 self.writer = None;
143 Err(js_value)
144 }
145 })
146 }
147
148 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
149 let close_fut = match self.close_fut.as_mut() {
150 Some(fut) => fut,
151 None => match &self.writer {
152 Some(writer) => {
153 let fut = JsFuture::from(writer.as_raw().close());
156 self.close_fut.insert(fut)
157 }
158 None => {
159 return Poll::Ready(Err(self.get_error()));
161 }
162 },
163 };
164
165 let js_result = ready!(close_fut.poll_unpin(cx));
167 self.close_fut = None;
168
169 self.writer = None;
171 Poll::Ready(match js_result {
172 Ok(js_value) => {
173 debug_assert!(js_value.is_undefined());
174 Ok(())
175 }
176 Err(js_value) => {
177 self.error = Some(js_value.clone());
178 Err(js_value)
179 }
180 })
181 }
182}