web_transport/
quinn.rs

1use bytes::{Buf, BufMut, Bytes};
2use url::Url;
3
4// Export the Quinn implementation to simplify Cargo.toml
5pub use web_transport_quinn as quinn;
6
7pub use web_transport_quinn::CongestionControl;
8
9/// Create a [Client] that can be used to dial multiple [Session]s.
10#[derive(Default)]
11pub struct ClientBuilder {
12    inner: quinn::ClientBuilder,
13}
14
15impl ClientBuilder {
16    pub fn new() -> Self {
17        Self::default()
18    }
19
20    /// For compatibility with WASM. Panics if `val` is false, but does nothing else.
21    pub fn with_unreliable(self, val: bool) -> Self {
22        Self {
23            inner: self.inner.with_unreliable(val),
24        }
25    }
26
27    /// Allow a lower latency congestion controller.
28    pub fn with_congestion_control(self, cc: CongestionControl) -> Self {
29        Self {
30            inner: self.inner.with_congestion_control(cc),
31        }
32    }
33
34    /// Accept the server's certificate hashes (sha256) instead of using a root CA.
35    pub fn with_server_certificate_hashes(self, hashes: Vec<Vec<u8>>) -> Result<Client, Error> {
36        Ok(Client {
37            inner: self.inner.with_server_certificate_hashes(hashes)?,
38        })
39    }
40
41    /// Accept certificates using root CAs.
42    pub fn with_system_roots(self) -> Result<Client, Error> {
43        Ok(Client {
44            inner: self.inner.with_system_roots()?,
45        })
46    }
47}
48
49/// Used to dial multiple [Session]s.
50#[derive(Clone, Debug)]
51pub struct Client {
52    inner: quinn::Client,
53}
54
55impl Client {
56    /// Connect to the server.
57    pub async fn connect(&self, url: Url) -> Result<Session, Error> {
58        Ok(self.inner.connect(url).await?.into())
59    }
60}
61
62/// Used to accept incoming connections and create [Session]s. (native only)
63///
64/// NOTE: This is not supported in the WASM runtime, as browsers are clients.
65///
66/// Use a [web_transport_quinn::ServerBuilder] to create a [web_transport_quinn::Server] and then [Into<Server>].
67/// Alternatively, establish a [web_transport_quinn::Session] directly and then [Into<Session>].
68pub struct Server {
69    inner: quinn::Server,
70}
71
72impl From<quinn::Server> for Server {
73    fn from(server: quinn::Server) -> Self {
74        Self { inner: server }
75    }
76}
77
78impl Server {
79    /// Accept an incoming connection.
80    pub async fn accept(&mut self) -> Result<Option<Session>, Error> {
81        match self.inner.accept().await {
82            Some(session) => Ok(Some(
83                session
84                    .ok()
85                    .await
86                    .map_err(|e| Error::Write(e.into()))?
87                    .into(),
88            )),
89            None => Ok(None),
90        }
91    }
92}
93
94/// A WebTransport Session, able to accept/create streams and send/recv datagrams.
95///
96/// The session can be cloned to create multiple handles.
97/// The session will be closed with on drop.
98#[derive(Clone, PartialEq, Eq)]
99pub struct Session {
100    inner: quinn::Session,
101}
102
103impl Session {
104    /// Block until the peer creates a new unidirectional stream.
105    ///
106    /// Won't return None unless the connection is closed.
107    pub async fn accept_uni(&mut self) -> Result<RecvStream, Error> {
108        let stream = self.inner.accept_uni().await?;
109        Ok(RecvStream::new(stream))
110    }
111
112    /// Block until the peer creates a new bidirectional stream.
113    pub async fn accept_bi(&mut self) -> Result<(SendStream, RecvStream), Error> {
114        let (s, r) = self.inner.accept_bi().await?;
115        Ok((SendStream::new(s), RecvStream::new(r)))
116    }
117
118    /// Open a new bidirectional stream, which may block when there are too many concurrent streams.
119    pub async fn open_bi(&mut self) -> Result<(SendStream, RecvStream), Error> {
120        Ok(self
121            .inner
122            .open_bi()
123            .await
124            .map(|(s, r)| (SendStream::new(s), RecvStream::new(r)))?)
125    }
126
127    /// Open a new unidirectional stream, which may block when there are too many concurrent streams.
128    pub async fn open_uni(&mut self) -> Result<SendStream, Error> {
129        Ok(self.inner.open_uni().await.map(SendStream::new)?)
130    }
131
132    /// Send a datagram over the network.
133    ///
134    /// QUIC datagrams may be dropped for any reason:
135    /// - Network congestion.
136    /// - Random packet loss.
137    /// - Payload is larger than `max_datagram_size()`
138    /// - Peer is not receiving datagrams.
139    /// - Peer has too many outstanding datagrams.
140    /// - ???
141    pub async fn send_datagram(&mut self, payload: Bytes) -> Result<(), Error> {
142        // NOTE: This is not async, but we need to make it async to match the wasm implementation.
143        Ok(self.inner.send_datagram(payload)?)
144    }
145
146    /// The maximum size of a datagram that can be sent.
147    pub async fn max_datagram_size(&self) -> usize {
148        self.inner.max_datagram_size()
149    }
150
151    /// Receive a datagram over the network.
152    pub async fn recv_datagram(&mut self) -> Result<Bytes, Error> {
153        Ok(self.inner.read_datagram().await?)
154    }
155
156    /// Close the connection immediately with a code and reason.
157    pub fn close(&mut self, code: u32, reason: &str) {
158        self.inner.close(code, reason.as_bytes())
159    }
160
161    /// Block until the connection is closed.
162    pub async fn closed(&self) -> Error {
163        self.inner.closed().await.into()
164    }
165
166    /// Return the URL used to create the session.
167    pub fn url(&self) -> &Url {
168        self.inner.url()
169    }
170}
171
172/// Convert a `web_transport_quinn::Session` into a `web_transport::Session`.
173impl From<quinn::Session> for Session {
174    fn from(session: quinn::Session) -> Self {
175        Session { inner: session }
176    }
177}
178
179/// An outgoing stream of bytes to the peer.
180///
181/// QUIC streams have flow control, which means the send rate is limited by the peer's receive window.
182/// The stream will be closed with a graceful FIN when dropped.
183pub struct SendStream {
184    inner: quinn::SendStream,
185}
186
187impl SendStream {
188    fn new(inner: quinn::SendStream) -> Self {
189        Self { inner }
190    }
191
192    /// Write some of the buffer to the stream.
193    #[must_use = "returns the number of bytes written"]
194    pub async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
195        self.inner.write(buf).await.map_err(Into::into)
196    }
197
198    /// Write some of the buffer to the stream, advancing the internal position.
199    pub async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Result<usize, Error> {
200        // We use copy_to_bytes+write_chunk so if Bytes is provided, we can avoid allocating.
201        let size = buf.chunk().len();
202        let chunk = buf.copy_to_bytes(size);
203        self.inner.write_chunk(chunk).await?;
204        Ok(size)
205    }
206
207    /// Set the stream's priority.
208    ///
209    /// Streams with lower values will be sent first, but are not guaranteed to arrive first.
210    pub fn set_priority(&mut self, order: i32) {
211        self.inner.set_priority(order).ok();
212    }
213
214    /// Send an immediate reset code, closing the stream.
215    pub fn reset(&mut self, code: u32) {
216        self.inner.reset(code).ok();
217    }
218
219    /// Mark the stream as finished.
220    ///
221    /// This is automatically called on Drop, but can be called manually.
222    pub fn finish(&mut self) -> Result<(), Error> {
223        self.inner
224            .finish()
225            .map_err(|_| Error::Write(quinn::WriteError::ClosedStream))?;
226        Ok(())
227    }
228
229    /// Block until the stream is closed by either side.
230    ///
231    /// This returns a (potentially truncated) u8 because that's what the WASM implementation returns.
232    // TODO this should be &self but requires modifying quinn.
233    pub async fn closed(&mut self) -> Result<Option<u8>, Error> {
234        match self.inner.stopped().await {
235            Ok(None) => Ok(None),
236            Ok(Some(code)) => Ok(Some(code as u8)),
237            Err(e) => Err(Error::Session(e)),
238        }
239    }
240}
241
242/// An incoming stream of bytes from the peer.
243///
244/// All bytes are flushed in order and the stream is flow controlled.
245/// The stream will be closed with STOP_SENDING code=0 when dropped.
246pub struct RecvStream {
247    inner: quinn::RecvStream,
248}
249
250impl RecvStream {
251    fn new(inner: quinn::RecvStream) -> Self {
252        Self { inner }
253    }
254
255    /// Read the next chunk of data with the provided maximum size.
256    ///
257    /// This returns a chunk of data instead of copying, which may be more efficient.
258    pub async fn read(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
259        Ok(self
260            .inner
261            .read_chunk(max, true)
262            .await?
263            .map(|chunk| chunk.bytes))
264    }
265
266    /// Read some data into the provided buffer.
267    ///
268    /// The number of bytes read is returned, or None if the stream is closed.
269    /// The buffer will be advanced by the number of bytes read.
270    pub async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Result<Option<usize>, Error> {
271        let dst = buf.chunk_mut();
272        let dst = unsafe { &mut *(dst as *mut _ as *mut [u8]) };
273
274        let size = match self.inner.read(dst).await? {
275            Some(size) => size,
276            None => return Ok(None),
277        };
278
279        unsafe { buf.advance_mut(size) };
280
281        Ok(Some(size))
282    }
283
284    /// Send a `STOP_SENDING` QUIC code.
285    pub fn stop(&mut self, code: u32) {
286        self.inner.stop(code).ok();
287    }
288
289    /// Block until the stream has been closed and return the error code, if any.
290    ///
291    /// This returns a (potentially truncated) u8 because that's what the WASM implementation returns.
292    /// web-transport-quinn returns a u32 because that's what the specification says.
293    // TODO Validate the correct behavior.
294    pub async fn closed(&mut self) -> Result<Option<u8>, Error> {
295        match self.inner.received_reset().await {
296            Ok(None) => Ok(None),
297            Ok(Some(code)) => Ok(Some(code as u8)),
298            Err(e) => Err(Error::Session(e)),
299        }
300    }
301}
302
303/// A WebTransport error.
304///
305/// The source can either be a session error or a stream error.
306/// TODO This interface is currently not generic.
307#[derive(Debug, thiserror::Error, Clone)]
308pub enum Error {
309    #[error("session error: {0}")]
310    Session(#[from] quinn::SessionError),
311
312    #[error("client error: {0}")]
313    Client(#[from] quinn::ClientError),
314
315    #[error("write error: {0}")]
316    Write(quinn::WriteError),
317
318    #[error("read error: {0}")]
319    Read(quinn::ReadError),
320}
321
322impl From<quinn::WriteError> for Error {
323    fn from(e: quinn::WriteError) -> Self {
324        match e {
325            quinn::WriteError::SessionError(e) => Error::Session(e),
326            e => Error::Write(e),
327        }
328    }
329}
330impl From<quinn::ReadError> for Error {
331    fn from(e: quinn::ReadError) -> Self {
332        match e {
333            quinn::ReadError::SessionError(e) => Error::Session(e),
334            e => Error::Read(e),
335        }
336    }
337}