1#![cfg_attr(
2 target_family = "wasm",
3 doc = "The [`web_wt_sys`]-powered implementation of [`xwt_core`]."
4)]
5#![cfg_attr(
6 not(target_family = "wasm"),
7 doc = "The `web_wt_sys`-powered implementation of `xwt_core`."
8)]
9#![cfg(target_family = "wasm")]
10
11use std::{num::NonZeroUsize, rc::Rc};
12
13use wasm_bindgen::prelude::*;
14
15mod error;
16mod error_as_error_code;
17mod options;
18
19pub use web_sys;
20pub use web_wt_sys;
21pub use xwt_core as core;
22
23pub use {error::*, options::*};
24
25#[derive(Debug, Clone, Default)]
30pub struct Endpoint {
31 pub options: web_wt_sys::WebTransportOptions,
33}
34
35impl xwt_core::endpoint::Connect for Endpoint {
36 type Error = Error;
37 type Connecting = Connecting;
38
39 async fn connect(&self, url: &str) -> Result<Self::Connecting, Self::Error> {
40 let transport = web_wt_sys::WebTransport::new_with_options(url, &self.options)?;
41 Ok(Connecting { transport })
42 }
43}
44
45#[derive(Debug)]
48pub struct Connecting {
49 pub transport: web_wt_sys::WebTransport,
51}
52
53impl xwt_core::endpoint::connect::Connecting for Connecting {
54 type Session = Session;
55 type Error = Error;
56
57 async fn wait_connect(self) -> Result<Self::Session, Self::Error> {
58 let Connecting { transport } = self;
59 transport.ready().await?;
60
61 Ok(Session::new(transport))
62 }
63}
64
65#[derive(Debug)]
70pub struct Session {
71 transport: Option<Rc<web_wt_sys::WebTransport>>,
73
74 pub datagrams: Datagrams,
76
77 pub close_on_drop: bool,
79}
80
81impl Session {
82 pub fn new(transport: web_wt_sys::WebTransport) -> Self {
84 let datagrams = Datagrams::from_transport(&transport);
85 Self {
86 transport: Some(Rc::new(transport)),
87 datagrams,
88 close_on_drop: true,
89 }
90 }
91
92 pub fn try_unwrap(mut self) -> Result<web_wt_sys::WebTransport, Self> {
95 let transport = self.transport.take().unwrap();
98
99 let unwrapped = match Rc::try_unwrap(transport) {
106 Ok(unwrapped) => unwrapped,
107 Err(transport) => {
108 let _ = self.transport.insert(transport);
109 return Err(self);
110 }
111 };
112
113 self.close_on_drop = false;
115
116 drop(self);
118
119 Ok(unwrapped)
121 }
122
123 pub const fn transport_ref(&self) -> &Rc<web_wt_sys::WebTransport> {
125 self.transport.as_ref().unwrap()
128 }
129}
130
131impl Drop for Session {
132 fn drop(&mut self) {
133 if self.close_on_drop {
134 self.transport_ref().close();
135 }
136 }
137}
138
139#[derive(Debug)]
142pub struct Datagrams {
143 pub readable_stream_reader: web_sys::ReadableStreamByobReader,
145
146 pub writable_stream_writer: web_sys::WritableStreamDefaultWriter,
148
149 pub read_buffer_size: u32,
152
153 pub read_buffer: tokio::sync::Mutex<Option<js_sys::ArrayBuffer>>,
155
156 pub unlock_streams_on_drop: bool,
158}
159
160impl Datagrams {
161 pub fn from_transport(transport: &web_wt_sys::WebTransport) -> Self {
163 Self::from_transport_datagrams(&transport.datagrams())
164 }
165
166 pub fn from_transport_datagrams(
168 datagrams: &web_wt_sys::WebTransportDatagramDuplexStream,
169 ) -> Self {
170 let read_buffer_size = 65536; let readable_stream_reader = web_sys_stream_utils::get_reader_byob(datagrams.readable());
173 let writable_stream_writer = web_sys_stream_utils::get_writer(datagrams.writable());
174
175 let read_buffer = js_sys::ArrayBuffer::new(read_buffer_size);
176 let read_buffer = tokio::sync::Mutex::new(Some(read_buffer));
177
178 Self {
179 readable_stream_reader,
180 writable_stream_writer,
181 read_buffer_size,
182 read_buffer,
183 unlock_streams_on_drop: true,
184 }
185 }
186}
187
188impl Drop for Datagrams {
189 fn drop(&mut self) {
190 if self.unlock_streams_on_drop {
191 self.readable_stream_reader.release_lock();
192 self.writable_stream_writer.release_lock();
193 }
194 }
195}
196
197impl xwt_core::session::stream::SendSpec for Session {
198 type SendStream = SendStream;
199}
200
201impl xwt_core::session::stream::RecvSpec for Session {
202 type RecvStream = RecvStream;
203}
204
205pub struct SendStream {
207 pub transport: Rc<web_wt_sys::WebTransport>,
209
210 pub stream: web_wt_sys::WebTransportSendStream,
212
213 pub writer: web_sys_async_io::Writer,
215
216 pub unlock_writer_on_drop: bool,
218}
219
220impl Drop for SendStream {
221 fn drop(&mut self) {
222 if self.unlock_writer_on_drop {
223 self.writer.inner.release_lock();
224 }
225 }
226}
227
228pub struct RecvStream {
230 pub transport: Rc<web_wt_sys::WebTransport>,
232
233 pub stream: web_wt_sys::WebTransportReceiveStream,
235
236 pub reader: web_sys_async_io::Reader,
238
239 pub unlock_reader_on_drop: bool,
241}
242
243impl Drop for RecvStream {
244 fn drop(&mut self) {
245 if self.unlock_reader_on_drop {
246 self.reader.inner.release_lock();
247 }
248 }
249}
250
251fn wrap_recv_stream(
253 transport: &Rc<web_wt_sys::WebTransport>,
254 stream: web_wt_sys::WebTransportReceiveStream,
255) -> RecvStream {
256 let reader = web_sys_stream_utils::get_reader_byob(stream.clone());
257 let reader: JsValue = reader.into();
258 let reader = reader.into();
259 let reader = web_sys_async_io::Reader::new(reader);
260
261 RecvStream {
262 transport: Rc::clone(transport),
263 stream,
264 reader,
265 unlock_reader_on_drop: true,
266 }
267}
268
269fn wrap_send_stream(
271 transport: &Rc<web_wt_sys::WebTransport>,
272 stream: web_wt_sys::WebTransportSendStream,
273) -> SendStream {
274 let writer = stream.get_writer().unwrap();
275 let writer = web_sys_async_io::Writer::new(writer.into());
276 SendStream {
277 transport: Rc::clone(transport),
278 stream,
279 writer,
280 unlock_writer_on_drop: true,
281 }
282}
283
284fn wrap_bi_stream(
286 transport: &Rc<web_wt_sys::WebTransport>,
287 stream: web_wt_sys::WebTransportBidirectionalStream,
288) -> (SendStream, RecvStream) {
289 let writeable = stream.writable();
290 let readable = stream.readable();
291
292 let send_stream = wrap_send_stream(transport, writeable);
293 let recv_stream = wrap_recv_stream(transport, readable);
294
295 (send_stream, recv_stream)
296}
297
298impl xwt_core::session::stream::OpenBi for Session {
299 type Opening = xwt_core::utils::dummy::OpeningBiStream<Session>;
300
301 type Error = Error;
302
303 async fn open_bi(&self) -> Result<Self::Opening, Self::Error> {
304 let transport = self.transport_ref();
305 let value = transport.create_bidirectional_stream().await?;
306 let value = wrap_bi_stream(transport, value);
307 Ok(xwt_core::utils::dummy::OpeningBiStream(value))
308 }
309}
310
311impl xwt_core::session::stream::AcceptBi for Session {
312 type Error = Error;
313
314 async fn accept_bi(&self) -> Result<(Self::SendStream, Self::RecvStream), Self::Error> {
315 let transport = self.transport_ref();
316 let incoming: web_sys::ReadableStream = transport.incoming_bidirectional_streams();
317 let reader: JsValue = incoming.get_reader().into();
318 let reader: web_sys::ReadableStreamDefaultReader = reader.into();
319 let read_result = wasm_bindgen_futures::JsFuture::from(reader.read()).await?;
320 let read_result: web_wt_sys::ReadableStreamReadResult = read_result.into();
321 if read_result.is_done() {
322 return Err(Error(JsError::new("xwt: accept bi reader is done").into()));
323 }
324 let value: web_wt_sys::WebTransportBidirectionalStream = read_result.get_value().into();
325 let value = wrap_bi_stream(transport, value);
326 Ok(value)
327 }
328}
329
330impl xwt_core::session::stream::OpenUni for Session {
331 type Opening = xwt_core::utils::dummy::OpeningUniStream<Session>;
332 type Error = Error;
333
334 async fn open_uni(&self) -> Result<Self::Opening, Self::Error> {
335 let transport = self.transport_ref();
336 let value = transport.create_unidirectional_stream().await?;
337 let send_stream = wrap_send_stream(transport, value);
338 Ok(xwt_core::utils::dummy::OpeningUniStream(send_stream))
339 }
340}
341
342impl xwt_core::session::stream::AcceptUni for Session {
343 type Error = Error;
344
345 async fn accept_uni(&self) -> Result<Self::RecvStream, Self::Error> {
346 let transport = self.transport_ref();
347 let incoming: web_sys::ReadableStream = transport.incoming_unidirectional_streams();
348 let reader: JsValue = incoming.get_reader().into();
349 let reader: web_sys::ReadableStreamDefaultReader = reader.into();
350 let read_result = wasm_bindgen_futures::JsFuture::from(reader.read()).await?;
351 let read_result: web_wt_sys::ReadableStreamReadResult = read_result.into();
352 if read_result.is_done() {
353 return Err(Error(JsError::new("xwt: accept uni reader is done").into()));
354 }
355 let value: web_wt_sys::WebTransportReceiveStream = read_result.get_value().into();
356 let recv_stream = wrap_recv_stream(transport, value);
357 Ok(recv_stream)
358 }
359}
360
361impl tokio::io::AsyncWrite for SendStream {
362 fn poll_write(
363 mut self: std::pin::Pin<&mut Self>,
364 cx: &mut std::task::Context<'_>,
365 buf: &[u8],
366 ) -> std::task::Poll<Result<usize, std::io::Error>> {
367 std::pin::Pin::new(&mut self.writer).poll_write(cx, buf)
368 }
369
370 fn poll_flush(
371 mut self: std::pin::Pin<&mut Self>,
372 cx: &mut std::task::Context<'_>,
373 ) -> std::task::Poll<Result<(), std::io::Error>> {
374 std::pin::Pin::new(&mut self.writer).poll_flush(cx)
375 }
376
377 fn poll_shutdown(
378 mut self: std::pin::Pin<&mut Self>,
379 cx: &mut std::task::Context<'_>,
380 ) -> std::task::Poll<Result<(), std::io::Error>> {
381 std::pin::Pin::new(&mut self.writer).poll_shutdown(cx)
382 }
383}
384
385impl tokio::io::AsyncRead for RecvStream {
386 fn poll_read(
387 mut self: std::pin::Pin<&mut Self>,
388 cx: &mut std::task::Context<'_>,
389 buf: &mut tokio::io::ReadBuf<'_>,
390 ) -> std::task::Poll<std::io::Result<()>> {
391 std::pin::Pin::new(&mut self.reader).poll_read(cx, buf)
392 }
393}
394
395#[derive(Debug, thiserror::Error)]
397pub enum StreamWriteError {
398 #[error("zero size write buffer")]
400 ZeroSizeWriteBuffer,
401
402 #[error("write error: {0}")]
404 Write(Error),
405}
406
407impl xwt_core::stream::Write for SendStream {
408 type Error = StreamWriteError;
409
410 async fn write(&mut self, buf: &[u8]) -> Result<NonZeroUsize, Self::Error> {
411 let Some(buf_len) = NonZeroUsize::new(buf.len()) else {
412 return Err(StreamWriteError::ZeroSizeWriteBuffer);
413 };
414
415 web_sys_stream_utils::write(&self.writer.inner, buf)
416 .await
417 .map_err(|err| StreamWriteError::Write(err.into()))?;
418
419 Ok(buf_len)
420 }
421}
422
423impl xwt_core::stream::WriteAbort for SendStream {
424 type Error = Error;
425
426 async fn abort(self, error_code: xwt_core::stream::ErrorCode) -> Result<(), Self::Error> {
427 wasm_bindgen_futures::JsFuture::from(
428 self.writer.inner.abort_with_reason(&error_code.into()),
429 )
430 .await
431 .map(|val| {
432 debug_assert!(val.is_undefined());
433 })
434 .map_err(Error::from)
435 }
436}
437
438impl xwt_core::stream::WriteAborted for SendStream {
439 type Error = Error;
440
441 async fn aborted(self) -> Result<xwt_core::stream::ErrorCode, Self::Error> {
442 let result = wasm_bindgen_futures::JsFuture::from(self.writer.inner.closed()).await;
444 match result {
445 Ok(value) => {
446 debug_assert!(value.is_undefined());
447 Ok(0)
448 }
449 Err(value) => {
450 let error: web_wt_sys::WebTransportError = value.dyn_into().unwrap();
451 if error.source() != web_wt_sys::WebTransportErrorSource::Stream {
452 return Err(Error(error.into()));
453 }
454 let Some(code) = error.stream_error_code() else {
455 return Err(Error(error.into()));
456 };
457 Ok(code)
458 }
459 }
460 }
461}
462
463impl xwt_core::stream::Finish for SendStream {
464 type Error = Error;
465
466 async fn finish(self) -> Result<(), Self::Error> {
467 wasm_bindgen_futures::JsFuture::from(self.writer.inner.close())
468 .await
469 .map(|val| {
470 debug_assert!(val.is_undefined());
471 })
472 .map_err(Error::from)
473 }
474}
475
476impl xwt_core::stream::Finished for RecvStream {
477 type Error = Error;
478
479 async fn finished(self) -> Result<(), Self::Error> {
480 wasm_bindgen_futures::JsFuture::from(self.reader.inner.closed())
481 .await
482 .map(|val| {
483 debug_assert!(val.is_undefined());
484 })
485 .map_err(Error::from)
486 }
487}
488
489#[derive(Debug, thiserror::Error)]
491pub enum StreamReadError {
492 #[error("byob read consumed the buffer and didn't provide a new one")]
494 ByobReadConsumedBuffer,
495
496 #[error("read error: {0}")]
498 Read(Error),
499
500 #[error("stream closed")]
502 Closed,
503}
504
505impl xwt_core::stream::Read for RecvStream {
506 type Error = StreamReadError;
507
508 async fn read(&mut self, buf: &mut [u8]) -> Result<NonZeroUsize, Self::Error> {
509 let requested_size = buf.len().try_into().unwrap();
510 let internal_buf = self
511 .reader
512 .internal_buf
513 .take()
514 .filter(|internal_buf| {
515 let actual_size = internal_buf.byte_length();
516 debug_assert!(actual_size > 0);
517 actual_size >= requested_size
518 })
519 .unwrap_or_else(|| js_sys::ArrayBuffer::new(requested_size));
520 let internal_buf_view =
521 js_sys::Uint8Array::new_with_byte_offset_and_length(&internal_buf, 0, requested_size);
522 let maybe_internal_buf_view =
523 web_sys_stream_utils::read_byob(&self.reader.inner, internal_buf_view)
524 .await
525 .map_err(|err| StreamReadError::Read(err.into()))?;
526 let Some(internal_buf_view) = maybe_internal_buf_view else {
527 return Err(StreamReadError::ByobReadConsumedBuffer);
528 };
529
530 let len = internal_buf_view.byte_length().try_into().unwrap();
532
533 let Some(len) = NonZeroUsize::new(len) else {
536 return Err(StreamReadError::Closed);
537 };
538
539 internal_buf_view.copy_to(&mut buf[..len.get()]);
540
541 self.reader.internal_buf = Some(internal_buf_view.buffer());
542
543 Ok(len)
544 }
545}
546
547impl xwt_core::stream::ReadAbort for RecvStream {
548 type Error = Error;
549
550 async fn abort(self, error_code: xwt_core::stream::ErrorCode) -> Result<(), Self::Error> {
551 wasm_bindgen_futures::JsFuture::from(
552 self.reader.inner.cancel_with_reason(&error_code.into()),
553 )
554 .await
555 .map(|_| ())
556 .map_err(Error::from)
557 }
558}
559
560impl xwt_core::stream::ReadAborted for RecvStream {
561 type Error = Error;
562
563 async fn aborted(self) -> Result<xwt_core::stream::ErrorCode, Self::Error> {
564 let result = wasm_bindgen_futures::JsFuture::from(self.reader.inner.closed()).await;
566 match result {
567 Ok(value) => {
568 debug_assert!(value.is_undefined());
569 Ok(0)
570 }
571 Err(value) => {
572 let error: web_wt_sys::WebTransportError = value.dyn_into().unwrap();
573 if error.source() != web_wt_sys::WebTransportErrorSource::Stream {
574 return Err(Error(error.into()));
575 }
576 let Some(code) = error.stream_error_code() else {
577 return Err(Error(error.into()));
578 };
579 Ok(code)
580 }
581 }
582 }
583}
584
585impl Datagrams {
586 pub async fn receive_with<R>(
594 &self,
595 max_read_size: Option<u32>,
596 f: impl FnOnce(&mut js_sys::Uint8Array) -> R,
597 ) -> Result<R, Error> {
598 let mut buffer_guard = self.read_buffer.lock().await;
599
600 let buffer = buffer_guard
601 .take()
602 .unwrap_or_else(|| js_sys::ArrayBuffer::new(self.read_buffer_size));
603 let view = if let Some(max_read_size) = max_read_size {
604 let desired_buffer_length = buffer.byte_length().min(max_read_size);
605 js_sys::Uint8Array::new_with_byte_offset_and_length(&buffer, 0, desired_buffer_length)
606 } else {
607 js_sys::Uint8Array::new(&buffer)
608 };
609
610 let maybe_view =
611 web_sys_stream_utils::read_byob(&self.readable_stream_reader, view).await?;
612 let Some(mut view) = maybe_view else {
613 return Err(wasm_bindgen::JsError::new("unexpected stream termination").into());
614 };
615
616 let result = f(&mut view);
617
618 *buffer_guard = Some(view.buffer());
619 Ok(result)
620 }
621}
622
623impl xwt_core::session::datagram::MaxSize for Session {
624 fn max_datagram_size(&self) -> Option<usize> {
625 let transport = self.transport_ref();
626 let max_datagram_size = transport.datagrams().max_datagram_size();
627 Some(usize::try_from(max_datagram_size).unwrap()) }
629}
630
631impl xwt_core::session::datagram::Receive for Session {
632 type Datagram = Vec<u8>;
633 type Error = Error;
634
635 async fn receive_datagram(&self) -> Result<Self::Datagram, Self::Error> {
636 self.datagrams
637 .receive_with(None, |buffer| buffer.to_vec())
638 .await
639 }
640}
641
642impl xwt_core::session::datagram::ReceiveInto for Session {
643 type Error = Error;
644
645 async fn receive_datagram_into(&self, buf: &mut [u8]) -> Result<usize, Self::Error> {
646 let max_read_size = buf.len().try_into().unwrap();
647 self.datagrams
648 .receive_with(Some(max_read_size), |buffer| {
649 let len = buffer.length() as usize;
650 buffer.copy_to(&mut buf[..len]);
651 len
652 })
653 .await
654 }
655}
656
657impl xwt_core::session::datagram::Send for Session {
658 type Error = Error;
659
660 async fn send_datagram<D>(&self, payload: D) -> Result<(), Self::Error>
661 where
662 D: AsRef<[u8]>,
663 {
664 web_sys_stream_utils::write(&self.datagrams.writable_stream_writer, payload.as_ref())
665 .await?;
666 Ok(())
667 }
668}