xwt_web/
lib.rs

1#![cfg_attr(
2    target_family = "wasm",
3    doc = "The [`web_wt_sys`]-powered implementation of [`xwt_core`]."
4)]
5#![cfg_attr(
6    not(target_family = "wasm"),
7    doc = "The `web_wt_sys`-powered implementation of `xwt_core`."
8)]
9#![cfg(target_family = "wasm")]
10
11use std::{num::NonZeroUsize, rc::Rc};
12
13use wasm_bindgen::prelude::*;
14
15mod error;
16mod error_as_error_code;
17mod options;
18
19pub use web_sys;
20pub use web_wt_sys;
21pub use xwt_core as core;
22
23pub use {error::*, options::*};
24
25/// An endpoint for the xwt.
26///
27/// Internally holds the connection options and can create
28/// a new `WebTransport` object on the "web" side on a connection request.
29#[derive(Debug, Clone, Default)]
30pub struct Endpoint {
31    /// The options to use to create the `WebTransport`s.
32    pub options: web_wt_sys::WebTransportOptions,
33}
34
35impl xwt_core::endpoint::Connect for Endpoint {
36    type Error = Error;
37    type Connecting = Connecting;
38
39    async fn connect(&self, url: &str) -> Result<Self::Connecting, Self::Error> {
40        let transport = web_wt_sys::WebTransport::new_with_options(url, &self.options)?;
41        Ok(Connecting { transport })
42    }
43}
44
45/// Connecting represents the transient connection state when
46/// the [`web_wt_sys::WebTransport`] has been created but is not ready yet.
47#[derive(Debug)]
48pub struct Connecting {
49    /// The WebTransport instance.
50    pub transport: web_wt_sys::WebTransport,
51}
52
53impl xwt_core::endpoint::connect::Connecting for Connecting {
54    type Session = Session;
55    type Error = Error;
56
57    async fn wait_connect(self) -> Result<Self::Session, Self::Error> {
58        let Connecting { transport } = self;
59        transport.ready().await?;
60
61        Ok(Session::new(transport))
62    }
63}
64
65/// Session holds the [`web_wt_sys::WebTransport`] and is responsible for
66/// providing access to the Web API of WebTransport in a way that is portable.
67/// It also holds handles to the datagram reader and writer, as well as
68/// the datagram reader state.
69#[derive(Debug)]
70pub struct Session {
71    /// The WebTransport instance.
72    transport: Option<Rc<web_wt_sys::WebTransport>>,
73
74    /// The datagrams state for this session.
75    pub datagrams: Datagrams,
76
77    /// Whether to close the session on drop.
78    pub close_on_drop: bool,
79}
80
81impl Session {
82    /// Construct a new session from a [`web_wt_sys::WebTransport`].
83    pub fn new(transport: web_wt_sys::WebTransport) -> Self {
84        let datagrams = Datagrams::from_transport(&transport);
85        Self {
86            transport: Some(Rc::new(transport)),
87            datagrams,
88            close_on_drop: true,
89        }
90    }
91
92    /// If possible, relieves the underlying [`web_wt_sys::WebTransport`] of
93    /// any `xwt-web`-held locks and dependencies and exposes it.
94    pub fn try_unwrap(mut self) -> Result<web_wt_sys::WebTransport, Self> {
95        // Take the transport out of the session; this state is not valid
96        // "publicly", only while we're in this fn.
97        let transport = self.transport.take().unwrap();
98
99        // We want to ensure there are no other references to
100        // the transport's [`Rc`], otherwise something must still be using it.
101        // If we can't unwrap it successfully - we reinsert the transport back
102        // into `self` and return it as an `Err` - it will thus remain
103        // operational to permit doing whatever work may have to be done
104        // further.
105        let unwrapped = match Rc::try_unwrap(transport) {
106            Ok(unwrapped) => unwrapped,
107            Err(transport) => {
108                let _ = self.transport.insert(transport);
109                return Err(self);
110            }
111        };
112
113        // Do not close the transport (we have taken it out anyway).
114        self.close_on_drop = false;
115
116        // Drop the session to release the datagram readers/writers.
117        drop(self);
118
119        // Return the unwrapped transport.
120        Ok(unwrapped)
121    }
122
123    /// Obtain a transport ref.
124    pub const fn transport_ref(&self) -> &Rc<web_wt_sys::WebTransport> {
125        // Trnasport should never be gone generally, only inside of
126        // the `try_unwrap`.
127        self.transport.as_ref().unwrap()
128    }
129}
130
131impl Drop for Session {
132    fn drop(&mut self) {
133        if self.close_on_drop {
134            self.transport_ref().close();
135        }
136    }
137}
138
139/// Datagrams hold the portions of the session that are responsible for working
140/// with the datagrams.
141#[derive(Debug)]
142pub struct Datagrams {
143    /// The datagram reader.
144    pub readable_stream_reader: web_sys::ReadableStreamByobReader,
145
146    /// The datagram writer.
147    pub writable_stream_writer: web_sys::WritableStreamDefaultWriter,
148
149    /// The desired size of the datagram read buffer.
150    /// Used to allocate the datagram read buffer in case it gets lost.
151    pub read_buffer_size: u32,
152
153    /// The datagram read internal buffer.
154    pub read_buffer: tokio::sync::Mutex<Option<js_sys::ArrayBuffer>>,
155
156    /// Unlock the streams on drop.
157    pub unlock_streams_on_drop: bool,
158}
159
160impl Datagrams {
161    /// Create a datagrams state from the transport.
162    pub fn from_transport(transport: &web_wt_sys::WebTransport) -> Self {
163        Self::from_transport_datagrams(&transport.datagrams())
164    }
165
166    /// Create a datagrams state from the transport datagrams.
167    pub fn from_transport_datagrams(
168        datagrams: &web_wt_sys::WebTransportDatagramDuplexStream,
169    ) -> Self {
170        let read_buffer_size = 65536; // 65k buffers as per spec recommendation
171
172        let readable_stream_reader = web_sys_stream_utils::get_reader_byob(datagrams.readable());
173        let writable_stream_writer = web_sys_stream_utils::get_writer(datagrams.writable());
174
175        let read_buffer = js_sys::ArrayBuffer::new(read_buffer_size);
176        let read_buffer = tokio::sync::Mutex::new(Some(read_buffer));
177
178        Self {
179            readable_stream_reader,
180            writable_stream_writer,
181            read_buffer_size,
182            read_buffer,
183            unlock_streams_on_drop: true,
184        }
185    }
186}
187
188impl Drop for Datagrams {
189    fn drop(&mut self) {
190        if self.unlock_streams_on_drop {
191            self.readable_stream_reader.release_lock();
192            self.writable_stream_writer.release_lock();
193        }
194    }
195}
196
197impl xwt_core::session::stream::SendSpec for Session {
198    type SendStream = SendStream;
199}
200
201impl xwt_core::session::stream::RecvSpec for Session {
202    type RecvStream = RecvStream;
203}
204
205/// Send the data into a WebTransport stream.
206pub struct SendStream {
207    /// The WebTransport instance.
208    pub transport: Rc<web_wt_sys::WebTransport>,
209
210    /// The handle to the stream to write to.
211    pub stream: web_wt_sys::WebTransportSendStream,
212
213    /// A writer to conduct the operation.
214    pub writer: web_sys_async_io::Writer,
215
216    /// Unlock the writer on drop.
217    pub unlock_writer_on_drop: bool,
218}
219
220impl Drop for SendStream {
221    fn drop(&mut self) {
222        if self.unlock_writer_on_drop {
223            self.writer.inner.release_lock();
224        }
225    }
226}
227
228/// Recv the data from a WebTransport stream.
229pub struct RecvStream {
230    /// The WebTransport instance.
231    pub transport: Rc<web_wt_sys::WebTransport>,
232
233    /// The handle to the stream to read from.
234    pub stream: web_wt_sys::WebTransportReceiveStream,
235
236    /// A reader to conduct the operation.
237    pub reader: web_sys_async_io::Reader,
238
239    /// Unlock the reader on drop.
240    pub unlock_reader_on_drop: bool,
241}
242
243impl Drop for RecvStream {
244    fn drop(&mut self) {
245        if self.unlock_reader_on_drop {
246            self.reader.inner.release_lock();
247        }
248    }
249}
250
251/// Open a reader for the given stream and create a [`RecvStream`].
252fn wrap_recv_stream(
253    transport: &Rc<web_wt_sys::WebTransport>,
254    stream: web_wt_sys::WebTransportReceiveStream,
255) -> RecvStream {
256    let reader = web_sys_stream_utils::get_reader_byob(stream.clone());
257    let reader: JsValue = reader.into();
258    let reader = reader.into();
259    let reader = web_sys_async_io::Reader::new(reader);
260
261    RecvStream {
262        transport: Rc::clone(transport),
263        stream,
264        reader,
265        unlock_reader_on_drop: true,
266    }
267}
268
269/// Open a writer for the given stream and create a [`SendStream`].
270fn wrap_send_stream(
271    transport: &Rc<web_wt_sys::WebTransport>,
272    stream: web_wt_sys::WebTransportSendStream,
273) -> SendStream {
274    let writer = stream.get_writer().unwrap();
275    let writer = web_sys_async_io::Writer::new(writer.into());
276    SendStream {
277        transport: Rc::clone(transport),
278        stream,
279        writer,
280        unlock_writer_on_drop: true,
281    }
282}
283
284/// Take a bidi stream and wrap a reader and writer for it.
285fn wrap_bi_stream(
286    transport: &Rc<web_wt_sys::WebTransport>,
287    stream: web_wt_sys::WebTransportBidirectionalStream,
288) -> (SendStream, RecvStream) {
289    let writeable = stream.writable();
290    let readable = stream.readable();
291
292    let send_stream = wrap_send_stream(transport, writeable);
293    let recv_stream = wrap_recv_stream(transport, readable);
294
295    (send_stream, recv_stream)
296}
297
298impl xwt_core::session::stream::OpenBi for Session {
299    type Opening = xwt_core::utils::dummy::OpeningBiStream<Session>;
300
301    type Error = Error;
302
303    async fn open_bi(&self) -> Result<Self::Opening, Self::Error> {
304        let transport = self.transport_ref();
305        let value = transport.create_bidirectional_stream().await?;
306        let value = wrap_bi_stream(transport, value);
307        Ok(xwt_core::utils::dummy::OpeningBiStream(value))
308    }
309}
310
311impl xwt_core::session::stream::AcceptBi for Session {
312    type Error = Error;
313
314    async fn accept_bi(&self) -> Result<(Self::SendStream, Self::RecvStream), Self::Error> {
315        let transport = self.transport_ref();
316        let incoming: web_sys::ReadableStream = transport.incoming_bidirectional_streams();
317        let reader: JsValue = incoming.get_reader().into();
318        let reader: web_sys::ReadableStreamDefaultReader = reader.into();
319        let read_result = wasm_bindgen_futures::JsFuture::from(reader.read()).await?;
320        let read_result: web_wt_sys::ReadableStreamReadResult = read_result.into();
321        if read_result.is_done() {
322            return Err(Error(JsError::new("xwt: accept bi reader is done").into()));
323        }
324        let value: web_wt_sys::WebTransportBidirectionalStream = read_result.get_value().into();
325        let value = wrap_bi_stream(transport, value);
326        Ok(value)
327    }
328}
329
330impl xwt_core::session::stream::OpenUni for Session {
331    type Opening = xwt_core::utils::dummy::OpeningUniStream<Session>;
332    type Error = Error;
333
334    async fn open_uni(&self) -> Result<Self::Opening, Self::Error> {
335        let transport = self.transport_ref();
336        let value = transport.create_unidirectional_stream().await?;
337        let send_stream = wrap_send_stream(transport, value);
338        Ok(xwt_core::utils::dummy::OpeningUniStream(send_stream))
339    }
340}
341
342impl xwt_core::session::stream::AcceptUni for Session {
343    type Error = Error;
344
345    async fn accept_uni(&self) -> Result<Self::RecvStream, Self::Error> {
346        let transport = self.transport_ref();
347        let incoming: web_sys::ReadableStream = transport.incoming_unidirectional_streams();
348        let reader: JsValue = incoming.get_reader().into();
349        let reader: web_sys::ReadableStreamDefaultReader = reader.into();
350        let read_result = wasm_bindgen_futures::JsFuture::from(reader.read()).await?;
351        let read_result: web_wt_sys::ReadableStreamReadResult = read_result.into();
352        if read_result.is_done() {
353            return Err(Error(JsError::new("xwt: accept uni reader is done").into()));
354        }
355        let value: web_wt_sys::WebTransportReceiveStream = read_result.get_value().into();
356        let recv_stream = wrap_recv_stream(transport, value);
357        Ok(recv_stream)
358    }
359}
360
361impl tokio::io::AsyncWrite for SendStream {
362    fn poll_write(
363        mut self: std::pin::Pin<&mut Self>,
364        cx: &mut std::task::Context<'_>,
365        buf: &[u8],
366    ) -> std::task::Poll<Result<usize, std::io::Error>> {
367        std::pin::Pin::new(&mut self.writer).poll_write(cx, buf)
368    }
369
370    fn poll_flush(
371        mut self: std::pin::Pin<&mut Self>,
372        cx: &mut std::task::Context<'_>,
373    ) -> std::task::Poll<Result<(), std::io::Error>> {
374        std::pin::Pin::new(&mut self.writer).poll_flush(cx)
375    }
376
377    fn poll_shutdown(
378        mut self: std::pin::Pin<&mut Self>,
379        cx: &mut std::task::Context<'_>,
380    ) -> std::task::Poll<Result<(), std::io::Error>> {
381        std::pin::Pin::new(&mut self.writer).poll_shutdown(cx)
382    }
383}
384
385impl tokio::io::AsyncRead for RecvStream {
386    fn poll_read(
387        mut self: std::pin::Pin<&mut Self>,
388        cx: &mut std::task::Context<'_>,
389        buf: &mut tokio::io::ReadBuf<'_>,
390    ) -> std::task::Poll<std::io::Result<()>> {
391        std::pin::Pin::new(&mut self.reader).poll_read(cx, buf)
392    }
393}
394
395/// An error that can occur during the stream writes.
396#[derive(Debug, thiserror::Error)]
397pub enum StreamWriteError {
398    /// The write was called with a zero-size write buffer.
399    #[error("zero size write buffer")]
400    ZeroSizeWriteBuffer,
401
402    /// The write call thrown an exception.
403    #[error("write error: {0}")]
404    Write(Error),
405}
406
407impl xwt_core::stream::Write for SendStream {
408    type Error = StreamWriteError;
409
410    async fn write(&mut self, buf: &[u8]) -> Result<NonZeroUsize, Self::Error> {
411        let Some(buf_len) = NonZeroUsize::new(buf.len()) else {
412            return Err(StreamWriteError::ZeroSizeWriteBuffer);
413        };
414
415        web_sys_stream_utils::write(&self.writer.inner, buf)
416            .await
417            .map_err(|err| StreamWriteError::Write(err.into()))?;
418
419        Ok(buf_len)
420    }
421}
422
423impl xwt_core::stream::WriteAbort for SendStream {
424    type Error = Error;
425
426    async fn abort(self, error_code: xwt_core::stream::ErrorCode) -> Result<(), Self::Error> {
427        wasm_bindgen_futures::JsFuture::from(
428            self.writer.inner.abort_with_reason(&error_code.into()),
429        )
430        .await
431        .map(|val| {
432            debug_assert!(val.is_undefined());
433        })
434        .map_err(Error::from)
435    }
436}
437
438impl xwt_core::stream::WriteAborted for SendStream {
439    type Error = Error;
440
441    async fn aborted(self) -> Result<xwt_core::stream::ErrorCode, Self::Error> {
442        // Hack our way through...
443        let result = wasm_bindgen_futures::JsFuture::from(self.writer.inner.closed()).await;
444        match result {
445            Ok(value) => {
446                debug_assert!(value.is_undefined());
447                Ok(0)
448            }
449            Err(value) => {
450                let error: web_wt_sys::WebTransportError = value.dyn_into().unwrap();
451                if error.source() != web_wt_sys::WebTransportErrorSource::Stream {
452                    return Err(Error(error.into()));
453                }
454                let Some(code) = error.stream_error_code() else {
455                    return Err(Error(error.into()));
456                };
457                Ok(code)
458            }
459        }
460    }
461}
462
463impl xwt_core::stream::Finish for SendStream {
464    type Error = Error;
465
466    async fn finish(self) -> Result<(), Self::Error> {
467        wasm_bindgen_futures::JsFuture::from(self.writer.inner.close())
468            .await
469            .map(|val| {
470                debug_assert!(val.is_undefined());
471            })
472            .map_err(Error::from)
473    }
474}
475
476impl xwt_core::stream::Finished for RecvStream {
477    type Error = Error;
478
479    async fn finished(self) -> Result<(), Self::Error> {
480        wasm_bindgen_futures::JsFuture::from(self.reader.inner.closed())
481            .await
482            .map(|val| {
483                debug_assert!(val.is_undefined());
484            })
485            .map_err(Error::from)
486    }
487}
488
489/// An error that can occur while reading stream data.
490#[derive(Debug, thiserror::Error)]
491pub enum StreamReadError {
492    /// This is an odd case, which is still tbd what to do with.
493    #[error("byob read consumed the buffer and didn't provide a new one")]
494    ByobReadConsumedBuffer,
495
496    /// The `read_byob` call thrown an exception.
497    #[error("read error: {0}")]
498    Read(Error),
499
500    /// The stream was closed, and there is no more data to exect there.
501    #[error("stream closed")]
502    Closed,
503}
504
505impl xwt_core::stream::Read for RecvStream {
506    type Error = StreamReadError;
507
508    async fn read(&mut self, buf: &mut [u8]) -> Result<NonZeroUsize, Self::Error> {
509        let requested_size = buf.len().try_into().unwrap();
510        let internal_buf = self
511            .reader
512            .internal_buf
513            .take()
514            .filter(|internal_buf| {
515                let actual_size = internal_buf.byte_length();
516                debug_assert!(actual_size > 0);
517                actual_size >= requested_size
518            })
519            .unwrap_or_else(|| js_sys::ArrayBuffer::new(requested_size));
520        let internal_buf_view =
521            js_sys::Uint8Array::new_with_byte_offset_and_length(&internal_buf, 0, requested_size);
522        let maybe_internal_buf_view =
523            web_sys_stream_utils::read_byob(&self.reader.inner, internal_buf_view)
524                .await
525                .map_err(|err| StreamReadError::Read(err.into()))?;
526        let Some(internal_buf_view) = maybe_internal_buf_view else {
527            return Err(StreamReadError::ByobReadConsumedBuffer);
528        };
529
530        // Unwrap is safe assuming the `usize` is `u32` in wasm.
531        let len = internal_buf_view.byte_length().try_into().unwrap();
532
533        // Detect when the read is aborted because the stream was closed without
534        // an error.
535        let Some(len) = NonZeroUsize::new(len) else {
536            return Err(StreamReadError::Closed);
537        };
538
539        internal_buf_view.copy_to(&mut buf[..len.get()]);
540
541        self.reader.internal_buf = Some(internal_buf_view.buffer());
542
543        Ok(len)
544    }
545}
546
547impl xwt_core::stream::ReadAbort for RecvStream {
548    type Error = Error;
549
550    async fn abort(self, error_code: xwt_core::stream::ErrorCode) -> Result<(), Self::Error> {
551        wasm_bindgen_futures::JsFuture::from(
552            self.reader.inner.cancel_with_reason(&error_code.into()),
553        )
554        .await
555        .map(|_| ())
556        .map_err(Error::from)
557    }
558}
559
560impl xwt_core::stream::ReadAborted for RecvStream {
561    type Error = Error;
562
563    async fn aborted(self) -> Result<xwt_core::stream::ErrorCode, Self::Error> {
564        // Hack our way through...
565        let result = wasm_bindgen_futures::JsFuture::from(self.reader.inner.closed()).await;
566        match result {
567            Ok(value) => {
568                debug_assert!(value.is_undefined());
569                Ok(0)
570            }
571            Err(value) => {
572                let error: web_wt_sys::WebTransportError = value.dyn_into().unwrap();
573                if error.source() != web_wt_sys::WebTransportErrorSource::Stream {
574                    return Err(Error(error.into()));
575                }
576                let Some(code) = error.stream_error_code() else {
577                    return Err(Error(error.into()));
578                };
579                Ok(code)
580            }
581        }
582    }
583}
584
585impl Datagrams {
586    /// Receive the datagram and handle the buffer with the given function.
587    ///
588    /// Cloning the buffer in the `f` will result in the undefined behaviour,
589    /// because it will create a second reference to an object that is intended
590    /// to be under a `mut ref`.
591    /// Although is would not teachnically be unsafe, it would violate
592    /// the borrow checker rules.
593    pub async fn receive_with<R>(
594        &self,
595        max_read_size: Option<u32>,
596        f: impl FnOnce(&mut js_sys::Uint8Array) -> R,
597    ) -> Result<R, Error> {
598        let mut buffer_guard = self.read_buffer.lock().await;
599
600        let buffer = buffer_guard
601            .take()
602            .unwrap_or_else(|| js_sys::ArrayBuffer::new(self.read_buffer_size));
603        let view = if let Some(max_read_size) = max_read_size {
604            let desired_buffer_length = buffer.byte_length().min(max_read_size);
605            js_sys::Uint8Array::new_with_byte_offset_and_length(&buffer, 0, desired_buffer_length)
606        } else {
607            js_sys::Uint8Array::new(&buffer)
608        };
609
610        let maybe_view =
611            web_sys_stream_utils::read_byob(&self.readable_stream_reader, view).await?;
612        let Some(mut view) = maybe_view else {
613            return Err(wasm_bindgen::JsError::new("unexpected stream termination").into());
614        };
615
616        let result = f(&mut view);
617
618        *buffer_guard = Some(view.buffer());
619        Ok(result)
620    }
621}
622
623impl xwt_core::session::datagram::MaxSize for Session {
624    fn max_datagram_size(&self) -> Option<usize> {
625        let transport = self.transport_ref();
626        let max_datagram_size = transport.datagrams().max_datagram_size();
627        Some(usize::try_from(max_datagram_size).unwrap()) // u32 should fit in a usize on WASM
628    }
629}
630
631impl xwt_core::session::datagram::Receive for Session {
632    type Datagram = Vec<u8>;
633    type Error = Error;
634
635    async fn receive_datagram(&self) -> Result<Self::Datagram, Self::Error> {
636        self.datagrams
637            .receive_with(None, |buffer| buffer.to_vec())
638            .await
639    }
640}
641
642impl xwt_core::session::datagram::ReceiveInto for Session {
643    type Error = Error;
644
645    async fn receive_datagram_into(&self, buf: &mut [u8]) -> Result<usize, Self::Error> {
646        let max_read_size = buf.len().try_into().unwrap();
647        self.datagrams
648            .receive_with(Some(max_read_size), |buffer| {
649                let len = buffer.length() as usize;
650                buffer.copy_to(&mut buf[..len]);
651                len
652            })
653            .await
654    }
655}
656
657impl xwt_core::session::datagram::Send for Session {
658    type Error = Error;
659
660    async fn send_datagram<D>(&self, payload: D) -> Result<(), Self::Error>
661    where
662        D: AsRef<[u8]>,
663    {
664        web_sys_stream_utils::write(&self.datagrams.writable_stream_writer, payload.as_ref())
665            .await?;
666        Ok(())
667    }
668}