compio_quic/
recv_stream.rs

1use std::{
2    collections::BTreeMap,
3    io,
4    sync::Arc,
5    task::{Context, Poll},
6};
7
8use compio_buf::{
9    BufResult, IoBufMut,
10    bytes::{BufMut, Bytes},
11};
12use compio_io::AsyncRead;
13use futures_util::{future::poll_fn, ready};
14use quinn_proto::{Chunk, Chunks, ClosedStream, ReadableError, StreamId, VarInt};
15use thiserror::Error;
16
17use crate::{ConnectionError, ConnectionInner, StoppedError};
18
19/// A stream that can only be used to receive data
20///
21/// `stop(0)` is implicitly called on drop unless:
22/// - A variant of [`ReadError`] has been yielded by a read call
23/// - [`stop()`] was called explicitly
24///
25/// # Cancellation
26///
27/// A `read` method is said to be *cancel-safe* when dropping its future before
28/// the future becomes ready cannot lead to loss of stream data. This is true of
29/// methods which succeed immediately when any progress is made, and is not true
30/// of methods which might need to perform multiple reads internally before
31/// succeeding. Each `read` method documents whether it is cancel-safe.
32///
33/// # Common issues
34///
35/// ## Data never received on a locally-opened stream
36///
37/// Peers are not notified of streams until they or a later-numbered stream are
38/// used to send data. If a bidirectional stream is locally opened but never
39/// used to send, then the peer may never see it. Application protocols should
40/// always arrange for the endpoint which will first transmit on a stream to be
41/// the endpoint responsible for opening it.
42///
43/// ## Data never received on a remotely-opened stream
44///
45/// Verify that the stream you are receiving is the same one that the server is
46/// sending on, e.g. by logging the [`id`] of each. Streams are always accepted
47/// in the same order as they are created, i.e. ascending order by [`StreamId`].
48/// For example, even if a sender first transmits on bidirectional stream 1, the
49/// first stream yielded by [`Connection::accept_bi`] on the receiver
50/// will be bidirectional stream 0.
51///
52/// [`stop()`]: RecvStream::stop
53/// [`id`]: RecvStream::id
54/// [`Connection::accept_bi`]: crate::Connection::accept_bi
55#[derive(Debug)]
56pub struct RecvStream {
57    conn: Arc<ConnectionInner>,
58    stream: StreamId,
59    is_0rtt: bool,
60    all_data_read: bool,
61    reset: Option<VarInt>,
62}
63
64impl RecvStream {
65    pub(crate) fn new(conn: Arc<ConnectionInner>, stream: StreamId, is_0rtt: bool) -> Self {
66        Self {
67            conn,
68            stream,
69            is_0rtt,
70            all_data_read: false,
71            reset: None,
72        }
73    }
74
75    /// Get the identity of this stream
76    pub fn id(&self) -> StreamId {
77        self.stream
78    }
79
80    /// Check if this stream has been opened during 0-RTT.
81    ///
82    /// In which case any non-idempotent request should be considered dangerous
83    /// at the application level. Because read data is subject to replay
84    /// attacks.
85    pub fn is_0rtt(&self) -> bool {
86        self.is_0rtt
87    }
88
89    /// Stop accepting data
90    ///
91    /// Discards unread data and notifies the peer to stop transmitting. Once
92    /// stopped, further attempts to operate on a stream will yield
93    /// `ClosedStream` errors.
94    pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
95        let mut state = self.conn.state();
96        if self.is_0rtt && !state.check_0rtt() {
97            return Ok(());
98        }
99        state.conn.recv_stream(self.stream).stop(error_code)?;
100        state.wake();
101        self.all_data_read = true;
102        Ok(())
103    }
104
105    /// Completes when the stream has been reset by the peer or otherwise
106    /// closed.
107    ///
108    /// Yields `Some` with the reset error code when the stream is reset by the
109    /// peer. Yields `None` when the stream was previously
110    /// [`stop()`](Self::stop)ed, or when the stream was
111    /// [`finish()`](crate::SendStream::finish)ed by the peer and all data has
112    /// been received, after which it is no longer meaningful for the stream to
113    /// be reset.
114    ///
115    /// This operation is cancel-safe.
116    pub async fn stopped(&mut self) -> Result<Option<VarInt>, StoppedError> {
117        poll_fn(|cx| {
118            let mut state = self.conn.state();
119
120            if self.is_0rtt && !state.check_0rtt() {
121                return Poll::Ready(Err(StoppedError::ZeroRttRejected));
122            }
123            if let Some(code) = self.reset {
124                return Poll::Ready(Ok(Some(code)));
125            }
126
127            match state.conn.recv_stream(self.stream).received_reset() {
128                Err(_) => Poll::Ready(Ok(None)),
129                Ok(Some(error_code)) => {
130                    // Stream state has just now been freed, so the connection may need to issue new
131                    // stream ID flow control credit
132                    state.wake();
133                    Poll::Ready(Ok(Some(error_code)))
134                }
135                Ok(None) => {
136                    if let Some(e) = &state.error {
137                        return Poll::Ready(Err(e.clone().into()));
138                    }
139                    // Resets always notify readers, since a reset is an immediate read error. We
140                    // could introduce a dedicated channel to reduce the risk of spurious wakeups,
141                    // but that increased complexity is probably not justified, as an application
142                    // that is expecting a reset is not likely to receive large amounts of data.
143                    state.readable.insert(self.stream, cx.waker().clone());
144                    Poll::Pending
145                }
146            }
147        })
148        .await
149    }
150
151    /// Handle common logic related to reading out of a receive stream.
152    ///
153    /// This takes an `FnMut` closure that takes care of the actual reading
154    /// process, matching the detailed read semantics for the calling
155    /// function with a particular return type. The closure can read from
156    /// the passed `&mut Chunks` and has to return the status after reading:
157    /// the amount of data read, and the status after the final read call.
158    fn execute_poll_read<F, T>(
159        &mut self,
160        cx: &mut Context,
161        ordered: bool,
162        mut read_fn: F,
163    ) -> Poll<Result<Option<T>, ReadError>>
164    where
165        F: FnMut(&mut Chunks) -> ReadStatus<T>,
166    {
167        use quinn_proto::ReadError::*;
168
169        if self.all_data_read {
170            return Poll::Ready(Ok(None));
171        }
172
173        let mut state = self.conn.state();
174        if self.is_0rtt && !state.check_0rtt() {
175            return Poll::Ready(Err(ReadError::ZeroRttRejected));
176        }
177
178        // If we stored an error during a previous call, return it now. This can happen
179        // if a `read_fn` both wants to return data and also returns an error in
180        // its final stream status.
181        let status = match self.reset {
182            Some(code) => ReadStatus::Failed(None, Reset(code)),
183            None => {
184                let mut recv = state.conn.recv_stream(self.stream);
185                let mut chunks = recv.read(ordered)?;
186                let status = read_fn(&mut chunks);
187                if chunks.finalize().should_transmit() {
188                    state.wake();
189                }
190                status
191            }
192        };
193
194        match status {
195            ReadStatus::Readable(read) => Poll::Ready(Ok(Some(read))),
196            ReadStatus::Finished(read) => {
197                self.all_data_read = true;
198                Poll::Ready(Ok(read))
199            }
200            ReadStatus::Failed(read, Blocked) => match read {
201                Some(val) => Poll::Ready(Ok(Some(val))),
202                None => {
203                    if let Some(error) = &state.error {
204                        return Poll::Ready(Err(error.clone().into()));
205                    }
206                    state.readable.insert(self.stream, cx.waker().clone());
207                    Poll::Pending
208                }
209            },
210            ReadStatus::Failed(read, Reset(error_code)) => match read {
211                None => {
212                    self.all_data_read = true;
213                    self.reset = Some(error_code);
214                    Poll::Ready(Err(ReadError::Reset(error_code)))
215                }
216                done => {
217                    self.reset = Some(error_code);
218                    Poll::Ready(Ok(done))
219                }
220            },
221        }
222    }
223
224    fn poll_read(
225        &mut self,
226        cx: &mut Context,
227        mut buf: impl BufMut,
228    ) -> Poll<Result<Option<usize>, ReadError>> {
229        if !buf.has_remaining_mut() {
230            return Poll::Ready(Ok(Some(0)));
231        }
232
233        self.execute_poll_read(cx, true, |chunks| {
234            let mut read = 0;
235            loop {
236                if !buf.has_remaining_mut() {
237                    // We know `read` is `true` because `buf.remaining()` was not 0 before
238                    return ReadStatus::Readable(read);
239                }
240
241                match chunks.next(buf.remaining_mut()) {
242                    Ok(Some(chunk)) => {
243                        read += chunk.bytes.len();
244                        buf.put(chunk.bytes);
245                    }
246                    res => {
247                        return (if read == 0 { None } else { Some(read) }, res.err()).into();
248                    }
249                }
250            }
251        })
252    }
253
254    /// Read data contiguously from the stream.
255    ///
256    /// Yields the number of bytes read into `buf` on success, or `None` if the
257    /// stream was finished.
258    ///
259    /// This operation is cancel-safe.
260    pub async fn read(&mut self, mut buf: impl BufMut) -> Result<Option<usize>, ReadError> {
261        poll_fn(|cx| self.poll_read(cx, &mut buf)).await
262    }
263
264    /// Read an exact number of bytes contiguously from the stream.
265    ///
266    /// See [`read()`] for details. This operation is *not* cancel-safe.
267    ///
268    /// [`read()`]: RecvStream::read
269    pub async fn read_exact(&mut self, mut buf: impl BufMut) -> Result<(), ReadExactError> {
270        poll_fn(|cx| {
271            while buf.has_remaining_mut() {
272                if ready!(self.poll_read(cx, &mut buf))?.is_none() {
273                    return Poll::Ready(Err(ReadExactError::FinishedEarly(buf.remaining_mut())));
274                }
275            }
276            Poll::Ready(Ok(()))
277        })
278        .await
279    }
280
281    /// Read the next segment of data.
282    ///
283    /// Yields `None` if the stream was finished. Otherwise, yields a segment of
284    /// data and its offset in the stream. If `ordered` is `true`, the chunk's
285    /// offset will be immediately after the last data yielded by
286    /// [`read()`](Self::read) or [`read_chunk()`](Self::read_chunk). If
287    /// `ordered` is `false`, segments may be received in any order, and the
288    /// `Chunk`'s `offset` field can be used to determine ordering in the
289    /// caller. Unordered reads are less prone to head-of-line blocking within a
290    /// stream, but require the application to manage reassembling the original
291    /// data.
292    ///
293    /// Slightly more efficient than `read` due to not copying. Chunk boundaries
294    /// do not correspond to peer writes, and hence cannot be used as framing.
295    ///
296    /// This operation is cancel-safe.
297    pub async fn read_chunk(
298        &mut self,
299        max_length: usize,
300        ordered: bool,
301    ) -> Result<Option<Chunk>, ReadError> {
302        poll_fn(|cx| {
303            self.execute_poll_read(cx, ordered, |chunks| match chunks.next(max_length) {
304                Ok(Some(chunk)) => ReadStatus::Readable(chunk),
305                res => (None, res.err()).into(),
306            })
307        })
308        .await
309    }
310
311    /// Read the next segments of data.
312    ///
313    /// Fills `bufs` with the segments of data beginning immediately after the
314    /// last data yielded by `read` or `read_chunk`, or `None` if the stream was
315    /// finished.
316    ///
317    /// Slightly more efficient than `read` due to not copying. Chunk boundaries
318    /// do not correspond to peer writes, and hence cannot be used as framing.
319    ///
320    /// This operation is cancel-safe.
321    pub async fn read_chunks(&mut self, bufs: &mut [Bytes]) -> Result<Option<usize>, ReadError> {
322        if bufs.is_empty() {
323            return Ok(Some(0));
324        }
325
326        poll_fn(|cx| {
327            self.execute_poll_read(cx, true, |chunks| {
328                let mut read = 0;
329                loop {
330                    if read >= bufs.len() {
331                        // We know `read > 0` because `bufs` cannot be empty here
332                        return ReadStatus::Readable(read);
333                    }
334
335                    match chunks.next(usize::MAX) {
336                        Ok(Some(chunk)) => {
337                            bufs[read] = chunk.bytes;
338                            read += 1;
339                        }
340                        res => {
341                            return (if read == 0 { None } else { Some(read) }, res.err()).into();
342                        }
343                    }
344                }
345            })
346        })
347        .await
348    }
349
350    /// Convenience method to read all remaining data into a buffer.
351    ///
352    /// Uses unordered reads to be more efficient than using [`AsyncRead`]. If
353    /// unordered reads have already been made, the resulting buffer may have
354    /// gaps containing zero.
355    ///
356    /// Depending on [`BufMut`] implementation, this method may fail with
357    /// [`ReadError::BufferTooShort`] if the buffer is not large enough to
358    /// hold the entire stream. For example when using a `&mut [u8]` it will
359    /// never receive bytes more than the length of the slice, but when using a
360    /// `&mut Vec<u8>` it will allocate more memory as needed.
361    ///
362    /// This operation is *not* cancel-safe.
363    pub async fn read_to_end(&mut self, mut buf: impl BufMut) -> Result<usize, ReadError> {
364        let mut start = u64::MAX;
365        let mut end = 0;
366        let mut chunks = BTreeMap::new();
367        loop {
368            let Some(chunk) = self.read_chunk(usize::MAX, false).await? else {
369                break;
370            };
371            start = start.min(chunk.offset);
372            end = end.max(chunk.offset + chunk.bytes.len() as u64);
373            if end - start > buf.remaining_mut() as u64 {
374                return Err(ReadError::BufferTooShort);
375            }
376            chunks.insert(chunk.offset, chunk.bytes);
377        }
378        let mut last = 0;
379        for (offset, bytes) in chunks {
380            let offset = (offset - start) as usize;
381            if offset > last {
382                buf.put_bytes(0, offset - last);
383            }
384            last = offset + bytes.len();
385            buf.put(bytes);
386        }
387        Ok((end - start) as usize)
388    }
389}
390
391impl Drop for RecvStream {
392    fn drop(&mut self) {
393        let mut state = self.conn.state();
394
395        // clean up any previously registered wakers
396        state.readable.remove(&self.stream);
397
398        if state.error.is_some() || (self.is_0rtt && !state.check_0rtt()) {
399            return;
400        }
401        if !self.all_data_read {
402            // Ignore ClosedStream errors
403            let _ = state.conn.recv_stream(self.stream).stop(0u32.into());
404            state.wake();
405        }
406    }
407}
408
409enum ReadStatus<T> {
410    Readable(T),
411    Finished(Option<T>),
412    Failed(Option<T>, quinn_proto::ReadError),
413}
414
415impl<T> From<(Option<T>, Option<quinn_proto::ReadError>)> for ReadStatus<T> {
416    fn from(status: (Option<T>, Option<quinn_proto::ReadError>)) -> Self {
417        match status {
418            (read, None) => Self::Finished(read),
419            (read, Some(e)) => Self::Failed(read, e),
420        }
421    }
422}
423
424/// Errors that arise from reading from a stream.
425#[derive(Debug, Error, Clone, PartialEq, Eq)]
426pub enum ReadError {
427    /// The peer abandoned transmitting data on this stream.
428    ///
429    /// Carries an application-defined error code.
430    #[error("stream reset by peer: error {0}")]
431    Reset(VarInt),
432    /// The connection was lost.
433    #[error("connection lost")]
434    ConnectionLost(#[from] ConnectionError),
435    /// The stream has already been stopped, finished, or reset.
436    #[error("closed stream")]
437    ClosedStream,
438    /// Attempted an ordered read following an unordered read.
439    ///
440    /// Performing an unordered read allows discontinuities to arise in the
441    /// receive buffer of a stream which cannot be recovered, making further
442    /// ordered reads impossible.
443    #[error("ordered read after unordered read")]
444    IllegalOrderedRead,
445    /// This was a 0-RTT stream and the server rejected it.
446    ///
447    /// Can only occur on clients for 0-RTT streams, which can be opened using
448    /// [`Connecting::into_0rtt()`].
449    ///
450    /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
451    #[error("0-RTT rejected")]
452    ZeroRttRejected,
453    /// The stream is larger than the user-supplied buffer capacity.
454    ///
455    /// Can only occur when using [`read_to_end()`](RecvStream::read_to_end).
456    #[error("buffer too short")]
457    BufferTooShort,
458}
459
460impl From<ReadableError> for ReadError {
461    fn from(e: ReadableError) -> Self {
462        match e {
463            ReadableError::ClosedStream => Self::ClosedStream,
464            ReadableError::IllegalOrderedRead => Self::IllegalOrderedRead,
465        }
466    }
467}
468
469impl From<StoppedError> for ReadError {
470    fn from(e: StoppedError) -> Self {
471        match e {
472            StoppedError::ConnectionLost(e) => Self::ConnectionLost(e),
473            StoppedError::ZeroRttRejected => Self::ZeroRttRejected,
474        }
475    }
476}
477
478impl From<ReadError> for io::Error {
479    fn from(x: ReadError) -> Self {
480        use self::ReadError::*;
481        let kind = match x {
482            Reset { .. } | ZeroRttRejected => io::ErrorKind::ConnectionReset,
483            ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected,
484            IllegalOrderedRead | BufferTooShort => io::ErrorKind::InvalidInput,
485        };
486        Self::new(kind, x)
487    }
488}
489
490/// Errors that arise from reading from a stream.
491#[derive(Debug, Error, Clone, PartialEq, Eq)]
492pub enum ReadExactError {
493    /// The stream finished before all bytes were read
494    #[error("stream finished early (expected {0} bytes more)")]
495    FinishedEarly(usize),
496    /// A read error occurred
497    #[error(transparent)]
498    ReadError(#[from] ReadError),
499}
500
501impl AsyncRead for RecvStream {
502    async fn read<B: IoBufMut>(&mut self, mut buf: B) -> BufResult<usize, B> {
503        let res = self
504            .read(buf.as_mut_slice())
505            .await
506            .map(|n| {
507                let n = n.unwrap_or_default();
508                unsafe { buf.set_buf_init(n) }
509                n
510            })
511            .map_err(Into::into);
512        BufResult(res, buf)
513    }
514}
515
#[cfg(feature = "io-compat")]
impl futures_util::AsyncRead for RecvStream {
    /// Poll the inherent `poll_read`, reporting a finished stream as a read
    /// of 0 bytes and converting a [`ReadError`] into an [`io::Error`].
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        match this.poll_read(cx, buf) {
            Poll::Ready(Ok(read)) => Poll::Ready(Ok(read.unwrap_or_default())),
            Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
            Poll::Pending => Poll::Pending,
        }
    }
}
529
#[cfg(feature = "h3")]
pub(crate) mod h3_impl {
    use h3::quic::{self, StreamErrorIncoming};

    use super::*;

    impl From<ReadError> for StreamErrorIncoming {
        /// Translate a [`ReadError`] into the error type used by `h3`.
        ///
        /// Resets and connection losses map to their dedicated variants;
        /// every other error except `IllegalOrderedRead` is boxed into
        /// `Unknown`.
        fn from(err: ReadError) -> Self {
            match err {
                ReadError::Reset(code) => Self::StreamTerminated {
                    error_code: code.into_inner(),
                },
                ReadError::ConnectionLost(cause) => Self::ConnectionErrorIncoming {
                    connection_error: cause.into(),
                },
                ReadError::IllegalOrderedRead => unreachable!("illegal ordered read"),
                other => Self::Unknown(Box::new(other)),
            }
        }
    }

    impl quic::RecvStream for RecvStream {
        type Buf = Bytes;

        /// Poll for the next ordered chunk of stream data, yielding `None`
        /// once the stream is finished.
        fn poll_data(
            &mut self,
            cx: &mut Context<'_>,
        ) -> Poll<Result<Option<Self::Buf>, StreamErrorIncoming>> {
            let polled = self.execute_poll_read(cx, true, |chunks| {
                match chunks.next(usize::MAX) {
                    Ok(Some(chunk)) => ReadStatus::Readable(chunk.bytes),
                    Ok(None) => ReadStatus::Finished(None),
                    Err(e) => ReadStatus::Failed(None, e),
                }
            });
            polled.map_err(StreamErrorIncoming::from)
        }

        /// Stop the stream with `error_code`, ignoring a `ClosedStream`
        /// failure.
        ///
        /// Panics if `error_code` cannot be converted into a `VarInt`.
        fn stop_sending(&mut self, error_code: u64) {
            let code = VarInt::try_from(error_code).expect("invalid error_code");
            let _ = self.stop(code);
        }

        /// Identifier of this stream, converted to the `h3` stream id type.
        fn recv_id(&self) -> quic::StreamId {
            let raw = u64::from(self.stream);
            quic::StreamId::try_from(raw).unwrap()
        }
    }
}