1use bytes::Bytes;
2use js_sys::Uint8Array;
3use url::Url;
4#[cfg(target_family = "wasm")]
5use wasm_bindgen::JsCast;
6use wasm_bindgen_futures::JsFuture;
7#[cfg(target_family = "wasm")]
8use web_sys::WritableStreamDefaultWriter;
9use web_sys::{
10 WebTransport, WebTransportBidirectionalStream, WebTransportCloseInfo, WebTransportSendStream,
11};
12
13use crate::{Error, RecvStream, SendStream};
14use web_streams::{Reader, Writer};
15
/// A handle pairing a browser [`WebTransport`] object with a [`Url`].
///
/// `Clone` is derived, so cloning duplicates both fields.
#[derive(Clone)]
pub struct Session {
    /// The underlying `web_sys` WebTransport object every operation is forwarded to.
    inner: WebTransport,
    /// The URL handed to [`Session::new`]; returned unchanged by [`Session::url`].
    url: Url,
}
28
29impl Session {
30 pub fn new(inner: WebTransport, url: Url) -> Self {
31 Self { inner, url }
32 }
33
34 pub async fn accept_uni(&self) -> Result<RecvStream, Error> {
36 let mut reader = Reader::new(&self.inner.incoming_unidirectional_streams())?;
37
38 match reader.read().await? {
39 Some(stream) => Ok(RecvStream::new(stream)?),
40 None => Err(self.closed().await),
41 }
42 }
43
44 pub async fn accept_bi(&self) -> Result<(SendStream, RecvStream), Error> {
46 let mut reader = Reader::new(&self.inner.incoming_bidirectional_streams())?;
47
48 let stream: WebTransportBidirectionalStream = match reader.read().await? {
49 Some(stream) => stream,
50 None => return Err(self.closed().await),
51 };
52
53 let send = SendStream::new(stream.writable())?;
54 let recv = RecvStream::new(stream.readable())?;
55
56 Ok((send, recv))
57 }
58
59 pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), Error> {
61 let stream: WebTransportBidirectionalStream =
62 JsFuture::from(self.inner.create_bidirectional_stream())
63 .await?
64 .into();
65
66 let send = SendStream::new(stream.writable())?;
67 let recv = RecvStream::new(stream.readable())?;
68
69 Ok((send, recv))
70 }
71
72 pub async fn open_uni(&self) -> Result<SendStream, Error> {
74 let stream: WebTransportSendStream =
75 JsFuture::from(self.inner.create_unidirectional_stream())
76 .await?
77 .into();
78
79 let send = SendStream::new(stream)?;
80 Ok(send)
81 }
82
83 pub async fn send_datagram(&self, payload: Bytes) -> Result<(), Error> {
85 let mut writer = Writer::new(&self.inner.datagrams().writable())?;
86 writer.write(&Uint8Array::from(payload.as_ref())).await?;
87 Ok(())
88 }
89
90 pub async fn recv_datagram(&self) -> Result<Bytes, Error> {
92 let mut reader = Reader::new(&self.inner.datagrams().readable())?;
93 let data: Uint8Array = reader.read().await?.unwrap_or_default();
94 Ok(data.to_vec().into())
95 }
96
97 pub fn close(&self, code: u32, reason: &str) {
99 let info = WebTransportCloseInfo::new();
100 info.set_close_code(code);
101 info.set_reason(reason);
102 self.inner.close_with_close_info(&info);
103 }
104
105 pub async fn closed(&self) -> Error {
107 self.closed_inner().await.unwrap_err()
108 }
109
110 async fn closed_inner(&self) -> Result<(), Error> {
111 let info: WebTransportCloseInfo = JsFuture::from(self.inner.closed()).await?.into();
112 let reason = info.get_reason().unwrap_or_default();
113
114 let options = web_sys::WebTransportErrorOptions::new();
115 options.set_source(web_sys::WebTransportErrorSource::Session);
116
117 if let Ok(code) = info.get_close_code().map(u8::try_from).transpose() {
118 options.set_stream_error_code(code);
119 }
120
121 let err = web_sys::WebTransportError::new_with_message_and_options(&reason, &options)?;
122 Err(Error::Session(err))
123 }
124
125 pub fn url(&self) -> &Url {
127 &self.url
128 }
129
130 #[cfg(target_family = "wasm")]
132 fn send_datagram_nowait(&self, payload: Bytes) -> Result<(), Error> {
133 let writer = self.inner.datagrams().writable().get_writer()?;
134 let writer: WritableStreamDefaultWriter = writer.unchecked_into();
135
136 wasm_bindgen_futures::spawn_local(async move {
137 let payload = Uint8Array::from(payload.as_ref());
138 let promise = writer.write_with_chunk(&payload.into());
139 let _ = JsFuture::from(promise).await;
140 writer.release_lock();
141 });
142
143 Ok(())
144 }
145}
146
147impl PartialEq for Session {
148 fn eq(&self, other: &Self) -> bool {
149 self.inner == other.inner
150 }
151}
152
// Marker impl: declares the `PartialEq` comparison on `Session` to be a full
// equivalence relation.
impl Eq for Session {}
154
/// Implements the generic `webtrans_trait::Session` interface by forwarding
/// each method to the inherent `Session` method of the same name. The one
/// exception is `send_datagram`, which uses the non-waiting private helper.
#[cfg(target_family = "wasm")]
impl webtrans_trait::Session for Session {
    type SendStream = SendStream;
    type RecvStream = RecvStream;
    type Error = Error;

    /// Forwards to the inherent `Session::accept_uni`.
    async fn accept_uni(&self) -> Result<RecvStream, Error> {
        Session::accept_uni(self).await
    }

    /// Forwards to the inherent `Session::accept_bi`.
    async fn accept_bi(&self) -> Result<(SendStream, RecvStream), Error> {
        Session::accept_bi(self).await
    }

    /// Forwards to the inherent `Session::open_bi`.
    async fn open_bi(&self) -> Result<(SendStream, RecvStream), Error> {
        Session::open_bi(self).await
    }

    /// Forwards to the inherent `Session::open_uni`.
    async fn open_uni(&self) -> Result<SendStream, Error> {
        Session::open_uni(self).await
    }

    /// Synchronous send: forwards to `send_datagram_nowait` rather than the
    /// async inherent `send_datagram`.
    fn send_datagram(&self, payload: Bytes) -> Result<(), Error> {
        Session::send_datagram_nowait(self, payload)
    }

    /// Forwards to the inherent `Session::recv_datagram`.
    async fn recv_datagram(&self) -> Result<Bytes, Error> {
        Session::recv_datagram(self).await
    }

    /// Returns the maximum datagram size reported by the datagram stream,
    /// converted to `usize`.
    fn max_datagram_size(&self) -> usize {
        let datagrams = self.inner.datagrams();
        datagrams.max_datagram_size() as usize
    }

    /// Forwards to the inherent `Session::close`.
    fn close(&self, code: u32, reason: &str) {
        Session::close(self, code, reason)
    }

    /// Forwards to the inherent `Session::closed`.
    async fn closed(&self) -> Error {
        Session::closed(self).await
    }
}