iroh_quinn_proto/connection/streams/mod.rs

use std::{
    collections::{hash_map, BinaryHeap},
    io,
};

use bytes::Bytes;
use thiserror::Error;
use tracing::trace;

use super::spaces::{Retransmits, ThinRetransmits};
use crate::{
    connection::streams::state::{get_or_insert_recv, get_or_insert_send},
    frame, Dir, StreamId, VarInt,
};

mod recv;
use recv::Recv;
pub use recv::{Chunks, ReadError, ReadableError};

mod send;
pub(crate) use send::{ByteSlice, BytesArray};
pub use send::{BytesSource, FinishError, WriteError, Written};
use send::{Send, SendState};

mod state;
#[allow(unreachable_pub)] // fuzzing only
pub use state::StreamsState;

/// Access to streams
pub struct Streams<'a> {
    pub(super) state: &'a mut StreamsState,
    pub(super) conn_state: &'a super::State,
}

#[allow(clippy::needless_lifetimes)] // Needed for cfg(fuzzing)
impl<'a> Streams<'a> {
    #[cfg(fuzzing)]
    pub fn new(state: &'a mut StreamsState, conn_state: &'a super::State) -> Self {
        Self { state, conn_state }
    }

    /// Open a single stream if possible
    ///
    /// Returns `None` if the streams in the given direction are currently exhausted.
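    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here), assuming a [`Connection`](super::Connection)
    /// value `conn` that exposes this accessor via `conn.streams()`:
    ///
    /// ```ignore
    /// let mut streams = conn.streams();
    /// match streams.open(Dir::Uni) {
    ///     Some(id) => println!("opened stream {id}"),
    ///     // Stream limit reached; wait for a `StreamEvent::Available` before retrying.
    ///     None => {}
    /// }
    /// ```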
    pub fn open(&mut self, dir: Dir) -> Option<StreamId> {
        if self.conn_state.is_closed() {
            return None;
        }

        // TODO: Queue STREAM_ID_BLOCKED if this fails
        if self.state.next[dir as usize] >= self.state.max[dir as usize] {
            return None;
        }

        self.state.next[dir as usize] += 1;
        let id = StreamId::new(self.state.side, dir, self.state.next[dir as usize] - 1);
        self.state.insert(false, id);
        self.state.send_streams += 1;
        Some(id)
    }

    /// Accept a remotely initiated stream of a certain directionality, if possible
    ///
    /// Returns `None` if there are no new incoming streams for this connection.
    /// Has no impact on the data flow-control or stream concurrency limits.
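    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here), assuming a [`Connection`](super::Connection)
    /// value `conn`; drains every newly opened bidirectional stream:
    ///
    /// ```ignore
    /// while let Some(id) = conn.streams().accept(Dir::Bi) {
    ///     println!("peer opened stream {id}");
    /// }
    /// ```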
    pub fn accept(&mut self, dir: Dir) -> Option<StreamId> {
        if self.state.next_remote[dir as usize] == self.state.next_reported_remote[dir as usize] {
            return None;
        }

        let x = self.state.next_reported_remote[dir as usize];
        self.state.next_reported_remote[dir as usize] = x + 1;
        if dir == Dir::Bi {
            self.state.send_streams += 1;
        }

        Some(StreamId::new(!self.state.side, dir, x))
    }

    #[cfg(fuzzing)]
    pub fn state(&mut self) -> &mut StreamsState {
        self.state
    }

    /// The number of streams that may have unacknowledged data.
    pub fn send_streams(&self) -> usize {
        self.state.send_streams
    }

    /// The number of remotely initiated open streams of a certain directionality.
    ///
    /// Includes remotely initiated streams that have not yet been accepted via [`accept`](Self::accept).
    /// These streams count against the respective concurrency limit reported by
    /// [`Connection::max_concurrent_streams`](super::Connection::max_concurrent_streams).
    pub fn remote_open_streams(&self, dir: Dir) -> u64 {
        // total opened - total closed = total opened - (total permitted - total permitted unclosed)
        self.state.next_remote[dir as usize]
            - (self.state.max_remote[dir as usize]
                - self.state.allocated_remote_count[dir as usize])
    }
}

/// Access to the receive half of a stream
pub struct RecvStream<'a> {
    pub(super) id: StreamId,
    pub(super) state: &'a mut StreamsState,
    pub(super) pending: &'a mut Retransmits,
}

impl RecvStream<'_> {
    /// Read from the given recv stream
    ///
    /// `max_length` limits the maximum size of the returned `Bytes` value; passing `usize::MAX`
    /// will yield the best performance. `ordered` ensures the returned chunk's offset is exactly
    /// equal to the previously returned offset plus the previously returned bytes' length.
    ///
    /// Yields `Ok(None)` if the stream was finished. Otherwise, yields a segment of data and its
    /// offset in the stream. If `ordered` is `false`, segments may be received in any order, and
    /// the `Chunk`'s `offset` field can be used to determine ordering in the caller.
    ///
    /// While most applications will prefer to consume stream data in order, unordered reads can
    /// improve performance when packet loss occurs and data cannot be retransmitted before the
    /// flow control window is filled. On any given stream, you can switch from ordered to
    /// unordered reads, but ordered reads on streams that have seen previous unordered reads will
    /// return `ReadError::IllegalOrderedRead`.
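    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here), assuming a [`Connection`](super::Connection)
    /// value `conn` and a readable stream `id`; `consume` stands in for application code.
    /// Reads ordered data until none is currently buffered:
    ///
    /// ```ignore
    /// let mut recv = conn.recv_stream(id);
    /// let mut chunks = recv.read(true)?;
    /// loop {
    ///     match chunks.next(usize::MAX) {
    ///         Ok(Some(chunk)) => consume(&chunk.bytes),
    ///         Ok(None) => break,                // stream finished
    ///         Err(ReadError::Blocked) => break, // no more data buffered right now
    ///         Err(e) => return Err(e.into()),
    ///     }
    /// }
    /// // Release the borrow and learn whether flow-control frames should be sent.
    /// let _should_transmit = chunks.finalize();
    /// ```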
    pub fn read(&mut self, ordered: bool) -> Result<Chunks, ReadableError> {
        Chunks::new(self.id, ordered, self.state, self.pending)
    }

    /// Stop accepting data on the given receive stream
    ///
    /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
    /// attempts to operate on the stream will yield `ClosedStream` errors.
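    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here), assuming a [`Connection`](super::Connection)
    /// value `conn` and an application-level error code of `0`:
    ///
    /// ```ignore
    /// conn.recv_stream(id).stop(0u32.into())?;
    /// ```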
    pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
        let mut entry = match self.state.recv.entry(self.id) {
            hash_map::Entry::Occupied(s) => s,
            hash_map::Entry::Vacant(_) => return Err(ClosedStream { _private: () }),
        };
        let stream = get_or_insert_recv(self.state.stream_receive_window)(entry.get_mut());

        let (read_credits, stop_sending) = stream.stop()?;
        if stop_sending.should_transmit() {
            self.pending.stop_sending.push(frame::StopSending {
                id: self.id,
                error_code,
            });
        }

        // We need to keep stopped streams around until they're finished or reset so we can update
        // connection-level flow control to account for discarded data. Otherwise, we can discard
        // state immediately.
        if !stream.final_offset_unknown() {
            let recv = entry.remove().expect("must have recv when stopping");
            self.state.stream_recv_freed(self.id, recv);
        }

        if self.state.add_read_credits(read_credits).should_transmit() {
            self.pending.max_data = true;
        }

        Ok(())
    }

    /// Check whether this stream has been reset by the peer, returning the reset error code if so
    ///
    /// After returning `Ok(Some(_))` once, stream state will be discarded and all future calls will
    /// return `Err(ClosedStream)`.
    pub fn received_reset(&mut self) -> Result<Option<VarInt>, ClosedStream> {
        let hash_map::Entry::Occupied(entry) = self.state.recv.entry(self.id) else {
            return Err(ClosedStream { _private: () });
        };
        let Some(s) = entry.get().as_ref().and_then(|s| s.as_open_recv()) else {
            return Ok(None);
        };
        if s.stopped {
            return Err(ClosedStream { _private: () });
        }
        let Some(code) = s.reset_code() else {
            return Ok(None);
        };

        // Clean up state after application observes the reset, since there's no reason for the
        // application to attempt to read or stop the stream once it knows it's reset
        let (_, recv) = entry.remove_entry();
        self.state
            .stream_recv_freed(self.id, recv.expect("must have recv on reset"));
        self.state.queue_max_stream_id(self.pending);

        Ok(Some(code))
    }
}

/// Access to the send half of a stream
pub struct SendStream<'a> {
    pub(super) id: StreamId,
    pub(super) state: &'a mut StreamsState,
    pub(super) pending: &'a mut Retransmits,
    pub(super) conn_state: &'a super::State,
}

#[allow(clippy::needless_lifetimes)] // Needed for cfg(fuzzing)
impl<'a> SendStream<'a> {
    #[cfg(fuzzing)]
    pub fn new(
        id: StreamId,
        state: &'a mut StreamsState,
        pending: &'a mut Retransmits,
        conn_state: &'a super::State,
    ) -> Self {
        Self {
            id,
            state,
            pending,
            conn_state,
        }
    }

    /// Send data on the given stream
    ///
    /// Returns the number of bytes successfully written.
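    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here), assuming a [`Connection`](super::Connection)
    /// value `conn` and an open outgoing stream `id`:
    ///
    /// ```ignore
    /// let mut send = conn.send_stream(id);
    /// match send.write(b"hello") {
    ///     Ok(n) => println!("wrote {n} bytes"),
    ///     // Connection-level flow control or the send window is exhausted;
    ///     // retry after a `StreamEvent::Writable` for this stream.
    ///     Err(WriteError::Blocked) => {}
    ///     Err(e) => eprintln!("write failed: {e}"),
    /// }
    /// ```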
    pub fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
        Ok(self.write_source(&mut ByteSlice::from_slice(data))?.bytes)
    }

    /// Send data on the given stream
    ///
    /// Returns the number of bytes and chunks successfully written.
    /// Note that this method might also write a partial chunk. In that case
    /// [`Written::chunks`] will not count the chunk as fully written; however,
    /// the chunk will be advanced and contain only unwritten data after the call.
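    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here), assuming a `SendStream` value `send` for an open
    /// outgoing stream; shows how [`Written`] reports partial progress:
    ///
    /// ```ignore
    /// let mut bufs = [Bytes::from_static(b"hello "), Bytes::from_static(b"world")];
    /// let written = send.write_chunks(&mut bufs)?;
    /// println!("{} bytes across {} complete chunks", written.bytes, written.chunks);
    /// // Any partially written chunk keeps only its unwritten tail, so the slice
    /// // can simply be passed to `write_chunks` again later.
    /// ```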
    pub fn write_chunks(&mut self, data: &mut [Bytes]) -> Result<Written, WriteError> {
        self.write_source(&mut BytesArray::from_chunks(data))
    }

    fn write_source<B: BytesSource>(&mut self, source: &mut B) -> Result<Written, WriteError> {
        if self.conn_state.is_closed() {
            trace!(%self.id, "write blocked; connection draining");
            return Err(WriteError::Blocked);
        }

        let limit = self.state.write_limit();

        let max_send_data = self.state.max_send_data(self.id);

        let stream = self
            .state
            .send
            .get_mut(&self.id)
            .map(get_or_insert_send(max_send_data))
            .ok_or(WriteError::ClosedStream)?;

        if limit == 0 {
            trace!(
                stream = %self.id, max_data = self.state.max_data, data_sent = self.state.data_sent,
                "write blocked by connection-level flow control or send window"
            );
            if !stream.connection_blocked {
                stream.connection_blocked = true;
                self.state.connection_blocked.push(self.id);
            }
            return Err(WriteError::Blocked);
        }

        let was_pending = stream.is_pending();
        let written = stream.write(source, limit)?;
        self.state.data_sent += written.bytes as u64;
        self.state.unacked_data += written.bytes as u64;
        trace!(stream = %self.id, "wrote {} bytes", written.bytes);
        if !was_pending {
            self.state.pending.push_pending(self.id, stream.priority);
        }
        Ok(written)
    }

    /// Check if this stream was stopped, returning the reason if it was
    pub fn stopped(&self) -> Result<Option<VarInt>, ClosedStream> {
        match self.state.send.get(&self.id).as_ref() {
            Some(Some(s)) => Ok(s.stop_reason),
            Some(None) => Ok(None),
            None => Err(ClosedStream { _private: () }),
        }
    }

    /// Finish a send stream, signalling that no more data will be sent.
    ///
    /// If this fails, no [`StreamEvent::Finished`] will be generated.
    ///
    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
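    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here), assuming a [`Connection`](super::Connection)
    /// value `conn`:
    ///
    /// ```ignore
    /// conn.send_stream(id).finish()?;
    /// // A `StreamEvent::Finished` follows once the peer has acknowledged all data.
    /// ```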
    pub fn finish(&mut self) -> Result<(), FinishError> {
        let max_send_data = self.state.max_send_data(self.id);
        let stream = self
            .state
            .send
            .get_mut(&self.id)
            .map(get_or_insert_send(max_send_data))
            .ok_or(FinishError::ClosedStream)?;

        let was_pending = stream.is_pending();
        stream.finish()?;
        if !was_pending {
            self.state.pending.push_pending(self.id, stream.priority);
        }

        Ok(())
    }

    /// Abandon transmitting data on a stream
    ///
    /// # Panics
    /// - when applied to a receive stream
    pub fn reset(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
        let max_send_data = self.state.max_send_data(self.id);
        let stream = self
            .state
            .send
            .get_mut(&self.id)
            .map(get_or_insert_send(max_send_data))
            .ok_or(ClosedStream { _private: () })?;

        if matches!(stream.state, SendState::ResetSent) {
            // Redundant reset call
            return Err(ClosedStream { _private: () });
        }

        // Restore the portion of the send window consumed by the data that we aren't about to
        // send. We leave flow control alone because the peer's responsible for issuing additional
        // credit based on the final offset communicated in the RESET_STREAM frame we send.
        self.state.unacked_data -= stream.pending.unacked();
        stream.reset();
        self.pending.reset_stream.push((self.id, error_code));

        // Don't reopen an already-closed stream we haven't forgotten yet
        Ok(())
    }

    /// Set the priority of a stream
    ///
    /// # Panics
    /// - when applied to a receive stream
    pub fn set_priority(&mut self, priority: i32) -> Result<(), ClosedStream> {
        let max_send_data = self.state.max_send_data(self.id);
        let stream = self
            .state
            .send
            .get_mut(&self.id)
            .map(get_or_insert_send(max_send_data))
            .ok_or(ClosedStream { _private: () })?;

        stream.priority = priority;
        Ok(())
    }

    /// Get the priority of a stream
    ///
    /// # Panics
    /// - when applied to a receive stream
    pub fn priority(&self) -> Result<i32, ClosedStream> {
        let stream = self
            .state
            .send
            .get(&self.id)
            .ok_or(ClosedStream { _private: () })?;

        Ok(stream.as_ref().map(|s| s.priority).unwrap_or_default())
    }
}

/// A queue of streams with pending outgoing data, sorted by priority
struct PendingStreamsQueue {
    streams: BinaryHeap<PendingStream>,
    /// The next stream to write out. This is `Some` when fairness is disabled via
    /// `TransportConfig::send_fairness(false)` and writing a stream is interrupted while the
    /// stream still has some pending data. See `reinsert_pending()`.
    next: Option<PendingStream>,
    /// A monotonically decreasing counter, used to implement round-robin scheduling for streams of the same priority.
    /// Underflow is not a practical concern, as it is initialized to `u64::MAX` and only decremented by 1 in `push_pending`.
    recency: u64,
}

impl PendingStreamsQueue {
    fn new() -> Self {
        Self {
            streams: BinaryHeap::new(),
            next: None,
            recency: u64::MAX,
        }
    }

    /// Reinsert a stream that was pending and still contains unsent data.
    fn reinsert_pending(&mut self, id: StreamId, priority: i32) {
        assert!(self.next.is_none());

        self.next = Some(PendingStream {
            priority,
            recency: self.recency, // the value here doesn't really matter
            id,
        });
    }

    /// Push a pending stream ID with the given priority, queued after any already-queued streams for the priority
    fn push_pending(&mut self, id: StreamId, priority: i32) {
        // Note that in the case where fairness is disabled, if we have a reinserted stream we don't
        // bump it even if priority > next.priority. In order to minimize fragmentation we
        // always try to complete a stream once part of it has been written.

        // As the recency counter is monotonically decreasing, we know that using its value to sort this stream will queue it
        // after all other queued streams of the same priority.
        // This is enough to implement round-robin scheduling for streams that are still pending even after being handled,
        // as in that case they are removed from the `BinaryHeap`, handled, and then immediately reinserted.
        self.recency -= 1;
        self.streams.push(PendingStream {
            priority,
            recency: self.recency,
            id,
        });
    }

    fn pop(&mut self) -> Option<PendingStream> {
        self.next.take().or_else(|| self.streams.pop())
    }

    fn clear(&mut self) {
        self.next = None;
        self.streams.clear();
    }

    fn iter(&self) -> impl Iterator<Item = &PendingStream> {
        self.next.iter().chain(self.streams.iter())
    }

    #[cfg(test)]
    fn len(&self) -> usize {
        self.streams.len() + self.next.is_some() as usize
    }
}

/// The [`StreamId`] of a stream with pending data queued, ordered by its priority and recency
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
struct PendingStream {
    /// The priority of the stream
    // Note that this field should be kept above the `recency` field, in order for the `Ord` derive to be correct
    // (See https://doc.rust-lang.org/stable/std/cmp/trait.Ord.html#derivable)
    priority: i32,
    /// A tie-breaker for streams of the same priority, used to improve fairness by implementing round-robin scheduling:
    /// larger values are prioritized, and the shared counter starts at `u64::MAX`. When a stream writes data it holds
    /// the highest recency value, so it is deprioritized by setting its recency to 1 less than the previous lowest
    /// value, such that all other streams of this priority get processed once before we come back round to it.
    recency: u64,
    /// The ID of the stream
    // The way this type is used ensures that every instance has a unique `recency` value, so this field should be kept below
    // the `priority` and `recency` fields, so that it does not interfere with the behaviour of the `Ord` derive
    id: StreamId,
}
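
// A small illustrative test of the scheduling order described above: higher priority wins, and
// streams sharing a priority are served in FIFO (round-robin) order. The module name, test name,
// and the concrete `StreamId` values are illustrative only.
#[cfg(test)]
mod pending_streams_queue_tests {
    use super::*;
    use crate::Side;

    #[test]
    fn priority_then_round_robin() {
        let mut queue = PendingStreamsQueue::new();
        let first = StreamId::new(Side::Client, Dir::Bi, 0);
        let urgent = StreamId::new(Side::Client, Dir::Bi, 1);
        let second = StreamId::new(Side::Client, Dir::Bi, 2);

        queue.push_pending(first, 0);
        queue.push_pending(urgent, 1); // higher priority, even though queued later
        queue.push_pending(second, 0);

        assert_eq!(queue.pop().unwrap().id, urgent); // highest priority first
        assert_eq!(queue.pop().unwrap().id, first); // then FIFO among equal priorities
        assert_eq!(queue.pop().unwrap().id, second);
        assert!(queue.pop().is_none());
    }
}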

/// Application events about streams
#[derive(Debug, PartialEq, Eq)]
pub enum StreamEvent {
    /// One or more new streams have been opened and might be readable
    Opened {
        /// Directionality for which streams have been opened
        dir: Dir,
    },
    /// A currently open stream likely has data or errors waiting to be read
    Readable {
        /// Which stream is now readable
        id: StreamId,
    },
    /// A formerly write-blocked stream might be ready for a write or have been stopped
    ///
    /// Only generated for streams that are currently open.
    Writable {
        /// Which stream is now writable
        id: StreamId,
    },
    /// A finished stream has been fully acknowledged or stopped
    Finished {
        /// Which stream has been finished
        id: StreamId,
    },
    /// The peer asked us to stop sending on an outgoing stream
    Stopped {
        /// Which stream has been stopped
        id: StreamId,
        /// Error code supplied by the peer
        error_code: VarInt,
    },
    /// At least one new stream of a certain directionality may be opened
    Available {
        /// Directionality for which streams are newly available
        dir: Dir,
    },
}

/// Indicates whether a frame needs to be transmitted
///
/// This type wraps a `bool` and uses the `#[must_use]` attribute in order
/// to prevent accidental loss of the frame transmission requirement.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
#[must_use = "A frame might need to be enqueued"]
pub struct ShouldTransmit(bool);

impl ShouldTransmit {
    /// Returns whether a frame should be transmitted
    pub fn should_transmit(self) -> bool {
        self.0
    }
}

/// Error indicating that a stream has not been opened or has already been finished or reset
#[derive(Debug, Default, Error, Clone, PartialEq, Eq)]
#[error("closed stream")]
pub struct ClosedStream {
    _private: (),
}

impl From<ClosedStream> for io::Error {
    fn from(x: ClosedStream) -> Self {
        Self::new(io::ErrorKind::NotConnected, x)
    }
}

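/// Distinguishes the send half of a stream from the receive half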
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum StreamHalf {
    Send,
    Recv,
}