ant_quic/connection/streams/
mod.rs

1use std::{
2    collections::{BinaryHeap, hash_map},
3    io,
4};
5
6use bytes::Bytes;
7use thiserror::Error;
8use tracing::{trace, warn};
9
10use super::spaces::{Retransmits, ThinRetransmits};
11use crate::{
12    Dir, StreamId, VarInt,
13    connection::streams::state::{get_or_insert_recv, get_or_insert_send},
14    frame,
15};
16
17mod recv;
18use recv::Recv;
19pub use recv::{Chunks, ReadError, ReadableError};
20
21mod send;
22pub(crate) use send::{ByteSlice, BytesArray};
23use send::{BytesSource, Send, SendState};
24pub use send::{FinishError, WriteError, Written};
25
26mod state;
27#[allow(unreachable_pub)] // fuzzing only
28pub use state::StreamsState;
29
30/// Access to streams
31pub struct Streams<'a> {
32    pub(super) state: &'a mut StreamsState,
33    pub(super) conn_state: &'a super::State,
34}
35
36impl<'a> Streams<'a> {
37    #[cfg(fuzzing)]
38    pub fn new(state: &'a mut StreamsState, conn_state: &'a super::State) -> Self {
39        Self { state, conn_state }
40    }
41
42    /// Open a single stream if possible
43    ///
44    /// Returns `None` if the streams in the given direction are currently exhausted.
45    pub fn open(&mut self, dir: Dir) -> Option<StreamId> {
46        if self.conn_state.is_closed() {
47            return None;
48        }
49
50        if self.state.next[dir as usize] >= self.state.max[dir as usize] {
51            return None;
52        }
53
54        self.state.next[dir as usize] += 1;
55        let id = StreamId::new(self.state.side, dir, self.state.next[dir as usize] - 1);
56        self.state.insert(false, id);
57        self.state.send_streams += 1;
58        Some(id)
59    }
60
61    /// Accept a remotely initiated stream of a certain directionality, if possible
62    ///
63    /// Returns `None` if there are no new incoming streams for this connection.
64    pub fn accept(&mut self, dir: Dir) -> Option<StreamId> {
65        if self.state.next_remote[dir as usize] == self.state.next_reported_remote[dir as usize] {
66            return None;
67        }
68
69        let x = self.state.next_reported_remote[dir as usize];
70        self.state.next_reported_remote[dir as usize] = x + 1;
71        if dir == Dir::Bi {
72            self.state.send_streams += 1;
73        }
74
75        Some(StreamId::new(!self.state.side, dir, x))
76    }
77
78    #[cfg(fuzzing)]
79    pub fn state(&mut self) -> &mut StreamsState {
80        self.state
81    }
82
83    /// The number of streams that may have unacknowledged data.
84    pub fn send_streams(&self) -> usize {
85        self.state.send_streams
86    }
87
88    /// The number of remotely initiated open streams of a certain directionality.
89    pub fn remote_open_streams(&self, dir: Dir) -> u64 {
90        self.state.next_remote[dir as usize]
91            - (self.state.max_remote[dir as usize]
92                - self.state.allocated_remote_count[dir as usize])
93    }
94}
95
96/// Access to streams
97pub struct RecvStream<'a> {
98    pub(super) id: StreamId,
99    pub(super) state: &'a mut StreamsState,
100    pub(super) pending: &'a mut Retransmits,
101}
102
103impl RecvStream<'_> {
104    /// Read from the given recv stream
105    ///
106    /// `max_length` limits the maximum size of the returned `Bytes` value.
107    /// `ordered` ensures the returned chunk's offset is sequential.
108    ///
109    /// Yields `Ok(None)` if the stream was finished. Otherwise, yields a segment of data and its
110    /// offset in the stream.
111    ///
112    /// Unordered reads can improve performance when packet loss occurs, but ordered reads
113    /// on streams that have seen previous unordered reads will return `ReadError::IllegalOrderedRead`.
114    pub fn read(&mut self, ordered: bool) -> Result<Chunks, ReadableError> {
115        if self.state.conn_closed() {
116            return Err(ReadableError::ConnectionClosed);
117        }
118        
119        Chunks::new(self.id, ordered, self.state, self.pending)
120    }
121
122    /// Stop accepting data on the given receive stream
123    ///
124    /// Discards unread data and notifies the peer to stop transmitting.
125    pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
126        if self.state.conn_closed() {
127            return Err(ClosedStream { _private: () });
128        }
129        
130        let mut entry = match self.state.recv.entry(self.id) {
131            hash_map::Entry::Occupied(s) => s,
132            hash_map::Entry::Vacant(_) => return Err(ClosedStream { _private: () }),
133        };
134        let stream = get_or_insert_recv(self.state.stream_receive_window)(entry.get_mut());
135
136        let (read_credits, stop_sending) = stream.stop()?;
137        if stop_sending.should_transmit() {
138            self.pending.stop_sending.push(frame::StopSending {
139                id: self.id,
140                error_code,
141            });
142        }
143
144        // Clean up stream state if possible
145        if !stream.final_offset_unknown() {
146            let recv = entry.remove().expect("must have recv when stopping");
147            self.state.stream_recv_freed(self.id, recv);
148        }
149
150        // Update flow control if needed
151        if self.state.add_read_credits(read_credits).should_transmit() {
152            self.pending.max_data = true;
153        }
154
155        Ok(())
156    }
157
158    /// Check whether this stream has been reset by the peer
159    ///
160    /// Returns the reset error code if the stream was reset.
161    pub fn received_reset(&mut self) -> Result<Option<VarInt>, ClosedStream> {
162        if self.state.conn_closed() {
163            return Err(ClosedStream { _private: () });
164        }
165        
166        let hash_map::Entry::Occupied(entry) = self.state.recv.entry(self.id) else {
167            return Err(ClosedStream { _private: () });
168        };
169        
170        let Some(s) = entry.get().as_ref().and_then(|s| s.as_open_recv()) else {
171            return Ok(None);
172        };
173        
174        if s.stopped {
175            return Err(ClosedStream { _private: () });
176        }
177        
178        let Some(code) = s.reset_code() else {
179            return Ok(None);
180        };
181
182        // Clean up state after application observes the reset
183        let (_, recv) = entry.remove_entry();
184        self.state
185            .stream_recv_freed(self.id, recv.expect("must have recv on reset"));
186        self.state.queue_max_stream_id(self.pending);
187
188        Ok(Some(code))
189    }
190}
191
192/// Access to streams
193pub struct SendStream<'a> {
194    pub(super) id: StreamId,
195    pub(super) state: &'a mut StreamsState,
196    pub(super) pending: &'a mut Retransmits,
197    pub(super) conn_state: &'a super::State,
198}
199
200#[allow(clippy::needless_lifetimes)] // Needed for cfg(fuzzing)
201impl<'a> SendStream<'a> {
202    #[cfg(fuzzing)]
203    pub fn new(
204        id: StreamId,
205        state: &'a mut StreamsState,
206        pending: &'a mut Retransmits,
207        conn_state: &'a super::State,
208    ) -> Self {
209        Self {
210            id,
211            state,
212            pending,
213            conn_state,
214        }
215    }
216
217    /// Send data on the given stream
218    ///
219    /// Returns the number of bytes successfully written.
220    pub fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
221        Ok(self.write_source(&mut ByteSlice::from_slice(data))?.bytes)
222    }
223
224    /// Send data on the given stream
225    ///
226    /// Returns the number of bytes and chunks successfully written.
227    /// Note that this method might also write a partial chunk. In this case
228    /// [`Written::chunks`] will not count this chunk as fully written. However
229    /// the chunk will be advanced and contain only non-written data after the call.
230    pub fn write_chunks(&mut self, data: &mut [Bytes]) -> Result<Written, WriteError> {
231        self.write_source(&mut BytesArray::from_chunks(data))
232    }
233
234    fn write_source<B: BytesSource>(&mut self, source: &mut B) -> Result<Written, WriteError> {
235        if self.conn_state.is_closed() {
236            trace!(%self.id, "write blocked; connection draining");
237            return Err(WriteError::Blocked);
238        }
239
240        let limit = self.state.write_limit();
241
242        let max_send_data = self.state.max_send_data(self.id);
243
244        let stream = self
245            .state
246            .send
247            .get_mut(&self.id)
248            .map(get_or_insert_send(max_send_data))
249            .ok_or(WriteError::ClosedStream)?;
250
251        if limit == 0 {
252            trace!(
253                stream = %self.id, max_data = self.state.max_data, data_sent = self.state.data_sent,
254                "write blocked by connection-level flow control or send window"
255            );
256            if !stream.connection_blocked {
257                stream.connection_blocked = true;
258                self.state.connection_blocked.push(self.id);
259            }
260            return Err(WriteError::Blocked);
261        }
262
263        let was_pending = stream.is_pending();
264        let written = stream.write(source, limit)?;
265        self.state.data_sent += written.bytes as u64;
266        self.state.unacked_data += written.bytes as u64;
267        trace!(stream = %self.id, "wrote {} bytes", written.bytes);
268        if !was_pending {
269            self.state.pending.push_pending(self.id, stream.priority);
270        }
271        Ok(written)
272    }
273
274    /// Check if this stream was stopped, get the reason if it was
275    pub fn stopped(&self) -> Result<Option<VarInt>, ClosedStream> {
276        match self.state.send.get(&self.id).as_ref() {
277            Some(Some(s)) => Ok(s.stop_reason),
278            Some(None) => Ok(None),
279            None => Err(ClosedStream { _private: () }),
280        }
281    }
282
283    /// Finish a send stream, signalling that no more data will be sent.
284    ///
285    /// If this fails, no [`StreamEvent::Finished`] will be generated.
286    ///
287    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
288    pub fn finish(&mut self) -> Result<(), FinishError> {
289        let max_send_data = self.state.max_send_data(self.id);
290        let stream = self
291            .state
292            .send
293            .get_mut(&self.id)
294            .map(get_or_insert_send(max_send_data))
295            .ok_or(FinishError::ClosedStream)?;
296
297        let was_pending = stream.is_pending();
298        stream.finish()?;
299        if !was_pending {
300            self.state.pending.push_pending(self.id, stream.priority);
301        }
302
303        Ok(())
304    }
305
306    /// Abandon transmitting data on a stream
307    ///
308    /// # Panics
309    /// - when applied to a receive stream
310    pub fn reset(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
311        let max_send_data = self.state.max_send_data(self.id);
312        let stream = self
313            .state
314            .send
315            .get_mut(&self.id)
316            .map(get_or_insert_send(max_send_data))
317            .ok_or(ClosedStream { _private: () })?;
318
319        if matches!(stream.state, SendState::ResetSent) {
320            // Redundant reset call
321            return Err(ClosedStream { _private: () });
322        }
323
324        // Restore the portion of the send window consumed by the data that we aren't about to
325        // send. We leave flow control alone because the peer's responsible for issuing additional
326        // credit based on the final offset communicated in the RESET_STREAM frame we send.
327        self.state.unacked_data -= stream.pending.unacked();
328        stream.reset();
329        self.pending.reset_stream.push((self.id, error_code));
330
331        // Don't reopen an already-closed stream we haven't forgotten yet
332        Ok(())
333    }
334
335    /// Set the priority of a stream
336    ///
337    /// # Panics
338    /// - when applied to a receive stream
339    pub fn set_priority(&mut self, priority: i32) -> Result<(), ClosedStream> {
340        let max_send_data = self.state.max_send_data(self.id);
341        let stream = self
342            .state
343            .send
344            .get_mut(&self.id)
345            .map(get_or_insert_send(max_send_data))
346            .ok_or(ClosedStream { _private: () })?;
347
348        stream.priority = priority;
349        Ok(())
350    }
351
352    /// Get the priority of a stream
353    ///
354    /// # Panics
355    /// - when applied to a receive stream
356    pub fn priority(&self) -> Result<i32, ClosedStream> {
357        let stream = self
358            .state
359            .send
360            .get(&self.id)
361            .ok_or(ClosedStream { _private: () })?;
362
363        Ok(stream.as_ref().map(|s| s.priority).unwrap_or_default())
364    }
365}
366
367/// A queue of streams with pending outgoing data, sorted by priority
368struct PendingStreamsQueue {
369    streams: BinaryHeap<PendingStream>,
370    /// The next stream to write out. This is `Some` when writing a stream is
371    /// interrupted while the stream still has some pending data.
372    next: Option<PendingStream>,
373    /// A monotonically decreasing counter for round-robin scheduling of streams with the same priority
374    recency: u64,
375}
376
377impl PendingStreamsQueue {
378    fn new() -> Self {
379        Self {
380            streams: BinaryHeap::new(),
381            next: None,
382            recency: u64::MAX,
383        }
384    }
385
386    /// Reinsert a stream that was pending and still contains unsent data.
387    fn reinsert_pending(&mut self, id: StreamId, priority: i32) {
388        if self.next.is_some() {
389            warn!("Attempting to reinsert a pending stream when next is already set");
390            return;
391        }
392
393        self.next = Some(PendingStream {
394            priority,
395            recency: self.recency,
396            id,
397        });
398    }
399
400    /// Push a pending stream ID with the given priority
401    fn push_pending(&mut self, id: StreamId, priority: i32) {
402        // Decrement recency to ensure round-robin scheduling for streams of the same priority
403        self.recency = self.recency.saturating_sub(1);
404        self.streams.push(PendingStream {
405            priority,
406            recency: self.recency,
407            id,
408        });
409    }
410
411    /// Pop the highest priority stream
412    fn pop(&mut self) -> Option<PendingStream> {
413        self.next.take().or_else(|| self.streams.pop())
414    }
415
416    /// Clear all pending streams
417    fn clear(&mut self) {
418        self.next = None;
419        self.streams.clear();
420    }
421
422    /// Iterate over all pending streams
423    fn iter(&self) -> impl Iterator<Item = &PendingStream> {
424        self.next.iter().chain(self.streams.iter())
425    }
426
427    #[cfg(test)]
428    fn len(&self) -> usize {
429        self.streams.len() + self.next.is_some() as usize
430    }
431}
432
433/// The [`StreamId`] of a stream with pending data queued, ordered by its priority and recency
434#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
435struct PendingStream {
436    /// The priority of the stream
437    // Note that this field should be kept above the `recency` field, in order for the `Ord` derive to be correct
438    // (See https://doc.rust-lang.org/stable/std/cmp/trait.Ord.html#derivable)
439    priority: i32,
440    /// A tie-breaker for streams of the same priority, used to improve fairness by implementing round-robin scheduling:
441    /// Larger values are prioritized, so it is initialised to `u64::MAX`, and when a stream writes data, we know
442    /// that it currently has the highest recency value, so it is deprioritized by setting its recency to 1 less than the
443    /// previous lowest recency value, such that all other streams of this priority will get processed once before we get back
444    /// round to this one
445    recency: u64,
446    /// The ID of the stream
447    // The way this type is used ensures that every instance has a unique `recency` value, so this field should be kept below
448    // the `priority` and `recency` fields, so that it does not interfere with the behaviour of the `Ord` derive
449    id: StreamId,
450}
451
452/// Application events about streams
453#[derive(Debug, PartialEq, Eq)]
454pub enum StreamEvent {
455    /// One or more new streams has been opened and might be readable
456    Opened {
457        /// Directionality for which streams have been opened
458        dir: Dir,
459    },
460    /// A currently open stream likely has data or errors waiting to be read
461    Readable {
462        /// Which stream is now readable
463        id: StreamId,
464    },
465    /// A formerly write-blocked stream might be ready for a write or have been stopped
466    ///
467    /// Only generated for streams that are currently open.
468    Writable {
469        /// Which stream is now writable
470        id: StreamId,
471    },
472    /// A finished stream has been fully acknowledged or stopped
473    Finished {
474        /// Which stream has been finished
475        id: StreamId,
476    },
477    /// The peer asked us to stop sending on an outgoing stream
478    Stopped {
479        /// Which stream has been stopped
480        id: StreamId,
481        /// Error code supplied by the peer
482        error_code: VarInt,
483    },
484    /// At least one new stream of a certain directionality may be opened
485    Available {
486        /// Directionality for which streams are newly available
487        dir: Dir,
488    },
489}
490
491/// Indicates whether a frame needs to be transmitted
492///
493/// This type wraps around bool and uses the `#[must_use]` attribute in order
494/// to prevent accidental loss of the frame transmission requirement.
495#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
496#[must_use = "A frame might need to be enqueued"]
497pub struct ShouldTransmit(bool);
498
499impl ShouldTransmit {
500    /// Returns whether a frame should be transmitted
501    pub fn should_transmit(self) -> bool {
502        self.0
503    }
504}
505
506/// Error indicating that a stream has not been opened or has already been finished or reset
507#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
508#[error("closed stream")]
509pub struct ClosedStream {
510    _private: (),
511}
512
513impl From<ClosedStream> for io::Error {
514    fn from(x: ClosedStream) -> Self {
515        Self::new(io::ErrorKind::NotConnected, x)
516    }
517}
518
519#[derive(Debug, Copy, Clone, Eq, PartialEq)]
520enum StreamHalf {
521    Send,
522    Recv,
523}