Skip to main content

webtrans_wasm/
send.rs

1use std::future::Future;
2use std::io;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use bytes::Buf;
7use futures_io::AsyncWrite;
8use js_sys::{Reflect, Uint8Array};
9use web_sys::WebTransportSendStream;
10
11use crate::Error;
12use web_streams::Writer;
13
14type WriteFuture = Pin<Box<dyn Future<Output = (Writer, io::Result<usize>)>>>;
15
16enum WriteState {
17    Idle,
18    Writing(WriteFuture),
19}
20
21/// A byte stream sent to the remote peer.
22pub struct SendStream {
23    stream: WebTransportSendStream,
24    writer: Option<Writer>,
25    write_state: WriteState,
26    is_closed: bool,
27}
28
29impl SendStream {
30    pub(super) fn new(stream: WebTransportSendStream) -> Result<Self, Error> {
31        let writer = Writer::new(&stream)?;
32        Ok(Self {
33            stream,
34            writer: Some(writer),
35            write_state: WriteState::Idle,
36            is_closed: false,
37        })
38    }
39
40    /// Write all of the provided bytes to the stream.
41    pub async fn write(&mut self, buf: &[u8]) -> Result<(), Error> {
42        let writer = self
43            .writer
44            .as_mut()
45            .ok_or_else(|| Error::Unknown("writer is unavailable".into()))?;
46        writer
47            .write(&Uint8Array::from(buf))
48            .await
49            .map_err(Into::into)
50    }
51
52    /// Write some of the provided buffer to the stream.
53    pub async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Result<usize, Error> {
54        let chunk = buf.chunk();
55        let size = chunk.len();
56        let writer = self
57            .writer
58            .as_mut()
59            .ok_or_else(|| Error::Unknown("writer is unavailable".into()))?;
60        writer.write(&Uint8Array::from(chunk)).await?;
61        buf.advance(size);
62        Ok(size)
63    }
64
65    /// Send an immediate reset, closing the stream with an error.
66    pub fn reset(&mut self, reason: &str) {
67        if let Some(writer) = self.writer.as_mut() {
68            writer.abort(reason);
69        }
70    }
71
72    /// Mark the stream as finished.
73    ///
74    /// This is called on drop, but can also be invoked manually.
75    pub fn finish(&mut self) -> Result<(), Error> {
76        if let Some(writer) = self.writer.as_mut() {
77            writer.close();
78        }
79        self.is_closed = true;
80        Ok(())
81    }
82
83    /// Set the stream's priority.
84    ///
85    /// Streams with higher values are sent first, but delivery order is not guaranteed.
86    pub fn set_priority(&mut self, priority: i32) {
87        Reflect::set(&self.stream, &"sendOrder".into(), &priority.into())
88            .expect("failed to set priority");
89    }
90
91    /// Block until the stream has closed and return the error code, if any.
92    pub async fn closed(&self) -> Result<Option<u8>, Error> {
93        let writer = match self.writer.as_ref() {
94            Some(writer) => writer,
95            None => return Err(Error::Unknown("writer is unavailable".into())),
96        };
97
98        let err = match writer.closed().await {
99            Ok(()) => return Ok(None),
100            Err(err) => Error::from(err),
101        };
102
103        // If this is a WebTransportError, extract the error code when available.
104        if let Error::Stream(err) = &err {
105            if let Some(code) = err.stream_error_code() {
106                return Ok(Some(code));
107            }
108        }
109
110        Err(err)
111    }
112}
113
114impl Drop for SendStream {
115    /// Close the stream with a FIN.
116    fn drop(&mut self) {
117        if let Some(writer) = self.writer.as_mut() {
118            writer.close();
119        }
120    }
121}
122
123impl SendStream {
124    fn poll_inflight_write(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
125        match &mut self.write_state {
126            WriteState::Idle => Poll::Ready(Ok(0)),
127            WriteState::Writing(fut) => match fut.as_mut().poll(cx) {
128                Poll::Pending => Poll::Pending,
129                Poll::Ready((writer, result)) => {
130                    self.writer = Some(writer);
131                    self.write_state = WriteState::Idle;
132                    Poll::Ready(result)
133                }
134            },
135        }
136    }
137
138    fn error_unavailable() -> io::Error {
139        io::Error::new(io::ErrorKind::Other, "writer is unavailable")
140    }
141
142    fn to_io_error(error: Error) -> io::Error {
143        io::Error::new(io::ErrorKind::Other, error.to_string())
144    }
145}
146
147impl AsyncWrite for SendStream {
148    fn poll_write(
149        mut self: Pin<&mut Self>,
150        cx: &mut Context<'_>,
151        buf: &[u8],
152    ) -> Poll<io::Result<usize>> {
153        if buf.is_empty() {
154            return Poll::Ready(Ok(0));
155        }
156
157        if self.is_closed {
158            return Poll::Ready(Err(io::Error::new(
159                io::ErrorKind::BrokenPipe,
160                "stream is already closed",
161            )));
162        }
163
164        if matches!(self.write_state, WriteState::Idle) {
165            let mut writer = match self.writer.take() {
166                Some(writer) => writer,
167                None => return Poll::Ready(Err(Self::error_unavailable())),
168            };
169
170            let payload = Vec::from(buf);
171            let size = payload.len();
172            let fut = Box::pin(async move {
173                let result = writer
174                    .write(&Uint8Array::from(payload.as_slice()))
175                    .await
176                    .map(|_| size)
177                    .map_err(|err| Self::to_io_error(err.into()));
178                (writer, result)
179            });
180            self.write_state = WriteState::Writing(fut);
181        }
182
183        self.poll_inflight_write(cx)
184    }
185
186    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
187        match self.poll_inflight_write(cx) {
188            Poll::Pending => Poll::Pending,
189            Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
190            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
191        }
192    }
193
194    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
195        match self.as_mut().poll_flush(cx) {
196            Poll::Pending => Poll::Pending,
197            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
198            Poll::Ready(Ok(())) => {
199                if !self.is_closed {
200                    let writer = match self.writer.as_mut() {
201                        Some(writer) => writer,
202                        None => return Poll::Ready(Err(Self::error_unavailable())),
203                    };
204                    writer.close();
205                    self.is_closed = true;
206                }
207                Poll::Ready(Ok(()))
208            }
209        }
210    }
211}
212
#[cfg(target_family = "wasm")]
impl webtrans_trait::SendStream for SendStream {
    type Error = Error;

    /// Write the whole buffer via the inherent `write` and report its length.
    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
        let total = buf.len();
        Self::write(self, buf).await?;
        Ok(total)
    }

    /// Forward the priority to the inherent `set_priority`, widened to `i32`.
    fn set_priority(&mut self, order: u8) {
        let priority = i32::from(order);
        Self::set_priority(self, priority);
    }

    /// Forward to the inherent `finish`.
    fn finish(&mut self) -> Result<(), Self::Error> {
        Self::finish(self)
    }

    /// Reset the stream, passing the decimal text of `code` as the reason.
    fn reset(&mut self, code: u32) {
        let reason = code.to_string();
        Self::reset(self, &reason);
    }

    /// Wait for the stream to close; a close that carries an error code is an error.
    async fn closed(&mut self) -> Result<(), Self::Error> {
        if let Some(code) = Self::closed(self).await? {
            return Err(Error::Unknown(
                format!("stream closed with code {code}").into(),
            ));
        }
        Ok(())
    }
}