1#![cfg_attr(
2 target_family = "wasm",
3 doc = "The [`web_wt_sys`]-powered implementation of [`xwt_core`]."
4)]
5#![cfg_attr(
6 not(target_family = "wasm"),
7 doc = "The `web_wt_sys`-powered implementation of `xwt_core`."
8)]
9#![cfg(target_family = "wasm")]
10
11use std::rc::Rc;
12
13use wasm_bindgen::JsError;
14
15mod error;
16mod options;
17
18pub use web_sys;
19pub use web_wt_sys;
20pub use xwt_core as core;
21
22pub use {error::*, options::*};
23
24#[derive(Debug, Clone, Default)]
29pub struct Endpoint {
30 pub options: web_wt_sys::WebTransportOptions,
32}
33
34impl xwt_core::endpoint::Connect for Endpoint {
35 type Error = Error;
36 type Connecting = Connecting;
37
38 async fn connect(&self, url: &str) -> Result<Self::Connecting, Self::Error> {
39 let transport = web_wt_sys::WebTransport::new_with_options(url, &self.options)?;
40 Ok(Connecting { transport })
41 }
42}
43
44#[derive(Debug)]
47pub struct Connecting {
48 pub transport: web_wt_sys::WebTransport,
50}
51
52impl xwt_core::endpoint::connect::Connecting for Connecting {
53 type Session = Session;
54 type Error = Error;
55
56 async fn wait_connect(self) -> Result<Self::Session, Self::Error> {
57 let Connecting { transport } = self;
58 transport.ready().await?;
59
60 let datagram_read_buffer_size = 65536; let datagrams = transport.datagrams();
63 let datagram_readable_stream_reader =
64 web_sys_stream_utils::get_reader_byob(datagrams.readable());
65 let datagram_writable_stream_writer =
66 web_sys_stream_utils::get_writer(datagrams.writable());
67
68 let datagram_read_buffer = js_sys::ArrayBuffer::new(datagram_read_buffer_size);
69 let datagram_read_buffer = tokio::sync::Mutex::new(Some(datagram_read_buffer));
70
71 let session = Session {
72 transport: Rc::new(transport),
73 datagram_readable_stream_reader,
74 datagram_writable_stream_writer,
75 datagram_read_buffer_size,
76 datagram_read_buffer,
77 };
78 Ok(session)
79 }
80}
81
82#[derive(Debug)]
87pub struct Session {
88 pub transport: Rc<web_wt_sys::WebTransport>,
90 pub datagram_readable_stream_reader: web_sys::ReadableStreamByobReader,
92 pub datagram_writable_stream_writer: web_sys::WritableStreamDefaultWriter,
94 pub datagram_read_buffer_size: u32,
97 pub datagram_read_buffer: tokio::sync::Mutex<Option<js_sys::ArrayBuffer>>,
99}
100
101impl xwt_core::session::stream::SendSpec for Session {
102 type SendStream = SendStream;
103}
104
105impl xwt_core::session::stream::RecvSpec for Session {
106 type RecvStream = RecvStream;
107}
108
109pub struct SendStream {
111 pub transport: Rc<web_wt_sys::WebTransport>,
113 pub stream: web_wt_sys::WebTransportSendStream,
115 pub writer: web_sys_async_io::Writer,
117}
118
119pub struct RecvStream {
121 pub transport: Rc<web_wt_sys::WebTransport>,
123 pub stream: web_wt_sys::WebTransportReceiveStream,
125 pub reader: web_sys_async_io::Reader,
127}
128
129fn wrap_recv_stream(
131 transport: &Rc<web_wt_sys::WebTransport>,
132 stream: web_wt_sys::WebTransportReceiveStream,
133) -> RecvStream {
134 let reader = web_sys_stream_utils::get_reader_byob(stream.clone());
135 let reader: wasm_bindgen::JsValue = reader.into();
136 let reader = reader.into();
137 let reader = web_sys_async_io::Reader::new(reader);
138
139 RecvStream {
140 transport: Rc::clone(transport),
141 stream,
142 reader,
143 }
144}
145
146fn wrap_send_stream(
148 transport: &Rc<web_wt_sys::WebTransport>,
149 stream: web_wt_sys::WebTransportSendStream,
150) -> SendStream {
151 let writer = stream.get_writer().unwrap();
152 let writer = web_sys_async_io::Writer::new(writer.into());
153 SendStream {
154 transport: Rc::clone(transport),
155 stream,
156 writer,
157 }
158}
159
160fn wrap_bi_stream(
162 transport: &Rc<web_wt_sys::WebTransport>,
163 stream: web_wt_sys::WebTransportBidirectionalStream,
164) -> (SendStream, RecvStream) {
165 let writeable = stream.writable();
166 let readable = stream.readable();
167
168 let send_stream = wrap_send_stream(transport, writeable);
169 let recv_stream = wrap_recv_stream(transport, readable);
170
171 (send_stream, recv_stream)
172}
173
174impl xwt_core::session::stream::OpenBi for Session {
175 type Opening = xwt_core::utils::dummy::OpeningBiStream<Session>;
176
177 type Error = Error;
178
179 async fn open_bi(&self) -> Result<Self::Opening, Self::Error> {
180 let value = self.transport.create_bidirectional_stream().await?;
181 let value = wrap_bi_stream(&self.transport, value);
182 Ok(xwt_core::utils::dummy::OpeningBiStream(value))
183 }
184}
185
186impl xwt_core::session::stream::AcceptBi for Session {
187 type Error = Error;
188
189 async fn accept_bi(&self) -> Result<(Self::SendStream, Self::RecvStream), Self::Error> {
190 let incoming: web_sys::ReadableStream = self.transport.incoming_bidirectional_streams();
191 let reader: wasm_bindgen::JsValue = incoming.get_reader().into();
192 let reader: web_sys::ReadableStreamDefaultReader = reader.into();
193 let read_result = wasm_bindgen_futures::JsFuture::from(reader.read()).await?;
194 let read_result: web_wt_sys::ReadableStreamReadResult = read_result.into();
195 if read_result.is_done() {
196 return Err(Error(JsError::new("xwt: accept bi reader is done").into()));
197 }
198 let value: web_wt_sys::WebTransportBidirectionalStream = read_result.get_value().into();
199 let value = wrap_bi_stream(&self.transport, value);
200 Ok(value)
201 }
202}
203
204impl xwt_core::session::stream::OpenUni for Session {
205 type Opening = xwt_core::utils::dummy::OpeningUniStream<Session>;
206 type Error = Error;
207
208 async fn open_uni(&self) -> Result<Self::Opening, Self::Error> {
209 let value = self.transport.create_unidirectional_stream().await?;
210 let send_stream = wrap_send_stream(&self.transport, value);
211 Ok(xwt_core::utils::dummy::OpeningUniStream(send_stream))
212 }
213}
214
215impl xwt_core::session::stream::AcceptUni for Session {
216 type Error = Error;
217
218 async fn accept_uni(&self) -> Result<Self::RecvStream, Self::Error> {
219 let incoming: web_sys::ReadableStream = self.transport.incoming_unidirectional_streams();
220 let reader: wasm_bindgen::JsValue = incoming.get_reader().into();
221 let reader: web_sys::ReadableStreamDefaultReader = reader.into();
222 let read_result = wasm_bindgen_futures::JsFuture::from(reader.read()).await?;
223 let read_result: web_wt_sys::ReadableStreamReadResult = read_result.into();
224 if read_result.is_done() {
225 return Err(Error(JsError::new("xwt: accept uni reader is done").into()));
226 }
227 let value: web_wt_sys::WebTransportReceiveStream = read_result.get_value().into();
228 let recv_stream = wrap_recv_stream(&self.transport, value);
229 Ok(recv_stream)
230 }
231}
232
233impl tokio::io::AsyncWrite for SendStream {
234 fn poll_write(
235 mut self: std::pin::Pin<&mut Self>,
236 cx: &mut std::task::Context<'_>,
237 buf: &[u8],
238 ) -> std::task::Poll<Result<usize, std::io::Error>> {
239 std::pin::Pin::new(&mut self.writer).poll_write(cx, buf)
240 }
241
242 fn poll_flush(
243 mut self: std::pin::Pin<&mut Self>,
244 cx: &mut std::task::Context<'_>,
245 ) -> std::task::Poll<Result<(), std::io::Error>> {
246 std::pin::Pin::new(&mut self.writer).poll_flush(cx)
247 }
248
249 fn poll_shutdown(
250 mut self: std::pin::Pin<&mut Self>,
251 cx: &mut std::task::Context<'_>,
252 ) -> std::task::Poll<Result<(), std::io::Error>> {
253 std::pin::Pin::new(&mut self.writer).poll_shutdown(cx)
254 }
255}
256
257impl tokio::io::AsyncRead for RecvStream {
258 fn poll_read(
259 mut self: std::pin::Pin<&mut Self>,
260 cx: &mut std::task::Context<'_>,
261 buf: &mut tokio::io::ReadBuf<'_>,
262 ) -> std::task::Poll<std::io::Result<()>> {
263 std::pin::Pin::new(&mut self.reader).poll_read(cx, buf)
264 }
265}
266
267impl xwt_core::stream::Write for SendStream {
268 type Error = Error;
269
270 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
271 web_sys_stream_utils::write(&self.writer.inner, buf).await?;
272 Ok(buf.len())
273 }
274}
275
276impl xwt_core::stream::Read for RecvStream {
277 type Error = Error;
278
279 async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Self::Error> {
280 let requested_size = buf.len().try_into().unwrap();
281 let internal_buf = self
282 .reader
283 .internal_buf
284 .take()
285 .filter(|internal_buf| {
286 let actual_size = internal_buf.byte_length();
287 debug_assert!(actual_size > 0);
288 actual_size >= requested_size
289 })
290 .unwrap_or_else(|| js_sys::ArrayBuffer::new(requested_size));
291 let internal_buf_view = js_sys::Uint8Array::new(&internal_buf);
292 let maybe_internal_buf_view =
293 web_sys_stream_utils::read_byob(&self.reader.inner, internal_buf_view).await?;
294 let Some(internal_buf_view) = maybe_internal_buf_view else {
295 return Ok(None);
296 };
297
298 let len = internal_buf_view.byte_length().try_into().unwrap();
300
301 internal_buf_view.copy_to(&mut buf[..len]);
302
303 self.reader.internal_buf = Some(internal_buf_view.buffer());
304
305 Ok(Some(len))
306 }
307}
308
309impl Session {
310 pub async fn receive_datagram_with<R>(
318 &self,
319 f: impl FnOnce(&mut js_sys::Uint8Array) -> R,
320 ) -> Result<R, Error> {
321 let mut buffer_guard = self.datagram_read_buffer.lock().await;
322
323 let buffer = buffer_guard
324 .take()
325 .unwrap_or_else(|| js_sys::ArrayBuffer::new(self.datagram_read_buffer_size));
326 let view = js_sys::Uint8Array::new(&buffer);
327
328 let maybe_view =
329 web_sys_stream_utils::read_byob(&self.datagram_readable_stream_reader, view).await?;
330 let Some(mut view) = maybe_view else {
331 return Err(wasm_bindgen::JsError::new("unexpected stream termination").into());
332 };
333
334 let result = f(&mut view);
335
336 *buffer_guard = Some(view.buffer());
337 Ok(result)
338 }
339}
340
341impl xwt_core::session::datagram::MaxSize for Session {
342 fn max_datagram_size(&self) -> Option<usize> {
343 let max_datagram_size = self.transport.datagrams().max_datagram_size();
344 Some(usize::try_from(max_datagram_size).unwrap()) }
346}
347
348impl xwt_core::session::datagram::Receive for Session {
349 type Datagram = Vec<u8>;
350 type Error = Error;
351
352 async fn receive_datagram(&self) -> Result<Self::Datagram, Self::Error> {
353 self.receive_datagram_with(|buffer| buffer.to_vec()).await
354 }
355}
356
357impl xwt_core::session::datagram::ReceiveInto for Session {
358 type Error = Error;
359
360 async fn receive_datagram_into(&self, buf: &mut [u8]) -> Result<usize, Self::Error> {
361 self.receive_datagram_with(|buffer| {
362 let len = buffer.length() as usize;
363 buffer.copy_to(&mut buf[..len]);
364 len
365 })
366 .await
367 }
368}
369
370impl xwt_core::session::datagram::Send for Session {
371 type Error = Error;
372
373 async fn send_datagram<D>(&self, payload: D) -> Result<(), Self::Error>
374 where
375 D: AsRef<[u8]>,
376 {
377 web_sys_stream_utils::write(&self.datagram_writable_stream_writer, payload.as_ref())
378 .await?;
379 Ok(())
380 }
381}
382
383impl Drop for Session {
384 fn drop(&mut self) {
385 self.transport.close();
386 }
387}