xwt_web_sys/
lib.rs

1#![cfg_attr(
2    target_family = "wasm",
3    doc = "The [`web_wt_sys`]-powered implementation of [`xwt_core`]."
4)]
5#![cfg_attr(
6    not(target_family = "wasm"),
7    doc = "The `web_wt_sys`-powered implementation of `xwt_core`."
8)]
9#![cfg(target_family = "wasm")]
10
11use std::rc::Rc;
12
13use wasm_bindgen::JsError;
14
15mod error;
16mod options;
17
18pub use web_sys;
19pub use web_wt_sys;
20pub use xwt_core as core;
21
22pub use {error::*, options::*};
23
24/// An endpoint for the xwt.
25///
26/// Internally holds the connection options and can create
27/// a new `WebTransport` object on the "web" side on a connection request.
28#[derive(Debug, Clone, Default)]
29pub struct Endpoint {
30    /// The options to use to create the `WebTransport`s.
31    pub options: web_wt_sys::WebTransportOptions,
32}
33
34impl xwt_core::endpoint::Connect for Endpoint {
35    type Error = Error;
36    type Connecting = Connecting;
37
38    async fn connect(&self, url: &str) -> Result<Self::Connecting, Self::Error> {
39        let transport = web_wt_sys::WebTransport::new_with_options(url, &self.options)?;
40        Ok(Connecting { transport })
41    }
42}
43
44/// Connecting represents the transient connection state when
45/// the [`web_wt_sys::WebTransport`] has been created but is not ready yet.
46#[derive(Debug)]
47pub struct Connecting {
48    /// The WebTransport instance.
49    pub transport: web_wt_sys::WebTransport,
50}
51
52impl xwt_core::endpoint::connect::Connecting for Connecting {
53    type Session = Session;
54    type Error = Error;
55
56    async fn wait_connect(self) -> Result<Self::Session, Self::Error> {
57        let Connecting { transport } = self;
58        transport.ready().await?;
59
60        let datagram_read_buffer_size = 65536; // 65k buffers as per spec recommendation
61
62        let datagrams = transport.datagrams();
63        let datagram_readable_stream_reader =
64            web_sys_stream_utils::get_reader_byob(datagrams.readable());
65        let datagram_writable_stream_writer =
66            web_sys_stream_utils::get_writer(datagrams.writable());
67
68        let datagram_read_buffer = js_sys::ArrayBuffer::new(datagram_read_buffer_size);
69        let datagram_read_buffer = tokio::sync::Mutex::new(Some(datagram_read_buffer));
70
71        let session = Session {
72            transport: Rc::new(transport),
73            datagram_readable_stream_reader,
74            datagram_writable_stream_writer,
75            datagram_read_buffer_size,
76            datagram_read_buffer,
77        };
78        Ok(session)
79    }
80}
81
82/// Session holds the [`web_wt_sys::WebTransport`] and is responsible for
83/// providing access to the Web API of WebTransport in a way that is portable.
84/// It also holds handles to the datagram reader and writer, as well as
85/// the datagram reader state.
86#[derive(Debug)]
87pub struct Session {
88    /// The WebTransport instance.
89    pub transport: Rc<web_wt_sys::WebTransport>,
90    /// The datagram reader.
91    pub datagram_readable_stream_reader: web_sys::ReadableStreamByobReader,
92    /// The datagram writer.
93    pub datagram_writable_stream_writer: web_sys::WritableStreamDefaultWriter,
94    /// The desired size of the datagram read buffer.
95    /// Used to allocate the datagram read buffer in case it gets lost.
96    pub datagram_read_buffer_size: u32,
97    /// The datagram read internal buffer.
98    pub datagram_read_buffer: tokio::sync::Mutex<Option<js_sys::ArrayBuffer>>,
99}
100
101impl xwt_core::session::stream::SendSpec for Session {
102    type SendStream = SendStream;
103}
104
105impl xwt_core::session::stream::RecvSpec for Session {
106    type RecvStream = RecvStream;
107}
108
109/// Send the data into a WebTransport stream.
110pub struct SendStream {
111    /// The WebTransport instance.
112    pub transport: Rc<web_wt_sys::WebTransport>,
113    /// The handle to the stream to write to.
114    pub stream: web_wt_sys::WebTransportSendStream,
115    /// A writer to conduct the operation.
116    pub writer: web_sys_async_io::Writer,
117}
118
119/// Recv the data from a WebTransport stream.
120pub struct RecvStream {
121    /// The WebTransport instance.
122    pub transport: Rc<web_wt_sys::WebTransport>,
123    /// The handle to the stream to read from.
124    pub stream: web_wt_sys::WebTransportReceiveStream,
125    /// A reader to conduct the operation.
126    pub reader: web_sys_async_io::Reader,
127}
128
129/// Open a reader for the given stream and create a [`RecvStream`].
130fn wrap_recv_stream(
131    transport: &Rc<web_wt_sys::WebTransport>,
132    stream: web_wt_sys::WebTransportReceiveStream,
133) -> RecvStream {
134    let reader = web_sys_stream_utils::get_reader_byob(stream.clone());
135    let reader: wasm_bindgen::JsValue = reader.into();
136    let reader = reader.into();
137    let reader = web_sys_async_io::Reader::new(reader);
138
139    RecvStream {
140        transport: Rc::clone(transport),
141        stream,
142        reader,
143    }
144}
145
146/// Open a writer for the given stream and create a [`SendStream`].
147fn wrap_send_stream(
148    transport: &Rc<web_wt_sys::WebTransport>,
149    stream: web_wt_sys::WebTransportSendStream,
150) -> SendStream {
151    let writer = stream.get_writer().unwrap();
152    let writer = web_sys_async_io::Writer::new(writer.into());
153    SendStream {
154        transport: Rc::clone(transport),
155        stream,
156        writer,
157    }
158}
159
160/// Take a bidi stream and wrap a reader and writer for it.
161fn wrap_bi_stream(
162    transport: &Rc<web_wt_sys::WebTransport>,
163    stream: web_wt_sys::WebTransportBidirectionalStream,
164) -> (SendStream, RecvStream) {
165    let writeable = stream.writable();
166    let readable = stream.readable();
167
168    let send_stream = wrap_send_stream(transport, writeable);
169    let recv_stream = wrap_recv_stream(transport, readable);
170
171    (send_stream, recv_stream)
172}
173
174impl xwt_core::session::stream::OpenBi for Session {
175    type Opening = xwt_core::utils::dummy::OpeningBiStream<Session>;
176
177    type Error = Error;
178
179    async fn open_bi(&self) -> Result<Self::Opening, Self::Error> {
180        let value = self.transport.create_bidirectional_stream().await?;
181        let value = wrap_bi_stream(&self.transport, value);
182        Ok(xwt_core::utils::dummy::OpeningBiStream(value))
183    }
184}
185
186impl xwt_core::session::stream::AcceptBi for Session {
187    type Error = Error;
188
189    async fn accept_bi(&self) -> Result<(Self::SendStream, Self::RecvStream), Self::Error> {
190        let incoming: web_sys::ReadableStream = self.transport.incoming_bidirectional_streams();
191        let reader: wasm_bindgen::JsValue = incoming.get_reader().into();
192        let reader: web_sys::ReadableStreamDefaultReader = reader.into();
193        let read_result = wasm_bindgen_futures::JsFuture::from(reader.read()).await?;
194        let read_result: web_wt_sys::ReadableStreamReadResult = read_result.into();
195        if read_result.is_done() {
196            return Err(Error(JsError::new("xwt: accept bi reader is done").into()));
197        }
198        let value: web_wt_sys::WebTransportBidirectionalStream = read_result.get_value().into();
199        let value = wrap_bi_stream(&self.transport, value);
200        Ok(value)
201    }
202}
203
204impl xwt_core::session::stream::OpenUni for Session {
205    type Opening = xwt_core::utils::dummy::OpeningUniStream<Session>;
206    type Error = Error;
207
208    async fn open_uni(&self) -> Result<Self::Opening, Self::Error> {
209        let value = self.transport.create_unidirectional_stream().await?;
210        let send_stream = wrap_send_stream(&self.transport, value);
211        Ok(xwt_core::utils::dummy::OpeningUniStream(send_stream))
212    }
213}
214
215impl xwt_core::session::stream::AcceptUni for Session {
216    type Error = Error;
217
218    async fn accept_uni(&self) -> Result<Self::RecvStream, Self::Error> {
219        let incoming: web_sys::ReadableStream = self.transport.incoming_unidirectional_streams();
220        let reader: wasm_bindgen::JsValue = incoming.get_reader().into();
221        let reader: web_sys::ReadableStreamDefaultReader = reader.into();
222        let read_result = wasm_bindgen_futures::JsFuture::from(reader.read()).await?;
223        let read_result: web_wt_sys::ReadableStreamReadResult = read_result.into();
224        if read_result.is_done() {
225            return Err(Error(JsError::new("xwt: accept uni reader is done").into()));
226        }
227        let value: web_wt_sys::WebTransportReceiveStream = read_result.get_value().into();
228        let recv_stream = wrap_recv_stream(&self.transport, value);
229        Ok(recv_stream)
230    }
231}
232
233impl tokio::io::AsyncWrite for SendStream {
234    fn poll_write(
235        mut self: std::pin::Pin<&mut Self>,
236        cx: &mut std::task::Context<'_>,
237        buf: &[u8],
238    ) -> std::task::Poll<Result<usize, std::io::Error>> {
239        std::pin::Pin::new(&mut self.writer).poll_write(cx, buf)
240    }
241
242    fn poll_flush(
243        mut self: std::pin::Pin<&mut Self>,
244        cx: &mut std::task::Context<'_>,
245    ) -> std::task::Poll<Result<(), std::io::Error>> {
246        std::pin::Pin::new(&mut self.writer).poll_flush(cx)
247    }
248
249    fn poll_shutdown(
250        mut self: std::pin::Pin<&mut Self>,
251        cx: &mut std::task::Context<'_>,
252    ) -> std::task::Poll<Result<(), std::io::Error>> {
253        std::pin::Pin::new(&mut self.writer).poll_shutdown(cx)
254    }
255}
256
257impl tokio::io::AsyncRead for RecvStream {
258    fn poll_read(
259        mut self: std::pin::Pin<&mut Self>,
260        cx: &mut std::task::Context<'_>,
261        buf: &mut tokio::io::ReadBuf<'_>,
262    ) -> std::task::Poll<std::io::Result<()>> {
263        std::pin::Pin::new(&mut self.reader).poll_read(cx, buf)
264    }
265}
266
267impl xwt_core::stream::Write for SendStream {
268    type Error = Error;
269
270    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
271        web_sys_stream_utils::write(&self.writer.inner, buf).await?;
272        Ok(buf.len())
273    }
274}
275
276impl xwt_core::stream::Read for RecvStream {
277    type Error = Error;
278
279    async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Self::Error> {
280        let requested_size = buf.len().try_into().unwrap();
281        let internal_buf = self
282            .reader
283            .internal_buf
284            .take()
285            .filter(|internal_buf| {
286                let actual_size = internal_buf.byte_length();
287                debug_assert!(actual_size > 0);
288                actual_size >= requested_size
289            })
290            .unwrap_or_else(|| js_sys::ArrayBuffer::new(requested_size));
291        let internal_buf_view = js_sys::Uint8Array::new(&internal_buf);
292        let maybe_internal_buf_view =
293            web_sys_stream_utils::read_byob(&self.reader.inner, internal_buf_view).await?;
294        let Some(internal_buf_view) = maybe_internal_buf_view else {
295            return Ok(None);
296        };
297
298        // Unwrap is safe assuming the `usize` is `u32` in wasm.
299        let len = internal_buf_view.byte_length().try_into().unwrap();
300
301        internal_buf_view.copy_to(&mut buf[..len]);
302
303        self.reader.internal_buf = Some(internal_buf_view.buffer());
304
305        Ok(Some(len))
306    }
307}
308
309impl Session {
310    /// Receive the datagram and handle the buffer with the given function.
311    ///
312    /// Cloning the buffer in the `f` will result in the undefined behaviour,
313    /// because it will create a second reference to an object that is intended
314    /// to be under a `mut ref`.
315    /// Although is would not teachnically be unsafe, it would violate
316    /// the borrow checker rules.
317    pub async fn receive_datagram_with<R>(
318        &self,
319        f: impl FnOnce(&mut js_sys::Uint8Array) -> R,
320    ) -> Result<R, Error> {
321        let mut buffer_guard = self.datagram_read_buffer.lock().await;
322
323        let buffer = buffer_guard
324            .take()
325            .unwrap_or_else(|| js_sys::ArrayBuffer::new(self.datagram_read_buffer_size));
326        let view = js_sys::Uint8Array::new(&buffer);
327
328        let maybe_view =
329            web_sys_stream_utils::read_byob(&self.datagram_readable_stream_reader, view).await?;
330        let Some(mut view) = maybe_view else {
331            return Err(wasm_bindgen::JsError::new("unexpected stream termination").into());
332        };
333
334        let result = f(&mut view);
335
336        *buffer_guard = Some(view.buffer());
337        Ok(result)
338    }
339}
340
341impl xwt_core::session::datagram::MaxSize for Session {
342    fn max_datagram_size(&self) -> Option<usize> {
343        let max_datagram_size = self.transport.datagrams().max_datagram_size();
344        Some(usize::try_from(max_datagram_size).unwrap()) // u32 should fit in a usize on WASM
345    }
346}
347
348impl xwt_core::session::datagram::Receive for Session {
349    type Datagram = Vec<u8>;
350    type Error = Error;
351
352    async fn receive_datagram(&self) -> Result<Self::Datagram, Self::Error> {
353        self.receive_datagram_with(|buffer| buffer.to_vec()).await
354    }
355}
356
357impl xwt_core::session::datagram::ReceiveInto for Session {
358    type Error = Error;
359
360    async fn receive_datagram_into(&self, buf: &mut [u8]) -> Result<usize, Self::Error> {
361        self.receive_datagram_with(|buffer| {
362            let len = buffer.length() as usize;
363            buffer.copy_to(&mut buf[..len]);
364            len
365        })
366        .await
367    }
368}
369
370impl xwt_core::session::datagram::Send for Session {
371    type Error = Error;
372
373    async fn send_datagram<D>(&self, payload: D) -> Result<(), Self::Error>
374    where
375        D: AsRef<[u8]>,
376    {
377        web_sys_stream_utils::write(&self.datagram_writable_stream_writer, payload.as_ref())
378            .await?;
379        Ok(())
380    }
381}
382
383impl Drop for Session {
384    fn drop(&mut self) {
385        self.transport.close();
386    }
387}