webtrans_wasm/
session.rs

1use bytes::Bytes;
2use js_sys::Uint8Array;
3use url::Url;
4use wasm_bindgen_futures::JsFuture;
5use web_sys::{
6    WebTransport, WebTransportBidirectionalStream, WebTransportCloseInfo, WebTransportSendStream,
7};
8
9use crate::{Error, RecvStream, SendStream};
10use web_streams::{Reader, Writer};
11
12/// A session represents a client-to-server connection.
13///
14/// This is the main entry point for creating streams and sending datagrams.
15/// Either endpoint may close the session with an error code and reason.
16///
17/// The session can be cloned to create multiple handles.
18/// However, handles cannot currently accept or open the same stream type.
19#[derive(Clone)]
20pub struct Session {
21    inner: WebTransport,
22    url: Url,
23}
24
25impl Session {
26    pub fn new(inner: WebTransport, url: Url) -> Self {
27        Self { inner, url }
28    }
29
30    /// Accept a new unidirectional stream from the peer.
31    pub async fn accept_uni(&self) -> Result<RecvStream, Error> {
32        let mut reader = Reader::new(&self.inner.incoming_unidirectional_streams())?;
33
34        match reader.read().await? {
35            Some(stream) => Ok(RecvStream::new(stream)?),
36            None => Err(self.closed().await),
37        }
38    }
39
40    /// Accept a new bidirectional stream from the peer.
41    pub async fn accept_bi(&self) -> Result<(SendStream, RecvStream), Error> {
42        let mut reader = Reader::new(&self.inner.incoming_bidirectional_streams())?;
43
44        let stream: WebTransportBidirectionalStream = match reader.read().await? {
45            Some(stream) => stream,
46            None => return Err(self.closed().await),
47        };
48
49        let send = SendStream::new(stream.writable())?;
50        let recv = RecvStream::new(stream.readable())?;
51
52        Ok((send, recv))
53    }
54
55    /// Create a new bidirectional stream.
56    pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), Error> {
57        let stream: WebTransportBidirectionalStream =
58            JsFuture::from(self.inner.create_bidirectional_stream())
59                .await?
60                .into();
61
62        let send = SendStream::new(stream.writable())?;
63        let recv = RecvStream::new(stream.readable())?;
64
65        Ok((send, recv))
66    }
67
68    /// Create a new unidirectional stream.
69    pub async fn open_uni(&self) -> Result<SendStream, Error> {
70        let stream: WebTransportSendStream =
71            JsFuture::from(self.inner.create_unidirectional_stream())
72                .await?
73                .into();
74
75        let send = SendStream::new(stream)?;
76        Ok(send)
77    }
78
79    /// Send a datagram over the network.
80    pub async fn send_datagram(&self, payload: Bytes) -> Result<(), Error> {
81        let mut writer = Writer::new(&self.inner.datagrams().writable())?;
82        writer.write(&Uint8Array::from(payload.as_ref())).await?;
83        Ok(())
84    }
85
86    /// Receive a datagram over the network.
87    pub async fn recv_datagram(&self) -> Result<Bytes, Error> {
88        let mut reader = Reader::new(&self.inner.datagrams().readable())?;
89        let data: Uint8Array = reader.read().await?.unwrap_or_default();
90        Ok(data.to_vec().into())
91    }
92
93    /// Close the session with the given error code and reason.
94    pub fn close(&self, code: u32, reason: &str) {
95        let info = WebTransportCloseInfo::new();
96        info.set_close_code(code);
97        info.set_reason(reason);
98        self.inner.close_with_close_info(&info);
99    }
100
101    /// Block until the session closes and return the error.
102    pub async fn closed(&self) -> Error {
103        self.closed_inner().await.unwrap_err()
104    }
105
106    async fn closed_inner(&self) -> Result<(), Error> {
107        let info: WebTransportCloseInfo = JsFuture::from(self.inner.closed()).await?.into();
108        let reason = info.get_reason().unwrap_or_default();
109
110        let options = web_sys::WebTransportErrorOptions::new();
111        options.set_source(web_sys::WebTransportErrorSource::Session);
112
113        if let Ok(code) = info.get_close_code().map(u8::try_from).transpose() {
114            options.set_stream_error_code(code);
115        }
116
117        let err = web_sys::WebTransportError::new_with_message_and_options(&reason, &options)?;
118        Err(Error::Session(err))
119    }
120
121    /// Return the URL used to create the session.
122    pub fn url(&self) -> &Url {
123        &self.url
124    }
125}
126
127impl PartialEq for Session {
128    fn eq(&self, other: &Self) -> bool {
129        self.inner == other.inner
130    }
131}
132
133impl Eq for Session {}