// web_transport_trait/lib.rs

mod util;

use std::future::Future;
use std::time::Duration;

pub use crate::util::{MaybeSend, MaybeSync};
use bytes::{Buf, BufMut, Bytes, BytesMut};

/// Connection-level statistics.
///
/// Every accessor has a default body that returns `None`, so an implementation
/// only needs to override the metrics it actually tracks.
///
/// Methods return `Option` — `None` means the implementation doesn't track
/// this metric, while `Some(0)` means the value is actually zero.
pub trait Stats {
    /// Total bytes sent over the connection, including retransmissions and overhead.
    fn bytes_sent(&self) -> Option<u64> {
        None
    }

    /// Total bytes received over the connection, including duplicates and overhead.
    fn bytes_received(&self) -> Option<u64> {
        None
    }

    /// Total bytes lost (detected via retransmission or acknowledgement).
    fn bytes_lost(&self) -> Option<u64> {
        None
    }

    /// Total number of datagrams sent.
    fn packets_sent(&self) -> Option<u64> {
        None
    }

    /// Total number of datagrams received.
    fn packets_received(&self) -> Option<u64> {
        None
    }

    /// Total number of datagrams detected as lost.
    fn packets_lost(&self) -> Option<u64> {
        None
    }

    /// Smoothed round-trip time estimate.
    fn rtt(&self) -> Option<Duration> {
        None
    }

    /// Estimated available send bandwidth, in bits per second.
    fn estimated_send_rate(&self) -> Option<u64> {
        None
    }
}

55/// Default stats implementation that returns `None` for all metrics.
56pub struct StatsUnavailable;
57impl Stats for StatsUnavailable {}
58
59/// Error trait for WebTransport operations.
60///
61/// Implementations must be Send + Sync + 'static for use across async boundaries.
62pub trait Error: std::error::Error + MaybeSend + MaybeSync + 'static {
63    /// Returns the error code and reason if this was an application error.
64    ///
65    /// NOTE: Reason reasons are technically bytes on the wire, but we convert to a String for convenience.
66    fn session_error(&self) -> Option<(u32, String)>;
67
68    /// Returns the error code if this was a stream error.
69    fn stream_error(&self) -> Option<u32> {
70        None
71    }
72}
73
74/// A WebTransport Session, able to accept/create streams and send/recv datagrams.
75///
76/// The session can be cloned to create multiple handles.
77/// The session will be closed on drop.
78pub trait Session: Clone + MaybeSend + MaybeSync + 'static {
79    type SendStream: SendStream;
80    type RecvStream: RecvStream;
81    type Error: Error;
82
83    /// Block until the peer creates a new unidirectional stream.
84    fn accept_uni(&self)
85        -> impl Future<Output = Result<Self::RecvStream, Self::Error>> + MaybeSend;
86
87    /// Block until the peer creates a new bidirectional stream.
88    fn accept_bi(
89        &self,
90    ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + MaybeSend;
91
92    /// Open a new bidirectional stream, which may block when there are too many concurrent streams.
93    fn open_bi(
94        &self,
95    ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + MaybeSend;
96
97    /// Open a new unidirectional stream, which may block when there are too many concurrent streams.
98    fn open_uni(&self) -> impl Future<Output = Result<Self::SendStream, Self::Error>> + MaybeSend;
99
100    /// Send a datagram over the network.
101    ///
102    /// QUIC datagrams may be dropped for any reason:
103    /// - Network congestion.
104    /// - Random packet loss.
105    /// - Payload is larger than `max_datagram_size()`
106    /// - Peer is not receiving datagrams.
107    /// - Peer has too many outstanding datagrams.
108    /// - ???
109    fn send_datagram(&self, payload: Bytes) -> Result<(), Self::Error>;
110
111    /// Receive a datagram over the network.
112    fn recv_datagram(&self) -> impl Future<Output = Result<Bytes, Self::Error>> + MaybeSend;
113
114    /// The maximum size of a datagram that can be sent.
115    fn max_datagram_size(&self) -> usize;
116
117    /// Return the negotiated WebTransport subprotocol, if any.
118    fn protocol(&self) -> Option<&str> {
119        None
120    }
121
122    /// Close the connection immediately with a code and reason.
123    fn close(&self, code: u32, reason: &str);
124
125    /// Block until the connection is closed by either side.
126    fn closed(&self) -> impl Future<Output = Self::Error> + MaybeSend;
127
128    /// Return connection-level statistics, if supported.
129    fn stats(&self) -> impl Stats {
130        StatsUnavailable
131    }
132}
133
134/// An outgoing stream of bytes to the peer.
135///
136/// QUIC streams have flow control, which means the send rate is limited by the peer's receive window.
137/// The stream will be closed with a graceful FIN when dropped.
138pub trait SendStream: MaybeSend {
139    type Error: Error;
140
141    /// Write some of the buffer to the stream.
142    fn write(&mut self, buf: &[u8])
143        -> impl Future<Output = Result<usize, Self::Error>> + MaybeSend;
144
145    /// Write the given buffer to the stream, advancing the internal position.
146    fn write_buf<B: Buf + MaybeSend>(
147        &mut self,
148        buf: &mut B,
149    ) -> impl Future<Output = Result<usize, Self::Error>> + MaybeSend {
150        async move {
151            let chunk = buf.chunk();
152            let size = self.write(chunk).await?;
153            buf.advance(size);
154            Ok(size)
155        }
156    }
157
158    /// Write the entire [Bytes] chunk to the stream, potentially avoiding a copy.
159    fn write_chunk(
160        &mut self,
161        chunk: Bytes,
162    ) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend {
163        async move {
164            // Just so the arg isn't mut
165            let mut c = chunk;
166            self.write_buf(&mut c).await?;
167            Ok(())
168        }
169    }
170
171    /// A helper to write all the data in the buffer.
172    fn write_all(
173        &mut self,
174        buf: &[u8],
175    ) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend {
176        async move {
177            let mut pos = 0;
178            while pos < buf.len() {
179                pos += self.write(&buf[pos..]).await?;
180            }
181            Ok(())
182        }
183    }
184
185    /// A helper to write all of the data in the buffer.
186    fn write_all_buf<B: Buf + MaybeSend>(
187        &mut self,
188        buf: &mut B,
189    ) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend {
190        async move {
191            while buf.has_remaining() {
192                self.write_buf(buf).await?;
193            }
194            Ok(())
195        }
196    }
197
198    /// Set the stream's priority.
199    ///
200    /// Streams with lower values will be sent first, but are not guaranteed to arrive first.
201    fn set_priority(&mut self, order: u8);
202
203    /// Mark the stream as finished, erroring on any future writes.
204    ///
205    /// [SendStream::reset] can still be called to abandon any queued data.
206    /// [SendStream::closed] should return when the FIN is acknowledged by the peer.
207    ///
208    /// NOTE: Quinn implicitly calls this on Drop, but it's a common footgun.
209    /// Implementations SHOULD [SendStream::reset] on Drop instead.
210    fn finish(&mut self) -> Result<(), Self::Error>;
211
212    /// Immediately closes the stream and discards any remaining data.
213    ///
214    /// This translates into a RESET_STREAM QUIC code.
215    /// The peer may not receive the reset code if the stream is already closed.
216    fn reset(&mut self, code: u32);
217
218    /// Block until the stream is closed by either side.
219    ///
220    /// This includes:
221    /// - We sent a RESET_STREAM via [SendStream::reset]
222    /// - We received a STOP_SENDING via [RecvStream::stop]
223    /// - A FIN is acknowledged by the peer via [SendStream::finish]
224    ///
225    /// Some implementations do not support FIN acknowledgement, in which case this will block until the FIN is sent.
226    ///
227    /// NOTE: This takes a &mut to match Quinn and to simplify the implementation.
228    fn closed(&mut self) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend;
229}
230
231/// An incoming stream of bytes from the peer.
232///
233/// All bytes are flushed in order and the stream is flow controlled.
234/// The stream will be closed with STOP_SENDING code=0 when dropped.
235pub trait RecvStream: MaybeSend {
236    type Error: Error;
237
238    /// Read the next chunk of data, up to the max size.
239    ///
240    /// This returns a chunk of data instead of copying, which may be more efficient.
241    fn read(
242        &mut self,
243        dst: &mut [u8],
244    ) -> impl Future<Output = Result<Option<usize>, Self::Error>> + MaybeSend;
245
246    /// Read some data into the provided buffer.
247    ///
248    /// The number of bytes read is returned, or None if the stream is closed.
249    /// The buffer will be advanced by the number of bytes read.
250    fn read_buf<B: BufMut + MaybeSend>(
251        &mut self,
252        buf: &mut B,
253    ) -> impl Future<Output = Result<Option<usize>, Self::Error>> + MaybeSend {
254        async move {
255            let dst = unsafe {
256                std::mem::transmute::<&mut bytes::buf::UninitSlice, &mut [u8]>(buf.chunk_mut())
257            };
258            let size = match self.read(dst).await? {
259                Some(size) if size > 0 => size,
260                _ => return Ok(None),
261            };
262
263            unsafe { buf.advance_mut(size) };
264
265            Ok(Some(size))
266        }
267    }
268
269    /// Read the next chunk of data, up to the max size.
270    ///
271    /// This returns a chunk of data instead of copying, which may be more efficient.
272    fn read_chunk(
273        &mut self,
274        max: usize,
275    ) -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + MaybeSend {
276        async move {
277            // Don't allocate too much. Write your own if you want to increase this buffer.
278            let mut buf = BytesMut::with_capacity(max.min(8 * 1024));
279
280            // TODO Test this, I think it will work?
281            Ok(self.read_buf(&mut buf).await?.map(|_| buf.freeze()))
282        }
283    }
284
285    /// Send a `STOP_SENDING` QUIC code, informing the peer that no more data will be read.
286    ///
287    /// An implementation MUST do this on Drop otherwise flow control will be leaked.
288    /// Call this method manually if you want to specify a code yourself.
289    fn stop(&mut self, code: u32);
290
291    /// Block until the stream has been closed by either side.
292    ///
293    /// This includes:
294    /// - We received a RESET_STREAM via [SendStream::reset]
295    /// - We sent a STOP_SENDING via [RecvStream::stop]
296    /// - We received a FIN via [SendStream::finish] and read all data.
297    fn closed(&mut self) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend;
298
299    /// A helper to keep reading until the stream is closed.
300    fn read_all(&mut self) -> impl Future<Output = Result<Bytes, Self::Error>> + MaybeSend {
301        async move {
302            let mut buf = BytesMut::new();
303            self.read_all_buf(&mut buf).await?;
304            Ok(buf.freeze())
305        }
306    }
307
308    /// A helper to keep reading until the buffer is full.
309    fn read_all_buf<B: BufMut + MaybeSend>(
310        &mut self,
311        buf: &mut B,
312    ) -> impl Future<Output = Result<usize, Self::Error>> + MaybeSend {
313        async move {
314            let mut size = 0;
315            while buf.has_remaining_mut() {
316                match self.read_buf(buf).await? {
317                    Some(n) if n > 0 => size += n,
318                    _ => break,
319                }
320            }
321            Ok(size)
322        }
323    }
324}