ant_quic/connection/streams/
mod.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::{
9    collections::{BinaryHeap, hash_map},
10    io,
11};
12
13use bytes::Bytes;
14use thiserror::Error;
15use tracing::{trace, warn};
16
17use super::spaces::{Retransmits, ThinRetransmits};
18use crate::{
19    Dir, StreamId, VarInt,
20    connection::streams::state::{get_or_insert_recv, get_or_insert_send},
21    frame,
22};
23
24mod recv;
25use recv::Recv;
26pub use recv::{Chunks, ReadError, ReadableError};
27
28mod send;
29pub(crate) use send::{ByteSlice, BytesArray};
30use send::{BytesSource, Send, SendState};
31pub use send::{FinishError, WriteError, Written};
32
33mod state;
34pub use state::StreamsState;
35
36/// Access to streams
37pub struct Streams<'a> {
38    pub(super) state: &'a mut StreamsState,
39    pub(super) conn_state: &'a super::State,
40}
41
42impl<'a> Streams<'a> {
43    #[cfg(fuzzing)]
44    pub fn new(state: &'a mut StreamsState, conn_state: &'a super::State) -> Self {
45        Self { state, conn_state }
46    }
47
48    /// Open a single stream if possible
49    ///
50    /// Returns `None` if the streams in the given direction are currently exhausted.
51    pub fn open(&mut self, dir: Dir) -> Option<StreamId> {
52        if self.conn_state.is_closed() {
53            return None;
54        }
55
56        if self.state.next[dir as usize] >= self.state.max[dir as usize] {
57            return None;
58        }
59
60        self.state.next[dir as usize] += 1;
61        let id = StreamId::new(self.state.side, dir, self.state.next[dir as usize] - 1);
62        self.state.insert(false, id);
63        self.state.send_streams += 1;
64        Some(id)
65    }
66
67    /// Accept a remotely initiated stream of a certain directionality, if possible
68    ///
69    /// Returns `None` if there are no new incoming streams for this connection.
70    pub fn accept(&mut self, dir: Dir) -> Option<StreamId> {
71        if self.state.next_remote[dir as usize] == self.state.next_reported_remote[dir as usize] {
72            return None;
73        }
74
75        let x = self.state.next_reported_remote[dir as usize];
76        self.state.next_reported_remote[dir as usize] = x + 1;
77        if dir == Dir::Bi {
78            self.state.send_streams += 1;
79        }
80
81        Some(StreamId::new(!self.state.side, dir, x))
82    }
83
84    #[cfg(fuzzing)]
85    pub fn state(&mut self) -> &mut StreamsState {
86        self.state
87    }
88
89    /// The number of streams that may have unacknowledged data.
90    pub fn send_streams(&self) -> usize {
91        self.state.send_streams
92    }
93
94    /// The number of remotely initiated open streams of a certain directionality.
95    pub fn remote_open_streams(&self, dir: Dir) -> u64 {
96        self.state.next_remote[dir as usize]
97            - (self.state.max_remote[dir as usize]
98                - self.state.allocated_remote_count[dir as usize])
99    }
100}
101
102/// Access to streams
103pub struct RecvStream<'a> {
104    pub(super) id: StreamId,
105    pub(super) state: &'a mut StreamsState,
106    pub(super) pending: &'a mut Retransmits,
107}
108
109impl RecvStream<'_> {
110    /// Read from the given recv stream
111    ///
112    /// `max_length` limits the maximum size of the returned `Bytes` value.
113    /// `ordered` ensures the returned chunk's offset is sequential.
114    ///
115    /// Yields `Ok(None)` if the stream was finished. Otherwise, yields a segment of data and its
116    /// offset in the stream.
117    ///
118    /// Unordered reads can improve performance when packet loss occurs, but ordered reads
119    /// on streams that have seen previous unordered reads will return `ReadError::IllegalOrderedRead`.
120    pub fn read(&mut self, ordered: bool) -> Result<Chunks<'_>, ReadableError> {
121        if self.state.conn_closed() {
122            return Err(ReadableError::ConnectionClosed);
123        }
124
125        Chunks::new(self.id, ordered, self.state, self.pending)
126    }
127
128    /// Stop accepting data on the given receive stream
129    ///
130    /// Discards unread data and notifies the peer to stop transmitting.
131    pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
132        if self.state.conn_closed() {
133            return Err(ClosedStream { _private: () });
134        }
135
136        let mut entry = match self.state.recv.entry(self.id) {
137            hash_map::Entry::Occupied(s) => s,
138            hash_map::Entry::Vacant(_) => return Err(ClosedStream { _private: () }),
139        };
140        let stream = get_or_insert_recv(self.state.stream_receive_window)(entry.get_mut());
141
142        let (read_credits, stop_sending) = stream.stop()?;
143        if stop_sending.should_transmit() {
144            self.pending.stop_sending.push(frame::StopSending {
145                id: self.id,
146                error_code,
147            });
148        }
149
150        // Clean up stream state if possible
151        if !stream.final_offset_unknown() {
152            let recv = entry.remove().expect("must have recv when stopping");
153            self.state.stream_recv_freed(self.id, recv);
154        }
155
156        // Update flow control if needed
157        if self.state.add_read_credits(read_credits).should_transmit() {
158            self.pending.max_data = true;
159        }
160
161        Ok(())
162    }
163
164    /// Check whether this stream has been reset by the peer
165    ///
166    /// Returns the reset error code if the stream was reset.
167    pub fn received_reset(&mut self) -> Result<Option<VarInt>, ClosedStream> {
168        if self.state.conn_closed() {
169            return Err(ClosedStream { _private: () });
170        }
171
172        let hash_map::Entry::Occupied(entry) = self.state.recv.entry(self.id) else {
173            return Err(ClosedStream { _private: () });
174        };
175
176        let Some(s) = entry.get().as_ref().and_then(|s| s.as_open_recv()) else {
177            return Ok(None);
178        };
179
180        if s.stopped {
181            return Err(ClosedStream { _private: () });
182        }
183
184        let Some(code) = s.reset_code() else {
185            return Ok(None);
186        };
187
188        // Clean up state after application observes the reset
189        let (_, recv) = entry.remove_entry();
190        self.state
191            .stream_recv_freed(self.id, recv.expect("must have recv on reset"));
192        self.state.queue_max_stream_id(self.pending);
193
194        Ok(Some(code))
195    }
196}
197
198/// Access to streams
199pub struct SendStream<'a> {
200    pub(super) id: StreamId,
201    pub(super) state: &'a mut StreamsState,
202    pub(super) pending: &'a mut Retransmits,
203    pub(super) conn_state: &'a super::State,
204}
205
206#[allow(clippy::needless_lifetimes)] // Needed for cfg(fuzzing)
207impl<'a> SendStream<'a> {
208    #[cfg(fuzzing)]
209    pub fn new(
210        id: StreamId,
211        state: &'a mut StreamsState,
212        pending: &'a mut Retransmits,
213        conn_state: &'a super::State,
214    ) -> Self {
215        Self {
216            id,
217            state,
218            pending,
219            conn_state,
220        }
221    }
222
223    /// Send data on the given stream
224    ///
225    /// Returns the number of bytes successfully written.
226    pub fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
227        Ok(self.write_source(&mut ByteSlice::from_slice(data))?.bytes)
228    }
229
230    /// Send data on the given stream
231    ///
232    /// Returns the number of bytes and chunks successfully written.
233    /// Note that this method might also write a partial chunk. In this case
234    /// [`Written::chunks`] will not count this chunk as fully written. However
235    /// the chunk will be advanced and contain only non-written data after the call.
236    pub fn write_chunks(&mut self, data: &mut [Bytes]) -> Result<Written, WriteError> {
237        self.write_source(&mut BytesArray::from_chunks(data))
238    }
239
240    fn write_source<B: BytesSource>(&mut self, source: &mut B) -> Result<Written, WriteError> {
241        if self.conn_state.is_closed() {
242            trace!(%self.id, "write blocked; connection draining");
243            return Err(WriteError::Blocked);
244        }
245
246        let limit = self.state.write_limit();
247
248        let max_send_data = self.state.max_send_data(self.id);
249
250        let stream = self
251            .state
252            .send
253            .get_mut(&self.id)
254            .map(get_or_insert_send(max_send_data))
255            .ok_or(WriteError::ClosedStream)?;
256
257        if limit == 0 {
258            trace!(
259                stream = %self.id, max_data = self.state.max_data, data_sent = self.state.data_sent,
260                "write blocked by connection-level flow control or send window"
261            );
262            if !stream.connection_blocked {
263                stream.connection_blocked = true;
264                self.state.connection_blocked.push(self.id);
265            }
266            return Err(WriteError::Blocked);
267        }
268
269        let was_pending = stream.is_pending();
270        let written = stream.write(source, limit)?;
271        self.state.data_sent += written.bytes as u64;
272        self.state.unacked_data += written.bytes as u64;
273        trace!(stream = %self.id, "wrote {} bytes", written.bytes);
274        if !was_pending {
275            self.state.pending.push_pending(self.id, stream.priority);
276        }
277        Ok(written)
278    }
279
280    /// Check if this stream was stopped, get the reason if it was
281    pub fn stopped(&self) -> Result<Option<VarInt>, ClosedStream> {
282        match self.state.send.get(&self.id).as_ref() {
283            Some(Some(s)) => Ok(s.stop_reason),
284            Some(None) => Ok(None),
285            None => Err(ClosedStream { _private: () }),
286        }
287    }
288
289    /// Finish a send stream, signalling that no more data will be sent.
290    ///
291    /// If this fails, no [`StreamEvent::Finished`] will be generated.
292    ///
293    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
294    pub fn finish(&mut self) -> Result<(), FinishError> {
295        let max_send_data = self.state.max_send_data(self.id);
296        let stream = self
297            .state
298            .send
299            .get_mut(&self.id)
300            .map(get_or_insert_send(max_send_data))
301            .ok_or(FinishError::ClosedStream)?;
302
303        let was_pending = stream.is_pending();
304        stream.finish()?;
305        if !was_pending {
306            self.state.pending.push_pending(self.id, stream.priority);
307        }
308
309        Ok(())
310    }
311
312    /// Abandon transmitting data on a stream
313    ///
314    /// # Panics
315    /// - when applied to a receive stream
316    pub fn reset(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
317        let max_send_data = self.state.max_send_data(self.id);
318        let stream = self
319            .state
320            .send
321            .get_mut(&self.id)
322            .map(get_or_insert_send(max_send_data))
323            .ok_or(ClosedStream { _private: () })?;
324
325        if matches!(stream.state, SendState::ResetSent) {
326            // Redundant reset call
327            return Err(ClosedStream { _private: () });
328        }
329
330        // Restore the portion of the send window consumed by the data that we aren't about to
331        // send. We leave flow control alone because the peer's responsible for issuing additional
332        // credit based on the final offset communicated in the RESET_STREAM frame we send.
333        self.state.unacked_data -= stream.pending.unacked();
334        stream.reset();
335        self.pending.reset_stream.push((self.id, error_code));
336
337        // Don't reopen an already-closed stream we haven't forgotten yet
338        Ok(())
339    }
340
341    /// Set the priority of a stream
342    ///
343    /// # Panics
344    /// - when applied to a receive stream
345    pub fn set_priority(&mut self, priority: i32) -> Result<(), ClosedStream> {
346        let max_send_data = self.state.max_send_data(self.id);
347        let stream = self
348            .state
349            .send
350            .get_mut(&self.id)
351            .map(get_or_insert_send(max_send_data))
352            .ok_or(ClosedStream { _private: () })?;
353
354        stream.priority = priority;
355        Ok(())
356    }
357
358    /// Get the priority of a stream
359    ///
360    /// # Panics
361    /// - when applied to a receive stream
362    pub fn priority(&self) -> Result<i32, ClosedStream> {
363        let stream = self
364            .state
365            .send
366            .get(&self.id)
367            .ok_or(ClosedStream { _private: () })?;
368
369        Ok(stream.as_ref().map(|s| s.priority).unwrap_or_default())
370    }
371}
372
373/// A queue of streams with pending outgoing data, sorted by priority
374struct PendingStreamsQueue {
375    streams: BinaryHeap<PendingStream>,
376    /// The next stream to write out. This is `Some` when writing a stream is
377    /// interrupted while the stream still has some pending data.
378    next: Option<PendingStream>,
379    /// A monotonically decreasing counter for round-robin scheduling of streams with the same priority
380    recency: u64,
381}
382
383impl PendingStreamsQueue {
384    fn new() -> Self {
385        Self {
386            streams: BinaryHeap::new(),
387            next: None,
388            recency: u64::MAX,
389        }
390    }
391
392    /// Reinsert a stream that was pending and still contains unsent data.
393    fn reinsert_pending(&mut self, id: StreamId, priority: i32) {
394        if self.next.is_some() {
395            warn!("Attempting to reinsert a pending stream when next is already set");
396            return;
397        }
398
399        self.next = Some(PendingStream {
400            priority,
401            recency: self.recency,
402            id,
403        });
404    }
405
406    /// Push a pending stream ID with the given priority
407    fn push_pending(&mut self, id: StreamId, priority: i32) {
408        // Decrement recency to ensure round-robin scheduling for streams of the same priority
409        self.recency = self.recency.saturating_sub(1);
410        self.streams.push(PendingStream {
411            priority,
412            recency: self.recency,
413            id,
414        });
415    }
416
417    /// Pop the highest priority stream
418    fn pop(&mut self) -> Option<PendingStream> {
419        self.next.take().or_else(|| self.streams.pop())
420    }
421
422    /// Clear all pending streams
423    fn clear(&mut self) {
424        self.next = None;
425        self.streams.clear();
426    }
427
428    /// Iterate over all pending streams
429    fn iter(&self) -> impl Iterator<Item = &PendingStream> {
430        self.next.iter().chain(self.streams.iter())
431    }
432
433    #[cfg(test)]
434    fn len(&self) -> usize {
435        self.streams.len() + self.next.is_some() as usize
436    }
437}
438
439/// The [`StreamId`] of a stream with pending data queued, ordered by its priority and recency
440#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
441struct PendingStream {
442    /// The priority of the stream
443    // Note that this field should be kept above the `recency` field, in order for the `Ord` derive to be correct
444    // (See https://doc.rust-lang.org/stable/std/cmp/trait.Ord.html#derivable)
445    priority: i32,
446    /// A tie-breaker for streams of the same priority, used to improve fairness by implementing round-robin scheduling:
447    /// Larger values are prioritized, so it is initialised to `u64::MAX`, and when a stream writes data, we know
448    /// that it currently has the highest recency value, so it is deprioritized by setting its recency to 1 less than the
449    /// previous lowest recency value, such that all other streams of this priority will get processed once before we get back
450    /// round to this one
451    recency: u64,
452    /// The ID of the stream
453    // The way this type is used ensures that every instance has a unique `recency` value, so this field should be kept below
454    // the `priority` and `recency` fields, so that it does not interfere with the behaviour of the `Ord` derive
455    id: StreamId,
456}
457
458/// Application events about streams
459#[derive(Debug, PartialEq, Eq)]
460pub enum StreamEvent {
461    /// One or more new streams has been opened and might be readable
462    Opened {
463        /// Directionality for which streams have been opened
464        dir: Dir,
465    },
466    /// A currently open stream likely has data or errors waiting to be read
467    Readable {
468        /// Which stream is now readable
469        id: StreamId,
470    },
471    /// A formerly write-blocked stream might be ready for a write or have been stopped
472    ///
473    /// Only generated for streams that are currently open.
474    Writable {
475        /// Which stream is now writable
476        id: StreamId,
477    },
478    /// A finished stream has been fully acknowledged or stopped
479    Finished {
480        /// Which stream has been finished
481        id: StreamId,
482    },
483    /// The peer asked us to stop sending on an outgoing stream
484    Stopped {
485        /// Which stream has been stopped
486        id: StreamId,
487        /// Error code supplied by the peer
488        error_code: VarInt,
489    },
490    /// At least one new stream of a certain directionality may be opened
491    Available {
492        /// Directionality for which streams are newly available
493        dir: Dir,
494    },
495}
496
/// Tells the caller whether a frame has to be transmitted
///
/// A thin wrapper around `bool`. It is marked `#[must_use]` so that a transmission
/// requirement cannot be dropped on the floor by accident.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
#[must_use = "A frame might need to be enqueued"]
pub struct ShouldTransmit(bool);

impl ShouldTransmit {
    /// Returns `true` if a frame should be transmitted
    pub fn should_transmit(self) -> bool {
        let Self(needs_frame) = self;
        needs_frame
    }
}
511
512/// Error indicating that a stream has not been opened or has already been finished or reset
513#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
514#[error("closed stream")]
515pub struct ClosedStream {
516    _private: (),
517}
518
519impl From<ClosedStream> for io::Error {
520    fn from(x: ClosedStream) -> Self {
521        Self::new(io::ErrorKind::NotConnected, x)
522    }
523}
524
/// Selects one of the two halves of a stream
// NOTE(review): no use of this type is visible in this part of the file; presumably it is
// consumed by the `recv`/`send`/`state` submodules. Confirm before relying on it.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum StreamHalf {
    /// The sending half
    Send,
    /// The receiving half
    Recv,
}