web_transport_trait/lib.rs
1mod util;
2
3use std::future::Future;
4use std::time::Duration;
5
6pub use crate::util::{MaybeSend, MaybeSync};
7use bytes::{Buf, BufMut, Bytes, BytesMut};
8
9/// Connection-level statistics.
10///
11/// Methods return `Option` — `None` means the implementation doesn't track
12/// this metric, while `Some(0)` means actually zero.
13pub trait Stats {
14 /// Total bytes sent over the connection, including retransmissions and overhead.
15 fn bytes_sent(&self) -> Option<u64> {
16 None
17 }
18
19 /// Total bytes received over the connection, including duplicate and overhead.
20 fn bytes_received(&self) -> Option<u64> {
21 None
22 }
23
24 /// Total bytes lost (detected via retransmission or acknowledgement).
25 fn bytes_lost(&self) -> Option<u64> {
26 None
27 }
28
29 /// Total number of datagrams sent.
30 fn packets_sent(&self) -> Option<u64> {
31 None
32 }
33
34 /// Total number of datagrams received.
35 fn packets_received(&self) -> Option<u64> {
36 None
37 }
38
39 /// Total number of datagrams detected as lost.
40 fn packets_lost(&self) -> Option<u64> {
41 None
42 }
43
44 /// Smoothed round-trip time estimate.
45 fn rtt(&self) -> Option<Duration> {
46 None
47 }
48
49 /// Estimated available send bandwidth, in bits per second.
50 fn estimated_send_rate(&self) -> Option<u64> {
51 None
52 }
53}
54
55/// Default stats implementation that returns `None` for all metrics.
56pub struct StatsUnavailable;
57impl Stats for StatsUnavailable {}
58
59/// Error trait for WebTransport operations.
60///
61/// Implementations must be Send + Sync + 'static for use across async boundaries.
62pub trait Error: std::error::Error + MaybeSend + MaybeSync + 'static {
63 /// Returns the error code and reason if this was an application error.
64 ///
65 /// NOTE: Reason reasons are technically bytes on the wire, but we convert to a String for convenience.
66 fn session_error(&self) -> Option<(u32, String)>;
67
68 /// Returns the error code if this was a stream error.
69 fn stream_error(&self) -> Option<u32> {
70 None
71 }
72}
73
74/// A WebTransport Session, able to accept/create streams and send/recv datagrams.
75///
76/// The session can be cloned to create multiple handles.
77/// The session will be closed on drop.
78pub trait Session: Clone + MaybeSend + MaybeSync + 'static {
79 type SendStream: SendStream;
80 type RecvStream: RecvStream;
81 type Error: Error;
82
83 /// Block until the peer creates a new unidirectional stream.
84 fn accept_uni(&self)
85 -> impl Future<Output = Result<Self::RecvStream, Self::Error>> + MaybeSend;
86
87 /// Block until the peer creates a new bidirectional stream.
88 fn accept_bi(
89 &self,
90 ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + MaybeSend;
91
92 /// Open a new bidirectional stream, which may block when there are too many concurrent streams.
93 fn open_bi(
94 &self,
95 ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + MaybeSend;
96
97 /// Open a new unidirectional stream, which may block when there are too many concurrent streams.
98 fn open_uni(&self) -> impl Future<Output = Result<Self::SendStream, Self::Error>> + MaybeSend;
99
100 /// Send a datagram over the network.
101 ///
102 /// QUIC datagrams may be dropped for any reason:
103 /// - Network congestion.
104 /// - Random packet loss.
105 /// - Payload is larger than `max_datagram_size()`
106 /// - Peer is not receiving datagrams.
107 /// - Peer has too many outstanding datagrams.
108 /// - ???
109 fn send_datagram(&self, payload: Bytes) -> Result<(), Self::Error>;
110
111 /// Receive a datagram over the network.
112 fn recv_datagram(&self) -> impl Future<Output = Result<Bytes, Self::Error>> + MaybeSend;
113
114 /// The maximum size of a datagram that can be sent.
115 fn max_datagram_size(&self) -> usize;
116
117 /// Return the negotiated WebTransport subprotocol, if any.
118 fn protocol(&self) -> Option<&str> {
119 None
120 }
121
122 /// Close the connection immediately with a code and reason.
123 fn close(&self, code: u32, reason: &str);
124
125 /// Block until the connection is closed by either side.
126 fn closed(&self) -> impl Future<Output = Self::Error> + MaybeSend;
127
128 /// Return connection-level statistics, if supported.
129 fn stats(&self) -> impl Stats {
130 StatsUnavailable
131 }
132}
133
134/// An outgoing stream of bytes to the peer.
135///
136/// QUIC streams have flow control, which means the send rate is limited by the peer's receive window.
137/// The stream will be closed with a graceful FIN when dropped.
138pub trait SendStream: MaybeSend {
139 type Error: Error;
140
141 /// Write some of the buffer to the stream.
142 fn write(&mut self, buf: &[u8])
143 -> impl Future<Output = Result<usize, Self::Error>> + MaybeSend;
144
145 /// Write the given buffer to the stream, advancing the internal position.
146 fn write_buf<B: Buf + MaybeSend>(
147 &mut self,
148 buf: &mut B,
149 ) -> impl Future<Output = Result<usize, Self::Error>> + MaybeSend {
150 async move {
151 let chunk = buf.chunk();
152 let size = self.write(chunk).await?;
153 buf.advance(size);
154 Ok(size)
155 }
156 }
157
158 /// Write the entire [Bytes] chunk to the stream, potentially avoiding a copy.
159 fn write_chunk(
160 &mut self,
161 chunk: Bytes,
162 ) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend {
163 async move {
164 // Just so the arg isn't mut
165 let mut c = chunk;
166 self.write_buf(&mut c).await?;
167 Ok(())
168 }
169 }
170
171 /// A helper to write all the data in the buffer.
172 fn write_all(
173 &mut self,
174 buf: &[u8],
175 ) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend {
176 async move {
177 let mut pos = 0;
178 while pos < buf.len() {
179 pos += self.write(&buf[pos..]).await?;
180 }
181 Ok(())
182 }
183 }
184
185 /// A helper to write all of the data in the buffer.
186 fn write_all_buf<B: Buf + MaybeSend>(
187 &mut self,
188 buf: &mut B,
189 ) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend {
190 async move {
191 while buf.has_remaining() {
192 self.write_buf(buf).await?;
193 }
194 Ok(())
195 }
196 }
197
198 /// Set the stream's priority.
199 ///
200 /// Streams with lower values will be sent first, but are not guaranteed to arrive first.
201 fn set_priority(&mut self, order: u8);
202
203 /// Mark the stream as finished, erroring on any future writes.
204 ///
205 /// [SendStream::reset] can still be called to abandon any queued data.
206 /// [SendStream::closed] should return when the FIN is acknowledged by the peer.
207 ///
208 /// NOTE: Quinn implicitly calls this on Drop, but it's a common footgun.
209 /// Implementations SHOULD [SendStream::reset] on Drop instead.
210 fn finish(&mut self) -> Result<(), Self::Error>;
211
212 /// Immediately closes the stream and discards any remaining data.
213 ///
214 /// This translates into a RESET_STREAM QUIC code.
215 /// The peer may not receive the reset code if the stream is already closed.
216 fn reset(&mut self, code: u32);
217
218 /// Block until the stream is closed by either side.
219 ///
220 /// This includes:
221 /// - We sent a RESET_STREAM via [SendStream::reset]
222 /// - We received a STOP_SENDING via [RecvStream::stop]
223 /// - A FIN is acknowledged by the peer via [SendStream::finish]
224 ///
225 /// Some implementations do not support FIN acknowledgement, in which case this will block until the FIN is sent.
226 ///
227 /// NOTE: This takes a &mut to match Quinn and to simplify the implementation.
228 fn closed(&mut self) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend;
229}
230
231/// An incoming stream of bytes from the peer.
232///
233/// All bytes are flushed in order and the stream is flow controlled.
234/// The stream will be closed with STOP_SENDING code=0 when dropped.
235pub trait RecvStream: MaybeSend {
236 type Error: Error;
237
238 /// Read the next chunk of data, up to the max size.
239 ///
240 /// This returns a chunk of data instead of copying, which may be more efficient.
241 fn read(
242 &mut self,
243 dst: &mut [u8],
244 ) -> impl Future<Output = Result<Option<usize>, Self::Error>> + MaybeSend;
245
246 /// Read some data into the provided buffer.
247 ///
248 /// The number of bytes read is returned, or None if the stream is closed.
249 /// The buffer will be advanced by the number of bytes read.
250 fn read_buf<B: BufMut + MaybeSend>(
251 &mut self,
252 buf: &mut B,
253 ) -> impl Future<Output = Result<Option<usize>, Self::Error>> + MaybeSend {
254 async move {
255 let dst = unsafe {
256 std::mem::transmute::<&mut bytes::buf::UninitSlice, &mut [u8]>(buf.chunk_mut())
257 };
258 let size = match self.read(dst).await? {
259 Some(size) if size > 0 => size,
260 _ => return Ok(None),
261 };
262
263 unsafe { buf.advance_mut(size) };
264
265 Ok(Some(size))
266 }
267 }
268
269 /// Read the next chunk of data, up to the max size.
270 ///
271 /// This returns a chunk of data instead of copying, which may be more efficient.
272 fn read_chunk(
273 &mut self,
274 max: usize,
275 ) -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + MaybeSend {
276 async move {
277 // Don't allocate too much. Write your own if you want to increase this buffer.
278 let mut buf = BytesMut::with_capacity(max.min(8 * 1024));
279
280 // TODO Test this, I think it will work?
281 Ok(self.read_buf(&mut buf).await?.map(|_| buf.freeze()))
282 }
283 }
284
285 /// Send a `STOP_SENDING` QUIC code, informing the peer that no more data will be read.
286 ///
287 /// An implementation MUST do this on Drop otherwise flow control will be leaked.
288 /// Call this method manually if you want to specify a code yourself.
289 fn stop(&mut self, code: u32);
290
291 /// Block until the stream has been closed by either side.
292 ///
293 /// This includes:
294 /// - We received a RESET_STREAM via [SendStream::reset]
295 /// - We sent a STOP_SENDING via [RecvStream::stop]
296 /// - We received a FIN via [SendStream::finish] and read all data.
297 fn closed(&mut self) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend;
298
299 /// A helper to keep reading until the stream is closed.
300 fn read_all(&mut self) -> impl Future<Output = Result<Bytes, Self::Error>> + MaybeSend {
301 async move {
302 let mut buf = BytesMut::new();
303 self.read_all_buf(&mut buf).await?;
304 Ok(buf.freeze())
305 }
306 }
307
308 /// A helper to keep reading until the buffer is full.
309 fn read_all_buf<B: BufMut + MaybeSend>(
310 &mut self,
311 buf: &mut B,
312 ) -> impl Future<Output = Result<usize, Self::Error>> + MaybeSend {
313 async move {
314 let mut size = 0;
315 while buf.has_remaining_mut() {
316 match self.read_buf(buf).await? {
317 Some(n) if n > 0 => size += n,
318 _ => break,
319 }
320 }
321 Ok(size)
322 }
323 }
324}