webtrans_wasm/
send.rs

1use bytes::Buf;
2use js_sys::{Reflect, Uint8Array};
3use web_sys::WebTransportSendStream;
4
5use crate::Error;
6use web_streams::Writer;
7
8/// A byte stream sent to the remote peer.
9pub struct SendStream {
10    stream: WebTransportSendStream,
11    writer: Writer,
12}
13
14impl SendStream {
15    pub(super) fn new(stream: WebTransportSendStream) -> Result<Self, Error> {
16        let writer = Writer::new(&stream)?;
17        Ok(Self { stream, writer })
18    }
19
20    /// Write all of the provided bytes to the stream.
21    pub async fn write(&mut self, buf: &[u8]) -> Result<(), Error> {
22        self.writer
23            .write(&Uint8Array::from(buf))
24            .await
25            .map_err(Into::into)
26    }
27
28    /// Write some of the provided buffer to the stream.
29    pub async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Result<usize, Error> {
30        let chunk = buf.chunk();
31        let size = chunk.len();
32        self.writer.write(&Uint8Array::from(chunk)).await?;
33        buf.advance(size);
34        Ok(size)
35    }
36
37    /// Send an immediate reset, closing the stream with an error.
38    pub fn reset(&mut self, reason: &str) {
39        self.writer.abort(reason);
40    }
41
42    /// Mark the stream as finished.
43    ///
44    /// This is called on drop, but can also be invoked manually.
45    pub fn finish(&mut self) -> Result<(), Error> {
46        self.writer.close();
47        Ok(())
48    }
49
50    /// Set the stream's priority.
51    ///
52    /// Streams with higher values are sent first, but delivery order is not guaranteed.
53    pub fn set_priority(&mut self, priority: i32) {
54        Reflect::set(&self.stream, &"sendOrder".into(), &priority.into())
55            .expect("failed to set priority");
56    }
57
58    /// Block until the stream has closed and return the error code, if any.
59    pub async fn closed(&self) -> Result<Option<u8>, Error> {
60        let err = match self.writer.closed().await {
61            Ok(()) => return Ok(None),
62            Err(err) => Error::from(err),
63        };
64
65        // If this is a WebTransportError, extract the error code when available.
66        if let Error::Stream(err) = &err {
67            if let Some(code) = err.stream_error_code() {
68                return Ok(Some(code));
69            }
70        }
71
72        Err(err)
73    }
74}
75
76impl Drop for SendStream {
77    /// Close the stream with a FIN.
78    fn drop(&mut self) {
79        self.writer.close();
80    }
81}