Skip to main content

webtrans_wasm/
session.rs

1use bytes::Bytes;
2use js_sys::Uint8Array;
3use url::Url;
4#[cfg(target_family = "wasm")]
5use wasm_bindgen::JsCast;
6use wasm_bindgen_futures::JsFuture;
7#[cfg(target_family = "wasm")]
8use web_sys::WritableStreamDefaultWriter;
9use web_sys::{
10    WebTransport, WebTransportBidirectionalStream, WebTransportCloseInfo, WebTransportSendStream,
11};
12
13use crate::{Error, RecvStream, SendStream};
14use web_streams::{Reader, Writer};
15
16/// A session represents a client-to-server connection.
17///
18/// This is the main entry point for creating streams and sending datagrams.
19/// Either endpoint may close the session with an error code and reason.
20///
21/// The session can be cloned to create multiple handles.
22/// However, handles cannot currently accept or open the same stream type.
23#[derive(Clone)]
24pub struct Session {
25    inner: WebTransport,
26    url: Url,
27}
28
29impl Session {
30    pub fn new(inner: WebTransport, url: Url) -> Self {
31        Self { inner, url }
32    }
33
34    /// Accept a new unidirectional stream from the peer.
35    pub async fn accept_uni(&self) -> Result<RecvStream, Error> {
36        let mut reader = Reader::new(&self.inner.incoming_unidirectional_streams())?;
37
38        match reader.read().await? {
39            Some(stream) => Ok(RecvStream::new(stream)?),
40            None => Err(self.closed().await),
41        }
42    }
43
44    /// Accept a new bidirectional stream from the peer.
45    pub async fn accept_bi(&self) -> Result<(SendStream, RecvStream), Error> {
46        let mut reader = Reader::new(&self.inner.incoming_bidirectional_streams())?;
47
48        let stream: WebTransportBidirectionalStream = match reader.read().await? {
49            Some(stream) => stream,
50            None => return Err(self.closed().await),
51        };
52
53        let send = SendStream::new(stream.writable())?;
54        let recv = RecvStream::new(stream.readable())?;
55
56        Ok((send, recv))
57    }
58
59    /// Create a new bidirectional stream.
60    pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), Error> {
61        let stream: WebTransportBidirectionalStream =
62            JsFuture::from(self.inner.create_bidirectional_stream())
63                .await?
64                .into();
65
66        let send = SendStream::new(stream.writable())?;
67        let recv = RecvStream::new(stream.readable())?;
68
69        Ok((send, recv))
70    }
71
72    /// Create a new unidirectional stream.
73    pub async fn open_uni(&self) -> Result<SendStream, Error> {
74        let stream: WebTransportSendStream =
75            JsFuture::from(self.inner.create_unidirectional_stream())
76                .await?
77                .into();
78
79        let send = SendStream::new(stream)?;
80        Ok(send)
81    }
82
83    /// Send a datagram over the network.
84    pub async fn send_datagram(&self, payload: Bytes) -> Result<(), Error> {
85        let mut writer = Writer::new(&self.inner.datagrams().writable())?;
86        writer.write(&Uint8Array::from(payload.as_ref())).await?;
87        Ok(())
88    }
89
90    /// Receive a datagram over the network.
91    pub async fn recv_datagram(&self) -> Result<Bytes, Error> {
92        let mut reader = Reader::new(&self.inner.datagrams().readable())?;
93        let data: Uint8Array = reader.read().await?.unwrap_or_default();
94        Ok(data.to_vec().into())
95    }
96
97    /// Close the session with the given error code and reason.
98    pub fn close(&self, code: u32, reason: &str) {
99        let info = WebTransportCloseInfo::new();
100        info.set_close_code(code);
101        info.set_reason(reason);
102        self.inner.close_with_close_info(&info);
103    }
104
105    /// Block until the session closes and return the error.
106    pub async fn closed(&self) -> Error {
107        self.closed_inner().await.unwrap_err()
108    }
109
110    async fn closed_inner(&self) -> Result<(), Error> {
111        let info: WebTransportCloseInfo = JsFuture::from(self.inner.closed()).await?.into();
112        let reason = info.get_reason().unwrap_or_default();
113
114        let options = web_sys::WebTransportErrorOptions::new();
115        options.set_source(web_sys::WebTransportErrorSource::Session);
116
117        if let Ok(code) = info.get_close_code().map(u8::try_from).transpose() {
118            options.set_stream_error_code(code);
119        }
120
121        let err = web_sys::WebTransportError::new_with_message_and_options(&reason, &options)?;
122        Err(Error::Session(err))
123    }
124
125    /// Return the URL used to create the session.
126    pub fn url(&self) -> &Url {
127        &self.url
128    }
129
130    // Queue a datagram write and return once the write request is submitted.
131    #[cfg(target_family = "wasm")]
132    fn send_datagram_nowait(&self, payload: Bytes) -> Result<(), Error> {
133        let writer = self.inner.datagrams().writable().get_writer()?;
134        let writer: WritableStreamDefaultWriter = writer.unchecked_into();
135
136        wasm_bindgen_futures::spawn_local(async move {
137            let payload = Uint8Array::from(payload.as_ref());
138            let promise = writer.write_with_chunk(&payload.into());
139            let _ = JsFuture::from(promise).await;
140            writer.release_lock();
141        });
142
143        Ok(())
144    }
145}
146
147impl PartialEq for Session {
148    fn eq(&self, other: &Self) -> bool {
149        self.inner == other.inner
150    }
151}
152
153impl Eq for Session {}
154
/// Implements the generic `webtrans_trait::Session` interface by forwarding
/// to the inherent methods on [`Session`].
///
/// The calls are spelled as `Session::name(self, ..)` paths, which resolve to
/// the inherent methods rather than back to these trait methods.
#[cfg(target_family = "wasm")]
impl webtrans_trait::Session for Session {
    type SendStream = SendStream;
    type RecvStream = RecvStream;
    type Error = Error;

    /// Forwards to the inherent `accept_uni`.
    async fn accept_uni(&self) -> Result<Self::RecvStream, Self::Error> {
        Session::accept_uni(self).await
    }

    /// Forwards to the inherent `accept_bi`.
    async fn accept_bi(&self) -> Result<(Self::SendStream, Self::RecvStream), Self::Error> {
        Session::accept_bi(self).await
    }

    /// Forwards to the inherent `open_bi`.
    async fn open_bi(&self) -> Result<(Self::SendStream, Self::RecvStream), Self::Error> {
        Session::open_bi(self).await
    }

    /// Forwards to the inherent `open_uni`.
    async fn open_uni(&self) -> Result<Self::SendStream, Self::Error> {
        Session::open_uni(self).await
    }

    /// Synchronous send: uses the non-waiting helper rather than the async
    /// inherent `send_datagram`.
    fn send_datagram(&self, payload: Bytes) -> Result<(), Self::Error> {
        Session::send_datagram_nowait(self, payload)
    }

    /// Forwards to the inherent `recv_datagram`.
    async fn recv_datagram(&self) -> Result<Bytes, Self::Error> {
        Session::recv_datagram(self).await
    }

    /// Reports the maximum datagram size exposed by the session's datagram stream.
    fn max_datagram_size(&self) -> usize {
        let datagrams = self.inner.datagrams();
        datagrams.max_datagram_size() as usize
    }

    /// Forwards to the inherent `close`.
    fn close(&self, code: u32, reason: &str) {
        Session::close(self, code, reason)
    }

    /// Forwards to the inherent `closed`.
    async fn closed(&self) -> Self::Error {
        Session::closed(self).await
    }
}