Skip to main content

web_transport/
quinn.rs

1use bytes::{Buf, BufMut, Bytes};
2use url::Url;
3
4// Export the Quinn implementation to simplify Cargo.toml
5pub use web_transport_quinn as quinn;
6
7pub use web_transport_quinn::CongestionControl;
8
9/// Create a [Client] that can be used to dial multiple [Session]s.
10#[derive(Default, Clone)]
11pub struct ClientBuilder {
12    inner: quinn::ClientBuilder,
13}
14
15impl ClientBuilder {
16    pub fn new() -> Self {
17        Self::default()
18    }
19
20    /// Allow a lower latency congestion controller.
21    pub fn with_congestion_control(self, cc: CongestionControl) -> Self {
22        Self {
23            inner: self.inner.with_congestion_control(cc),
24        }
25    }
26
27    /// Accept the server's certificate hashes (sha256) instead of using a root CA.
28    pub fn with_server_certificate_hashes(self, hashes: Vec<Vec<u8>>) -> Result<Client, Error> {
29        Ok(Client {
30            inner: self.inner.with_server_certificate_hashes(hashes)?,
31        })
32    }
33
34    /// Accept certificates using root CAs.
35    pub fn with_system_roots(self) -> Result<Client, Error> {
36        Ok(Client {
37            inner: self.inner.with_system_roots()?,
38        })
39    }
40}
41
42/// Used to dial multiple [Session]s.
43#[derive(Clone, Debug)]
44pub struct Client {
45    inner: quinn::Client,
46}
47
48impl Client {
49    /// Connect to the server.
50    pub async fn connect(&self, url: Url) -> Result<Session, Error> {
51        Ok(self.inner.connect(url).await?.into())
52    }
53}
54
/// Used to accept incoming connections and create [Session]s. (native only)
///
/// NOTE: This is not supported in the WASM runtime, as browsers are clients.
///
/// Use a [web_transport_quinn::ServerBuilder] to create a [web_transport_quinn::Server]
/// and then convert it with `Into<Server>`.
/// Alternatively, establish a [web_transport_quinn::Session] directly and then convert
/// it with `Into<Session>`.
pub struct Server {
    // The wrapped Quinn-backed server; `accept` pulls incoming requests from it.
    inner: quinn::Server,
}
64
65impl From<quinn::Server> for Server {
66    fn from(server: quinn::Server) -> Self {
67        Self { inner: server }
68    }
69}
70
71impl Server {
72    /// Accept an incoming connection.
73    pub async fn accept(&mut self) -> Result<Option<Session>, Error> {
74        match self.inner.accept().await {
75            // TODO add sub-protocol support
76            Some(session) => Ok(Some(session.ok().await?.into())),
77            None => Ok(None),
78        }
79    }
80}
81
82/// A WebTransport Session, able to accept/create streams and send/recv datagrams.
83///
84/// The session can be cloned to create multiple handles, which is which no method is &mut.
85/// The session will be closed with on drop.
86#[derive(Clone, PartialEq, Eq)]
87pub struct Session {
88    inner: quinn::Session,
89}
90
91impl Session {
92    /// Block until the peer creates a new unidirectional stream.
93    ///
94    /// Won't return None unless the connection is closed.
95    pub async fn accept_uni(&self) -> Result<RecvStream, Error> {
96        let stream = self.inner.accept_uni().await?;
97        Ok(RecvStream::new(stream))
98    }
99
100    /// Block until the peer creates a new bidirectional stream.
101    pub async fn accept_bi(&self) -> Result<(SendStream, RecvStream), Error> {
102        let (s, r) = self.inner.accept_bi().await?;
103        Ok((SendStream::new(s), RecvStream::new(r)))
104    }
105
106    /// Open a new bidirectional stream, which may block when there are too many concurrent streams.
107    pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), Error> {
108        Ok(self
109            .inner
110            .open_bi()
111            .await
112            .map(|(s, r)| (SendStream::new(s), RecvStream::new(r)))?)
113    }
114
115    /// Open a new unidirectional stream, which may block when there are too many concurrent streams.
116    pub async fn open_uni(&self) -> Result<SendStream, Error> {
117        Ok(self.inner.open_uni().await.map(SendStream::new)?)
118    }
119
120    /// Send a datagram over the network.
121    ///
122    /// QUIC datagrams may be dropped for any reason:
123    /// - Network congestion.
124    /// - Random packet loss.
125    /// - Payload is larger than `max_datagram_size()`
126    /// - Peer is not receiving datagrams.
127    /// - Peer has too many outstanding datagrams.
128    /// - ???
129    pub async fn send_datagram(&self, payload: Bytes) -> Result<(), Error> {
130        // NOTE: This is not async, but we need to make it async to match the wasm implementation.
131        Ok(self.inner.send_datagram(payload)?)
132    }
133
134    /// The maximum size of a datagram that can be sent.
135    pub async fn max_datagram_size(&self) -> usize {
136        self.inner.max_datagram_size()
137    }
138
139    /// Receive a datagram over the network.
140    pub async fn recv_datagram(&self) -> Result<Bytes, Error> {
141        Ok(self.inner.read_datagram().await?)
142    }
143
144    /// Close the connection immediately with a code and reason.
145    pub fn close(&self, code: u32, reason: &str) {
146        self.inner.close(code, reason.as_bytes())
147    }
148
149    /// Block until the connection is closed.
150    pub async fn closed(&self) -> Error {
151        self.inner.closed().await.into()
152    }
153
154    /// Return the URL used to create the session.
155    pub fn url(&self) -> &Url {
156        &self.inner.request().url
157    }
158
159    /// Return the application protocol used to create the session.
160    pub fn protocol(&self) -> Option<&str> {
161        self.inner.response().protocol.as_deref()
162    }
163}
164
165/// Convert a `web_transport_quinn::Session` into a `web_transport::Session`.
166impl From<quinn::Session> for Session {
167    fn from(session: quinn::Session) -> Self {
168        Session { inner: session }
169    }
170}
171
/// An outgoing stream of bytes to the peer.
///
/// QUIC streams have flow control, which means the send rate is limited by the peer's receive window.
/// The stream will be closed with a graceful FIN when dropped.
pub struct SendStream {
    // The wrapped `web_transport_quinn` send stream that every method delegates to.
    inner: quinn::SendStream,
}
179
180impl SendStream {
181    fn new(inner: quinn::SendStream) -> Self {
182        Self { inner }
183    }
184
185    /// Write some of the buffer to the stream.
186    #[must_use = "returns the number of bytes written"]
187    pub async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
188        self.inner.write(buf).await.map_err(Into::into)
189    }
190
191    /// Write some of the buffer to the stream, advancing the internal position.
192    pub async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Result<usize, Error> {
193        // We use copy_to_bytes+write_chunk so if Bytes is provided, we can avoid allocating.
194        let size = buf.chunk().len();
195        let chunk = buf.copy_to_bytes(size);
196        self.inner.write_chunk(chunk).await?;
197        Ok(size)
198    }
199
200    /// Set the stream's priority.
201    ///
202    /// Streams with lower values will be sent first, but are not guaranteed to arrive first.
203    pub fn set_priority(&mut self, order: i32) {
204        self.inner.set_priority(order).ok();
205    }
206
207    /// Send an immediate reset code, closing the stream.
208    pub fn reset(&mut self, code: u32) {
209        self.inner.reset(code).ok();
210    }
211
212    /// Mark the stream as finished.
213    ///
214    /// This is automatically called on Drop, but can be called manually.
215    pub fn finish(&mut self) -> Result<(), Error> {
216        self.inner
217            .finish()
218            .map_err(|_| Error::Write(quinn::WriteError::ClosedStream))?;
219        Ok(())
220    }
221
222    /// Block until the stream is closed by either side.
223    ///
224    /// This returns a (potentially truncated) u8 because that's what the WASM implementation returns.
225    // TODO this should be &self but requires modifying quinn.
226    pub async fn closed(&mut self) -> Result<Option<u8>, Error> {
227        match self.inner.stopped().await {
228            Ok(None) => Ok(None),
229            Ok(Some(code)) => Ok(Some(code as u8)),
230            Err(e) => Err(Error::Session(e)),
231        }
232    }
233}
234
/// An incoming stream of bytes from the peer.
///
/// All bytes are flushed in order and the stream is flow controlled.
/// The stream will be closed with STOP_SENDING code=0 when dropped.
pub struct RecvStream {
    // The wrapped `web_transport_quinn` receive stream that every method delegates to.
    inner: quinn::RecvStream,
}
242
243impl RecvStream {
244    fn new(inner: quinn::RecvStream) -> Self {
245        Self { inner }
246    }
247
248    /// Read the next chunk of data with the provided maximum size.
249    ///
250    /// This returns a chunk of data instead of copying, which may be more efficient.
251    pub async fn read(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
252        Ok(self
253            .inner
254            .read_chunk(max, true)
255            .await?
256            .map(|chunk| chunk.bytes))
257    }
258
259    /// Read some data into the provided buffer.
260    ///
261    /// The number of bytes read is returned, or None if the stream is closed.
262    /// The buffer will be advanced by the number of bytes read.
263    pub async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Result<Option<usize>, Error> {
264        let dst = buf.chunk_mut();
265        let dst = unsafe { &mut *(dst as *mut _ as *mut [u8]) };
266
267        let size = match self.inner.read(dst).await? {
268            Some(size) if size > 0 => size,
269            _ => return Ok(None),
270        };
271
272        unsafe { buf.advance_mut(size) };
273
274        Ok(Some(size))
275    }
276
277    /// Send a `STOP_SENDING` QUIC code.
278    pub fn stop(&mut self, code: u32) {
279        self.inner.stop(code).ok();
280    }
281
282    /// Block until the stream has been closed and return the error code, if any.
283    ///
284    /// This returns a (potentially truncated) u8 because that's what the WASM implementation returns.
285    /// web-transport-quinn returns a u32 because that's what the specification says.
286    // TODO Validate the correct behavior.
287    pub async fn closed(&mut self) -> Result<Option<u8>, Error> {
288        match self.inner.received_reset().await {
289            Ok(None) => Ok(None),
290            Ok(Some(code)) => Ok(Some(code as u8)),
291            Err(e) => Err(Error::Session(e)),
292        }
293    }
294}
295
296/// A WebTransport error.
297///
298/// The source can either be a session error or a stream error.
299/// TODO This interface is currently not generic.
300#[derive(Debug, thiserror::Error, Clone)]
301pub enum Error {
302    #[error("session error: {0}")]
303    Session(#[from] quinn::SessionError),
304
305    #[error("server error: {0}")]
306    Server(#[from] quinn::ServerError),
307
308    #[error("client error: {0}")]
309    Client(#[from] quinn::ClientError),
310
311    #[error("write error: {0}")]
312    Write(quinn::WriteError),
313
314    #[error("read error: {0}")]
315    Read(quinn::ReadError),
316}
317
318impl From<quinn::WriteError> for Error {
319    fn from(e: quinn::WriteError) -> Self {
320        match e {
321            quinn::WriteError::SessionError(e) => Error::Session(e),
322            e => Error::Write(e),
323        }
324    }
325}
326impl From<quinn::ReadError> for Error {
327    fn from(e: quinn::ReadError) -> Self {
328        match e {
329            quinn::ReadError::SessionError(e) => Error::Session(e),
330            e => Error::Read(e),
331        }
332    }
333}