web_transport_trait/lib.rs
1mod util;
2
3use std::future::Future;
4use std::time::Duration;
5
6pub use crate::util::{MaybeSend, MaybeSync};
7use bytes::{Buf, BufMut, Bytes, BytesMut};
8
9/// Connection-level statistics.
10///
11/// Methods return `Option` — `None` means the implementation doesn't track
12/// this metric, while `Some(0)` means actually zero.
13pub trait Stats {
14 /// Total bytes sent over the connection, including retransmissions and overhead.
15 fn bytes_sent(&self) -> Option<u64> {
16 None
17 }
18
19 /// Total bytes received over the connection, including duplicate and overhead.
20 fn bytes_received(&self) -> Option<u64> {
21 None
22 }
23
24 /// Total bytes lost (detected via retransmission or acknowledgement).
25 fn bytes_lost(&self) -> Option<u64> {
26 None
27 }
28
29 /// Total number of datagrams sent.
30 fn packets_sent(&self) -> Option<u64> {
31 None
32 }
33
34 /// Total number of datagrams received.
35 fn packets_received(&self) -> Option<u64> {
36 None
37 }
38
39 /// Total number of datagrams detected as lost.
40 fn packets_lost(&self) -> Option<u64> {
41 None
42 }
43
44 /// Smoothed round-trip time estimate.
45 fn rtt(&self) -> Option<Duration> {
46 None
47 }
48
49 /// Estimated available send bandwidth, in bits per second.
50 fn estimated_send_rate(&self) -> Option<u64> {
51 None
52 }
53}
54
55/// Default stats implementation that returns `None` for all metrics.
56pub struct StatsUnavailable;
57impl Stats for StatsUnavailable {}
58
59/// Error trait for WebTransport operations.
60///
61/// Implementations must be Send + Sync + 'static for use across async boundaries.
62pub trait Error: std::error::Error + MaybeSend + MaybeSync + 'static {
63 /// Returns the error code and reason if this was an application error.
64 ///
65 /// NOTE: Reason reasons are technically bytes on the wire, but we convert to a String for convenience.
66 fn session_error(&self) -> Option<(u32, String)>;
67
68 /// Returns the error code if this was a stream error.
69 fn stream_error(&self) -> Option<u32> {
70 None
71 }
72}
73
74/// A WebTransport Session, able to accept/create streams and send/recv datagrams.
75///
76/// The session can be cloned to create multiple handles.
77/// The session will be closed on drop.
78pub trait Session: Clone + MaybeSend + MaybeSync + 'static {
79 type SendStream: SendStream;
80 type RecvStream: RecvStream;
81 type Error: Error;
82
83 /// Block until the peer creates a new unidirectional stream.
84 fn accept_uni(&self)
85 -> impl Future<Output = Result<Self::RecvStream, Self::Error>> + MaybeSend;
86
87 /// Block until the peer creates a new bidirectional stream.
88 fn accept_bi(
89 &self,
90 ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + MaybeSend;
91
92 /// Open a new bidirectional stream, which may block when there are too many concurrent streams.
93 fn open_bi(
94 &self,
95 ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + MaybeSend;
96
97 /// Open a new unidirectional stream, which may block when there are too many concurrent streams.
98 fn open_uni(&self) -> impl Future<Output = Result<Self::SendStream, Self::Error>> + MaybeSend;
99
100 /// Send a datagram over the network.
101 ///
102 /// QUIC datagrams may be dropped for any reason:
103 /// - Network congestion.
104 /// - Random packet loss.
105 /// - Payload is larger than `max_datagram_size()`
106 /// - Peer is not receiving datagrams.
107 /// - Peer has too many outstanding datagrams.
108 /// - ???
109 fn send_datagram(&self, payload: Bytes) -> Result<(), Self::Error>;
110
111 /// Receive a datagram over the network.
112 fn recv_datagram(&self) -> impl Future<Output = Result<Bytes, Self::Error>> + MaybeSend;
113
114 /// The maximum size of a datagram that can be sent.
115 fn max_datagram_size(&self) -> usize;
116
117 /// Return the negotiated WebTransport subprotocol, if any.
118 fn protocol(&self) -> Option<&str> {
119 None
120 }
121
122 /// Close the connection immediately with a code and reason.
123 fn close(&self, code: u32, reason: &str);
124
125 /// Block until the connection is closed by either side.
126 fn closed(&self) -> impl Future<Output = Self::Error> + MaybeSend;
127
128 /// Return connection-level statistics, if supported.
129 fn stats(&self) -> impl Stats {
130 StatsUnavailable
131 }
132}
133
134/// An outgoing stream of bytes to the peer.
135///
136/// QUIC streams have flow control, which means the send rate is limited by the peer's receive window.
137/// The stream will be closed with a graceful FIN when dropped.
138pub trait SendStream: MaybeSend {
139 type Error: Error;
140
141 /// Write some of the buffer to the stream.
142 fn write(&mut self, buf: &[u8])
143 -> impl Future<Output = Result<usize, Self::Error>> + MaybeSend;
144
145 /// Write the given buffer to the stream, advancing the internal position.
146 fn write_buf<B: Buf + MaybeSend>(
147 &mut self,
148 buf: &mut B,
149 ) -> impl Future<Output = Result<usize, Self::Error>> + MaybeSend {
150 async move {
151 let chunk = buf.chunk();
152 let size = self.write(chunk).await?;
153 buf.advance(size);
154 Ok(size)
155 }
156 }
157
158 /// Write the entire [Bytes] chunk to the stream, potentially avoiding a copy.
159 fn write_chunk(
160 &mut self,
161 chunk: Bytes,
162 ) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend {
163 async move {
164 // Just so the arg isn't mut
165 let mut c = chunk;
166 self.write_buf(&mut c).await?;
167 Ok(())
168 }
169 }
170
171 /// A helper to write all the data in the buffer.
172 fn write_all(
173 &mut self,
174 buf: &[u8],
175 ) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend {
176 async move {
177 let mut pos = 0;
178 while pos < buf.len() {
179 pos += self.write(&buf[pos..]).await?;
180 }
181 Ok(())
182 }
183 }
184
185 /// A helper to write all of the data in the buffer.
186 fn write_all_buf<B: Buf + MaybeSend>(
187 &mut self,
188 buf: &mut B,
189 ) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend {
190 async move {
191 while buf.has_remaining() {
192 self.write_buf(buf).await?;
193 }
194 Ok(())
195 }
196 }
197
198 /// Set the stream's priority.
199 ///
200 /// Streams with higher values will be sent first, but are not guaranteed to arrive first.
201 /// This matches the W3C WebTransport `sendOrder` convention (and quinn's scheduler).
202 fn set_priority(&mut self, order: u8);
203
204 /// Mark the stream as finished, erroring on any future writes.
205 ///
206 /// [SendStream::reset] can still be called to abandon any queued data.
207 /// [SendStream::closed] should return when the FIN is acknowledged by the peer.
208 ///
209 /// NOTE: Quinn implicitly calls this on Drop, but it's a common footgun.
210 /// Implementations SHOULD [SendStream::reset] on Drop instead.
211 fn finish(&mut self) -> Result<(), Self::Error>;
212
213 /// Immediately closes the stream and discards any remaining data.
214 ///
215 /// This translates into a RESET_STREAM QUIC code.
216 /// The peer may not receive the reset code if the stream is already closed.
217 fn reset(&mut self, code: u32);
218
219 /// Block until the stream is closed by either side.
220 ///
221 /// This includes:
222 /// - We sent a RESET_STREAM via [SendStream::reset]
223 /// - We received a STOP_SENDING via [RecvStream::stop]
224 /// - A FIN is acknowledged by the peer via [SendStream::finish]
225 ///
226 /// Some implementations do not support FIN acknowledgement, in which case this will block until the FIN is sent.
227 ///
228 /// NOTE: This takes a &mut to match Quinn and to simplify the implementation.
229 fn closed(&mut self) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend;
230}
231
232/// An incoming stream of bytes from the peer.
233///
234/// All bytes are flushed in order and the stream is flow controlled.
235/// The stream will be closed with STOP_SENDING code=0 when dropped.
236pub trait RecvStream: MaybeSend {
237 type Error: Error;
238
239 /// Read the next chunk of data, up to the max size.
240 ///
241 /// This returns a chunk of data instead of copying, which may be more efficient.
242 fn read(
243 &mut self,
244 dst: &mut [u8],
245 ) -> impl Future<Output = Result<Option<usize>, Self::Error>> + MaybeSend;
246
247 /// Read some data into the provided buffer.
248 ///
249 /// The number of bytes read is returned, or None if the stream is closed.
250 /// The buffer will be advanced by the number of bytes read.
251 fn read_buf<B: BufMut + MaybeSend>(
252 &mut self,
253 buf: &mut B,
254 ) -> impl Future<Output = Result<Option<usize>, Self::Error>> + MaybeSend {
255 async move {
256 let dst = unsafe {
257 std::mem::transmute::<&mut bytes::buf::UninitSlice, &mut [u8]>(buf.chunk_mut())
258 };
259 let size = match self.read(dst).await? {
260 Some(size) if size > 0 => size,
261 _ => return Ok(None),
262 };
263
264 unsafe { buf.advance_mut(size) };
265
266 Ok(Some(size))
267 }
268 }
269
270 /// Read the next chunk of data, up to the max size.
271 ///
272 /// This returns a chunk of data instead of copying, which may be more efficient.
273 fn read_chunk(
274 &mut self,
275 max: usize,
276 ) -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + MaybeSend {
277 async move {
278 // Don't allocate too much. Write your own if you want to increase this buffer.
279 let mut buf = BytesMut::with_capacity(max.min(8 * 1024));
280
281 // TODO Test this, I think it will work?
282 Ok(self.read_buf(&mut buf).await?.map(|_| buf.freeze()))
283 }
284 }
285
286 /// Send a `STOP_SENDING` QUIC code, informing the peer that no more data will be read.
287 ///
288 /// An implementation MUST do this on Drop otherwise flow control will be leaked.
289 /// Call this method manually if you want to specify a code yourself.
290 fn stop(&mut self, code: u32);
291
292 /// Block until the stream has been closed by either side.
293 ///
294 /// This includes:
295 /// - We received a RESET_STREAM via [SendStream::reset]
296 /// - We sent a STOP_SENDING via [RecvStream::stop]
297 /// - We received a FIN via [SendStream::finish] and read all data.
298 fn closed(&mut self) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend;
299
300 /// A helper to keep reading until the stream is closed.
301 fn read_all(&mut self) -> impl Future<Output = Result<Bytes, Self::Error>> + MaybeSend {
302 async move {
303 let mut buf = BytesMut::new();
304 self.read_all_buf(&mut buf).await?;
305 Ok(buf.freeze())
306 }
307 }
308
309 /// A helper to keep reading until the buffer is full.
310 fn read_all_buf<B: BufMut + MaybeSend>(
311 &mut self,
312 buf: &mut B,
313 ) -> impl Future<Output = Result<usize, Self::Error>> + MaybeSend {
314 async move {
315 let mut size = 0;
316 while buf.has_remaining_mut() {
317 match self.read_buf(buf).await? {
318 Some(n) if n > 0 => size += n,
319 _ => break,
320 }
321 }
322 Ok(size)
323 }
324 }
325}