1use std::future::Future;
2use std::io;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use bytes::Buf;
7use futures_io::AsyncWrite;
8use js_sys::{Reflect, Uint8Array};
9use web_sys::WebTransportSendStream;
10
11use crate::Error;
12use web_streams::Writer;
13
/// Boxed future driving a single in-flight write.
///
/// The future owns the [`Writer`] while the write is pending and hands it back,
/// together with the outcome of the write, when it resolves. There is no `Send`
/// bound on the trait object.
type WriteFuture = Pin<Box<dyn Future<Output = (Writer, io::Result<usize>)>>>;
15
/// Progress of the write started through the `AsyncWrite` implementation.
enum WriteState {
    /// No write is pending.
    Idle,
    /// A write has been started and its future has not resolved yet.
    Writing(WriteFuture),
}
20
/// Send half of a WebTransport stream, usable through async methods or `AsyncWrite`.
pub struct SendStream {
    /// Underlying JS stream object; `set_priority` writes its `sendOrder` property.
    stream: WebTransportSendStream,
    /// Writer for `stream`. It is `None` while an `AsyncWrite` write future owns it
    /// and is put back once that future completes.
    writer: Option<Writer>,
    /// Tracks the write started by `poll_write`, if any.
    write_state: WriteState,
    /// Set by `finish` and `poll_close`; once set, `poll_write` fails with `BrokenPipe`.
    is_closed: bool,
}
28
29impl SendStream {
30 pub(super) fn new(stream: WebTransportSendStream) -> Result<Self, Error> {
31 let writer = Writer::new(&stream)?;
32 Ok(Self {
33 stream,
34 writer: Some(writer),
35 write_state: WriteState::Idle,
36 is_closed: false,
37 })
38 }
39
40 pub async fn write(&mut self, buf: &[u8]) -> Result<(), Error> {
42 let writer = self
43 .writer
44 .as_mut()
45 .ok_or_else(|| Error::Unknown("writer is unavailable".into()))?;
46 writer
47 .write(&Uint8Array::from(buf))
48 .await
49 .map_err(Into::into)
50 }
51
52 pub async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Result<usize, Error> {
54 let chunk = buf.chunk();
55 let size = chunk.len();
56 let writer = self
57 .writer
58 .as_mut()
59 .ok_or_else(|| Error::Unknown("writer is unavailable".into()))?;
60 writer.write(&Uint8Array::from(chunk)).await?;
61 buf.advance(size);
62 Ok(size)
63 }
64
65 pub fn reset(&mut self, reason: &str) {
67 if let Some(writer) = self.writer.as_mut() {
68 writer.abort(reason);
69 }
70 }
71
72 pub fn finish(&mut self) -> Result<(), Error> {
76 if let Some(writer) = self.writer.as_mut() {
77 writer.close();
78 }
79 self.is_closed = true;
80 Ok(())
81 }
82
83 pub fn set_priority(&mut self, priority: i32) {
87 Reflect::set(&self.stream, &"sendOrder".into(), &priority.into())
88 .expect("failed to set priority");
89 }
90
91 pub async fn closed(&self) -> Result<Option<u8>, Error> {
93 let writer = match self.writer.as_ref() {
94 Some(writer) => writer,
95 None => return Err(Error::Unknown("writer is unavailable".into())),
96 };
97
98 let err = match writer.closed().await {
99 Ok(()) => return Ok(None),
100 Err(err) => Error::from(err),
101 };
102
103 if let Error::Stream(err) = &err {
105 if let Some(code) = err.stream_error_code() {
106 return Ok(Some(code));
107 }
108 }
109
110 Err(err)
111 }
112}
113
114impl Drop for SendStream {
115 fn drop(&mut self) {
117 if let Some(writer) = self.writer.as_mut() {
118 writer.close();
119 }
120 }
121}
122
123impl SendStream {
124 fn poll_inflight_write(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
125 match &mut self.write_state {
126 WriteState::Idle => Poll::Ready(Ok(0)),
127 WriteState::Writing(fut) => match fut.as_mut().poll(cx) {
128 Poll::Pending => Poll::Pending,
129 Poll::Ready((writer, result)) => {
130 self.writer = Some(writer);
131 self.write_state = WriteState::Idle;
132 Poll::Ready(result)
133 }
134 },
135 }
136 }
137
138 fn error_unavailable() -> io::Error {
139 io::Error::new(io::ErrorKind::Other, "writer is unavailable")
140 }
141
142 fn to_io_error(error: Error) -> io::Error {
143 io::Error::new(io::ErrorKind::Other, error.to_string())
144 }
145}
146
147impl AsyncWrite for SendStream {
148 fn poll_write(
149 mut self: Pin<&mut Self>,
150 cx: &mut Context<'_>,
151 buf: &[u8],
152 ) -> Poll<io::Result<usize>> {
153 if buf.is_empty() {
154 return Poll::Ready(Ok(0));
155 }
156
157 if self.is_closed {
158 return Poll::Ready(Err(io::Error::new(
159 io::ErrorKind::BrokenPipe,
160 "stream is already closed",
161 )));
162 }
163
164 if matches!(self.write_state, WriteState::Idle) {
165 let mut writer = match self.writer.take() {
166 Some(writer) => writer,
167 None => return Poll::Ready(Err(Self::error_unavailable())),
168 };
169
170 let payload = Vec::from(buf);
171 let size = payload.len();
172 let fut = Box::pin(async move {
173 let result = writer
174 .write(&Uint8Array::from(payload.as_slice()))
175 .await
176 .map(|_| size)
177 .map_err(|err| Self::to_io_error(err.into()));
178 (writer, result)
179 });
180 self.write_state = WriteState::Writing(fut);
181 }
182
183 self.poll_inflight_write(cx)
184 }
185
186 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
187 match self.poll_inflight_write(cx) {
188 Poll::Pending => Poll::Pending,
189 Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
190 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
191 }
192 }
193
194 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
195 match self.as_mut().poll_flush(cx) {
196 Poll::Pending => Poll::Pending,
197 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
198 Poll::Ready(Ok(())) => {
199 if !self.is_closed {
200 let writer = match self.writer.as_mut() {
201 Some(writer) => writer,
202 None => return Poll::Ready(Err(Self::error_unavailable())),
203 };
204 writer.close();
205 self.is_closed = true;
206 }
207 Poll::Ready(Ok(()))
208 }
209 }
210 }
211}
212
/// Adapter exposing the inherent `SendStream` API through `webtrans_trait::SendStream`.
#[cfg(target_family = "wasm")]
impl webtrans_trait::SendStream for SendStream {
    type Error = Error;

    /// Writes all of `buf` and reports its full length on success.
    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
        let written = buf.len();
        Self::write(self, buf).await?;
        Ok(written)
    }

    /// Forwards `order`, widened to `i32`, to the inherent `set_priority`.
    fn set_priority(&mut self, order: u8) {
        let priority = i32::from(order);
        Self::set_priority(self, priority);
    }

    /// Forwards to the inherent `finish`.
    fn finish(&mut self) -> Result<(), Self::Error> {
        Self::finish(self)
    }

    /// Resets the stream, passing `code` rendered as decimal text as the reason.
    fn reset(&mut self, code: u32) {
        let reason = code.to_string();
        Self::reset(self, &reason);
    }

    /// Waits for the stream to close; a close that carries an error code is
    /// reported as an error.
    async fn closed(&mut self) -> Result<(), Self::Error> {
        if let Some(code) = Self::closed(self).await? {
            return Err(Error::Unknown(
                format!("stream closed with code {code}").into(),
            ));
        }
        Ok(())
    }
}