ant_quic/connection/streams/
recv.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::collections::hash_map::Entry;
9use std::mem;
10
11use thiserror::Error;
12use tracing::debug;
13
14use super::state::get_or_insert_recv;
15use super::{ClosedStream, Retransmits, ShouldTransmit, StreamId, StreamsState};
16use crate::connection::assembler::{Assembler, Chunk, IllegalOrderedRead};
17use crate::connection::streams::state::StreamRecv;
18use crate::{TransportError, VarInt, frame};
19
20#[derive(Debug, Default)]
21pub(super) struct Recv {
22    // NB: when adding or removing fields, remember to update `reinit`.
23    state: RecvState,
24    pub(super) assembler: Assembler,
25    sent_max_stream_data: u64,
26    pub(super) end: u64,
27    pub(super) stopped: bool,
28}
29
30impl Recv {
31    pub(super) fn new(initial_max_data: u64) -> Box<Self> {
32        Box::new(Self {
33            state: RecvState::default(),
34            assembler: Assembler::new(),
35            sent_max_stream_data: initial_max_data,
36            end: 0,
37            stopped: false,
38        })
39    }
40
41    /// Reset to the initial state
42    pub(super) fn reinit(&mut self, initial_max_data: u64) {
43        self.state = RecvState::default();
44        self.assembler.reinit();
45        self.sent_max_stream_data = initial_max_data;
46        self.end = 0;
47        self.stopped = false;
48    }
49
50    /// Process a STREAM frame
51    ///
52    /// Return value is `(number_of_new_bytes_ingested, stream_is_closed)`
53    pub(super) fn ingest(
54        &mut self,
55        frame: frame::Stream,
56        payload_len: usize,
57        received: u64,
58        max_data: u64,
59    ) -> Result<(u64, bool), TransportError> {
60        let end = frame.offset + frame.data.len() as u64;
61        if end >= 2u64.pow(62) {
62            return Err(TransportError::FLOW_CONTROL_ERROR(
63                "maximum stream offset too large",
64            ));
65        }
66
67        if let Some(final_offset) = self.final_offset() {
68            if end > final_offset || (frame.fin && end != final_offset) {
69                debug!(end, final_offset, "final size error");
70                return Err(TransportError::FINAL_SIZE_ERROR(""));
71            }
72        }
73
74        let new_bytes = self.credit_consumed_by(end, received, max_data)?;
75
76        // Stopped streams don't need to wait for the actual data, they just need to know
77        // how much there was.
78        if frame.fin && !self.stopped {
79            if let RecvState::Recv { ref mut size } = self.state {
80                *size = Some(end);
81            }
82        }
83
84        self.end = self.end.max(end);
85        // Don't bother storing data or releasing stream-level flow control credit if the stream's
86        // already stopped
87        if !self.stopped {
88            self.assembler.insert(frame.offset, frame.data, payload_len);
89        }
90
91        Ok((new_bytes, frame.fin && self.stopped))
92    }
93
94    pub(super) fn stop(&mut self) -> Result<(u64, ShouldTransmit), ClosedStream> {
95        if self.stopped {
96            return Err(ClosedStream { _private: () });
97        }
98
99        self.stopped = true;
100        self.assembler.clear();
101        // Issue flow control credit for unread data
102        let read_credits = self.end - self.assembler.bytes_read();
103        // This may send a spurious STOP_SENDING if we've already received all data, but it's a bit
104        // fiddly to distinguish that from the case where we've received a FIN but are missing some
105        // data that the peer might still be trying to retransmit, in which case a STOP_SENDING is
106        // still useful.
107        Ok((read_credits, ShouldTransmit(self.is_receiving())))
108    }
109
110    /// Returns the window that should be advertised in a `MAX_STREAM_DATA` frame
111    ///
112    /// The method returns a tuple which consists of the window that should be
113    /// announced, as well as a boolean parameter which indicates if a new
114    /// transmission of the value is recommended. If the boolean value is
115    /// `false` the new window should only be transmitted if a previous transmission
116    /// had failed.
117    pub(super) fn max_stream_data(&mut self, stream_receive_window: u64) -> (u64, ShouldTransmit) {
118        let max_stream_data = self.assembler.bytes_read() + stream_receive_window;
119
120        // Only announce a window update if it's significant enough
121        // to make it worthwhile sending a MAX_STREAM_DATA frame.
122        // We use here a fraction of the configured stream receive window to make
123        // the decision, and accommodate for streams using bigger windows requiring
124        // less updates. A fixed size would also work - but it would need to be
125        // smaller than `stream_receive_window` in order to make sure the stream
126        // does not get stuck.
127        let diff = max_stream_data - self.sent_max_stream_data;
128        let transmit = self.can_send_flow_control() && diff >= (stream_receive_window / 8);
129        (max_stream_data, ShouldTransmit(transmit))
130    }
131
132    /// Records that a `MAX_STREAM_DATA` announcing a certain window was sent
133    ///
134    /// This will suppress enqueuing further `MAX_STREAM_DATA` frames unless
135    /// either the previous transmission was not acknowledged or the window
136    /// further increased.
137    pub(super) fn record_sent_max_stream_data(&mut self, sent_value: u64) {
138        if sent_value > self.sent_max_stream_data {
139            self.sent_max_stream_data = sent_value;
140        }
141    }
142
143    /// Whether the total amount of data that the peer will send on this stream is unknown
144    ///
145    /// True until we've received either a reset or the final frame.
146    ///
147    /// Implies that the sender might benefit from stream-level flow control updates, and we might
148    /// need to issue connection-level flow control updates due to flow control budget use by this
149    /// stream in the future, even if it's been stopped.
150    pub(super) fn final_offset_unknown(&self) -> bool {
151        matches!(self.state, RecvState::Recv { size: None })
152    }
153
154    /// Whether stream-level flow control updates should be sent for this stream
155    pub(super) fn can_send_flow_control(&self) -> bool {
156        // Stream-level flow control is redundant if the sender has already sent the whole stream,
157        // and moot if we no longer want data on this stream.
158        self.final_offset_unknown() && !self.stopped
159    }
160
161    /// Whether data is still being accepted from the peer
162    pub(super) fn is_receiving(&self) -> bool {
163        matches!(self.state, RecvState::Recv { .. })
164    }
165
166    fn final_offset(&self) -> Option<u64> {
167        match self.state {
168            RecvState::Recv { size } => size,
169            RecvState::ResetRecvd { size, .. } => Some(size),
170        }
171    }
172
173    /// Returns `false` iff the reset was redundant
174    pub(super) fn reset(
175        &mut self,
176        error_code: VarInt,
177        final_offset: VarInt,
178        received: u64,
179        max_data: u64,
180    ) -> Result<bool, TransportError> {
181        // Validate final_offset
182        if let Some(offset) = self.final_offset() {
183            if offset != final_offset.into_inner() {
184                return Err(TransportError::FINAL_SIZE_ERROR("inconsistent value"));
185            }
186        } else if self.end > u64::from(final_offset) {
187            return Err(TransportError::FINAL_SIZE_ERROR(
188                "lower than high water mark",
189            ));
190        }
191        self.credit_consumed_by(final_offset.into(), received, max_data)?;
192
193        if matches!(self.state, RecvState::ResetRecvd { .. }) {
194            return Ok(false);
195        }
196        self.state = RecvState::ResetRecvd {
197            size: final_offset.into(),
198            error_code,
199        };
200        // Nuke buffers so that future reads fail immediately, which ensures future reads don't
201        // issue flow control credit redundant to that already issued. We could instead special-case
202        // reset streams during read, but it's unclear if there's any benefit to retaining data for
203        // reset streams.
204        self.assembler.clear();
205        Ok(true)
206    }
207
208    pub(super) fn reset_code(&self) -> Option<VarInt> {
209        match self.state {
210            RecvState::ResetRecvd { error_code, .. } => Some(error_code),
211            _ => None,
212        }
213    }
214
215    /// Compute the amount of flow control credit consumed, or return an error if more was consumed
216    /// than issued
217    fn credit_consumed_by(
218        &self,
219        offset: u64,
220        received: u64,
221        max_data: u64,
222    ) -> Result<u64, TransportError> {
223        let prev_end = self.end;
224        let new_bytes = offset.saturating_sub(prev_end);
225        if offset > self.sent_max_stream_data || received + new_bytes > max_data {
226            debug!(
227                received,
228                new_bytes,
229                max_data,
230                offset,
231                stream_max_data = self.sent_max_stream_data,
232                "flow control error"
233            );
234            return Err(TransportError::FLOW_CONTROL_ERROR(""));
235        }
236
237        Ok(new_bytes)
238    }
239}
240
241/// Chunks returned from [`RecvStream::read()`][crate::RecvStream::read].
242///
243/// ### Note: Finalization Needed
244/// Bytes read from the stream are not released from the congestion window until
245/// either [`Self::finalize()`] is called, or this type is dropped.
246///
247/// It is recommended that you call [`Self::finalize()`] because it returns a flag
248/// telling you whether reading from the stream has resulted in the need to transmit a packet.
249///
250/// If this type is leaked, the stream will remain blocked on the remote peer until
251/// another read from the stream is done.
252pub struct Chunks<'a> {
253    id: StreamId,
254    ordered: bool,
255    streams: &'a mut StreamsState,
256    pending: &'a mut Retransmits,
257    state: ChunksState,
258    read: u64,
259}
260
261impl<'a> Chunks<'a> {
262    pub(super) fn new(
263        id: StreamId,
264        ordered: bool,
265        streams: &'a mut StreamsState,
266        pending: &'a mut Retransmits,
267    ) -> Result<Self, ReadableError> {
268        let mut entry = match streams.recv.entry(id) {
269            Entry::Occupied(entry) => entry,
270            Entry::Vacant(_) => return Err(ReadableError::ClosedStream),
271        };
272
273        let mut recv =
274            match get_or_insert_recv(streams.stream_receive_window)(entry.get_mut()).stopped {
275                true => return Err(ReadableError::ClosedStream),
276                false => entry.remove().unwrap().into_inner(), // this can't fail due to the previous get_or_insert_with
277            };
278
279        recv.assembler.ensure_ordering(ordered)?;
280        Ok(Self {
281            id,
282            ordered,
283            streams,
284            pending,
285            state: ChunksState::Readable(recv),
286            read: 0,
287        })
288    }
289
290    /// Next
291    ///
292    /// Should call finalize() when done calling this.
293    pub fn next(&mut self, max_length: usize) -> Result<Option<Chunk>, ReadError> {
294        let rs = match self.state {
295            ChunksState::Readable(ref mut rs) => rs,
296            ChunksState::Reset(error_code) => {
297                return Err(ReadError::Reset(error_code));
298            }
299            ChunksState::Finished => {
300                return Ok(None);
301            }
302            ChunksState::Finalized => panic!("must not call next() after finalize()"),
303        };
304
305        if let Some(chunk) = rs.assembler.read(max_length, self.ordered) {
306            self.read += chunk.bytes.len() as u64;
307            return Ok(Some(chunk));
308        }
309
310        match rs.state {
311            RecvState::ResetRecvd { error_code, .. } => {
312                debug_assert_eq!(self.read, 0, "reset streams have empty buffers");
313                let state = mem::replace(&mut self.state, ChunksState::Reset(error_code));
314                // At this point if we have `rs` self.state must be `ChunksState::Readable`
315                let recv = match state {
316                    ChunksState::Readable(recv) => StreamRecv::Open(recv),
317                    _ => unreachable!("state must be ChunkState::Readable"),
318                };
319                self.streams.stream_recv_freed(self.id, recv);
320                Err(ReadError::Reset(error_code))
321            }
322            RecvState::Recv { size } => {
323                if size == Some(rs.end) && rs.assembler.bytes_read() == rs.end {
324                    let state = mem::replace(&mut self.state, ChunksState::Finished);
325                    // At this point if we have `rs` self.state must be `ChunksState::Readable`
326                    let recv = match state {
327                        ChunksState::Readable(recv) => StreamRecv::Open(recv),
328                        _ => unreachable!("state must be ChunkState::Readable"),
329                    };
330                    self.streams.stream_recv_freed(self.id, recv);
331                    Ok(None)
332                } else {
333                    // We don't need a distinct `ChunksState` variant for a blocked stream because
334                    // retrying a read harmlessly re-traces our steps back to returning
335                    // `Err(Blocked)` again. The buffers can't refill and the stream's own state
336                    // can't change so long as this `Chunks` exists.
337                    Err(ReadError::Blocked)
338                }
339            }
340        }
341    }
342
343    /// Mark the read data as consumed from the stream.
344    ///
345    /// The number of read bytes will be released from the congestion window,
346    /// allowing the remote peer to send more data if it was previously blocked.
347    ///
348    /// If [`ShouldTransmit::should_transmit()`] returns `true`,
349    /// a packet needs to be sent to the peer informing them that the stream is unblocked.
350    /// This means that you should call [`Connection::poll_transmit()`][crate::Connection::poll_transmit]
351    /// and send the returned packet as soon as is reasonable, to unblock the remote peer.
352    pub fn finalize(mut self) -> ShouldTransmit {
353        self.finalize_inner()
354    }
355
356    fn finalize_inner(&mut self) -> ShouldTransmit {
357        let state = mem::replace(&mut self.state, ChunksState::Finalized);
358        if let ChunksState::Finalized = state {
359            // Noop on repeated calls
360            return ShouldTransmit(false);
361        }
362
363        // We issue additional stream ID credit after the application is notified that a previously
364        // open stream has finished or been reset and we've therefore disposed of its state, as
365        // recorded by `stream_freed` calls in `next`.
366        let mut should_transmit = self.streams.queue_max_stream_id(self.pending);
367
368        // If the stream hasn't finished, we may need to issue stream-level flow control credit
369        if let ChunksState::Readable(mut rs) = state {
370            let (_, max_stream_data) = rs.max_stream_data(self.streams.stream_receive_window);
371            should_transmit |= max_stream_data.0;
372            if max_stream_data.0 {
373                self.pending.max_stream_data.insert(self.id);
374            }
375            // Return the stream to storage for future use
376            self.streams
377                .recv
378                .insert(self.id, Some(StreamRecv::Open(rs)));
379        }
380
381        // Issue connection-level flow control credit for any data we read regardless of state
382        let max_data = self.streams.add_read_credits(self.read);
383        self.pending.max_data |= max_data.0;
384        should_transmit |= max_data.0;
385        ShouldTransmit(should_transmit)
386    }
387}
388
389impl Drop for Chunks<'_> {
390    fn drop(&mut self) {
391        let _ = self.finalize_inner();
392    }
393}
394
395enum ChunksState {
396    Readable(Box<Recv>),
397    Reset(VarInt),
398    Finished,
399    Finalized,
400}
401
402/// Errors triggered when reading from a recv stream
403#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
404pub enum ReadError {
405    /// No more data is currently available on this stream.
406    ///
407    /// If more data on this stream is received from the peer, an `Event::StreamReadable` will be
408    /// generated for this stream, indicating that retrying the read might succeed.
409    #[error("blocked")]
410    Blocked,
411    /// The peer abandoned transmitting data on this stream.
412    ///
413    /// Carries an application-defined error code.
414    #[error("reset by peer: code {0}")]
415    Reset(VarInt),
416    /// The stream has been closed due to connection error
417    #[error("stream closed due to connection error")]
418    ConnectionClosed,
419}
420
421/// Errors triggered when opening a recv stream for reading
422#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
423pub enum ReadableError {
424    /// The stream has not been opened or was already stopped, finished, or reset
425    #[error("closed stream")]
426    ClosedStream,
427    /// Attempted an ordered read following an unordered read
428    ///
429    /// Performing an unordered read allows discontinuities to arise in the receive buffer of a
430    /// stream which cannot be recovered, making further ordered reads impossible.
431    #[error("ordered read after unordered read")]
432    IllegalOrderedRead,
433    /// The stream has been closed due to connection error
434    #[error("stream closed due to connection error")]
435    ConnectionClosed,
436}
437
438impl From<IllegalOrderedRead> for ReadableError {
439    fn from(_: IllegalOrderedRead) -> Self {
440        Self::IllegalOrderedRead
441    }
442}
443
444#[derive(Debug, Copy, Clone, Eq, PartialEq)]
445enum RecvState {
446    Recv { size: Option<u64> },
447    ResetRecvd { size: u64, error_code: VarInt },
448}
449
450impl Default for RecvState {
451    fn default() -> Self {
452        Self::Recv { size: None }
453    }
454}
455
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use crate::{Dir, Side};

    use super::*;

    const RECV_WINDOW: u64 = 8;

    /// Ingest a non-FIN frame of `len` zero bytes at `offset`, with 1024 bytes of
    /// connection-level headroom above `data_recvd`.
    fn ingest_zeros(recv: &mut Recv, offset: u64, len: u64, data_recvd: u64) -> (u64, bool) {
        let frame = frame::Stream {
            id: StreamId::new(Side::Client, Dir::Uni, 0),
            offset,
            fin: false,
            data: Bytes::from(vec![0u8; len as usize]),
        };
        recv.ingest(frame, 123, data_recvd, data_recvd + 1024)
            .unwrap()
    }

    /// Assert that the advertised stream window is still the initial one and that no
    /// transmission is requested.
    fn assert_window_unchanged(recv: &mut Recv, context: &str) {
        let (max_stream_data, transmit) = recv.max_stream_data(RECV_WINDOW);
        assert!(!transmit.should_transmit());
        assert_eq!(max_stream_data, RECV_WINDOW, "{}", context);
    }

    #[test]
    fn reordered_frames_while_stopped() {
        const INITIAL_BYTES: u64 = 3;
        const INITIAL_OFFSET: u64 = 3;
        let mut s = Recv::new(RECV_WINDOW);
        let mut data_recvd = 0;

        // Receive bytes 3..6
        let (new_bytes, is_closed) =
            ingest_zeros(&mut s, INITIAL_OFFSET, INITIAL_BYTES, data_recvd);
        data_recvd += new_bytes;
        assert_eq!(new_bytes, INITIAL_OFFSET + INITIAL_BYTES);
        assert!(!is_closed);

        let (credits, transmit) = s.stop().unwrap();
        assert!(transmit.should_transmit());
        assert_eq!(
            credits,
            INITIAL_OFFSET + INITIAL_BYTES,
            "full connection flow control credit is issued by stop"
        );
        assert_window_unchanged(&mut s, "stream flow control credit isn't issued by stop");

        // Receive byte 7
        let (new_bytes, is_closed) = ingest_zeros(&mut s, RECV_WINDOW - 1, 1, data_recvd);
        data_recvd += new_bytes;
        assert_eq!(new_bytes, RECV_WINDOW - (INITIAL_OFFSET + INITIAL_BYTES));
        assert!(!is_closed);
        assert_window_unchanged(&mut s, "stream flow control credit isn't issued after stop");

        // Receive bytes 0..3
        let (new_bytes, is_closed) = ingest_zeros(&mut s, 0, INITIAL_OFFSET, data_recvd);
        assert_eq!(
            new_bytes, 0,
            "reordered frames don't issue connection-level flow control for stopped streams"
        );
        assert!(!is_closed);
        assert_window_unchanged(&mut s, "stream flow control credit isn't issued after stop");
    }
}