kvarn_quinn/
recv_stream.rs

1use std::{
2    future::Future,
3    io,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use bytes::Bytes;
9use proto::{Chunk, Chunks, ConnectionError, ReadableError, StreamId};
10use thiserror::Error;
11use tokio::io::ReadBuf;
12
13use crate::{
14    connection::{ConnectionRef, UnknownStream},
15    VarInt,
16};
17
/// A stream that can only be used to receive data
///
/// `stop(0)` is implicitly called on drop unless one of the following holds:
/// - The stream has been read to its end
/// - A read call has yielded [`ReadError::Reset`]
/// - [`stop()`] was called explicitly and succeeded
/// - The connection has been lost, or this is a 0-RTT stream that was rejected
///
/// # Common issues
///
/// ## Data never received on a locally-opened stream
///
/// Peers are not notified of streams until they or a later-numbered stream are used to send
/// data. If a bidirectional stream is locally opened but never used to send, then the peer may
/// never see it. Application protocols should always arrange for the endpoint which will first
/// transmit on a stream to be the endpoint responsible for opening it.
///
/// ## Data never received on a remotely-opened stream
///
/// Verify that the stream you are receiving is the same one that the server is sending on, e.g. by
/// logging the [`id`] of each. Streams are always accepted in the same order as they are created,
/// i.e. ascending order by [`StreamId`]. For example, even if a sender first transmits on
/// bidirectional stream 1, the first stream yielded by [`Connection::accept_bi`] on the receiver
/// will be bidirectional stream 0.
///
/// ## Unexpected [`WriteError::Stopped`] in sender
///
/// When a stream is expected to be closed gracefully the sender should call
/// [`SendStream::finish`].  However there is no guarantee the connected [`RecvStream`] will
/// receive the "finished" notification in the same QUIC frame as the last frame which
/// carried data.
///
/// Even if the application layer logic already knows it read all the data because it does
/// its own framing, it should still read until it reaches the end of the [`RecvStream`].
/// Otherwise it risks inadvertently calling [`RecvStream::stop`] if it drops the stream.
/// And calling [`RecvStream::stop`] could result in the connected [`SendStream::finish`]
/// call failing with a [`WriteError::Stopped`] error.
///
/// For example if exactly 10 bytes are to be read, you still need to explicitly read the
/// end of the stream:
///
/// ```no_run
/// # use quinn::{SendStream, RecvStream};
/// # async fn func(
/// #     mut send_stream: SendStream,
/// #     mut recv_stream: RecvStream,
/// # ) -> anyhow::Result<()>
/// # {
/// // In the sending task
/// send_stream.write(&b"0123456789"[..]).await?;
/// send_stream.finish().await?;
///
/// // In the receiving task
/// let mut buf = [0u8; 10];
/// recv_stream.read_exact(&mut buf).await?;
/// if recv_stream.read_to_end(0).await.is_err() {
///     // Discard unexpected data and notify the peer to stop sending it
///     let _ = recv_stream.stop(0u8.into());
/// }
/// # Ok(())
/// # }
/// ```
///
/// An alternative approach, used in HTTP/3, is to specify a particular error code used with `stop`
/// that indicates graceful receiver-initiated stream shutdown, rather than a true error condition.
///
/// [`RecvStream::read_chunk`] could be used instead of `read_to_end`, followed by an explicit
/// call to [`RecvStream::stop`] with a custom error code.
///
/// [`ReadError`]: crate::ReadError
/// [`ReadError::Reset`]: crate::ReadError::Reset
/// [`stop()`]: RecvStream::stop
/// [`SendStream::finish`]: crate::SendStream::finish
/// [`WriteError::Stopped`]: crate::WriteError::Stopped
/// [`id`]: RecvStream::id
/// [`Connection::accept_bi`]: crate::Connection::accept_bi
#[derive(Debug)]
pub struct RecvStream {
    // Handle to the connection state; locked for every operation on the stream.
    conn: ConnectionRef,
    // Identifier of this stream within the connection.
    stream: StreamId,
    // Whether the stream was opened during 0-RTT; if so, operations first check for rejection.
    is_0rtt: bool,
    // Set once the stream finished, a reset was reported to the caller, or `stop` succeeded.
    // While set, reads yield `None` and drop does not issue an implicit stop.
    all_data_read: bool,
    // Reset code held back because the read that observed it also produced data; it is
    // reported by the following read.
    reset: Option<VarInt>,
}
99
100impl RecvStream {
101    pub(crate) fn new(conn: ConnectionRef, stream: StreamId, is_0rtt: bool) -> Self {
102        Self {
103            conn,
104            stream,
105            is_0rtt,
106            all_data_read: false,
107            reset: None,
108        }
109    }
110
111    /// Read data contiguously from the stream.
112    ///
113    /// Yields the number of bytes read into `buf` on success, or `None` if the stream was finished.
114    pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, ReadError> {
115        Read {
116            stream: self,
117            buf: ReadBuf::new(buf),
118        }
119        .await
120    }
121
122    /// Read an exact number of bytes contiguously from the stream.
123    ///
124    /// See [`read()`] for details.
125    ///
126    /// [`read()`]: RecvStream::read
127    pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ReadExactError> {
128        ReadExact {
129            stream: self,
130            buf: ReadBuf::new(buf),
131        }
132        .await
133    }
134
135    fn poll_read(
136        &mut self,
137        cx: &mut Context,
138        buf: &mut ReadBuf<'_>,
139    ) -> Poll<Result<(), ReadError>> {
140        if buf.remaining() == 0 {
141            return Poll::Ready(Ok(()));
142        }
143
144        self.poll_read_generic(cx, true, |chunks| {
145            let mut read = false;
146            loop {
147                if buf.remaining() == 0 {
148                    // We know `read` is `true` because `buf.remaining()` was not 0 before
149                    return ReadStatus::Readable(());
150                }
151
152                match chunks.next(buf.remaining()) {
153                    Ok(Some(chunk)) => {
154                        buf.put_slice(&chunk.bytes);
155                        read = true;
156                    }
157                    res => return (if read { Some(()) } else { None }, res.err()).into(),
158                }
159            }
160        })
161        .map(|res| res.map(|_| ()))
162    }
163
164    /// Read the next segment of data
165    ///
166    /// Yields `None` if the stream was finished. Otherwise, yields a segment of data and its
167    /// offset in the stream. If `ordered` is `true`, the chunk's offset will be immediately after
168    /// the last data yielded by `read()` or `read_chunk()`. If `ordered` is `false`, segments may
169    /// be received in any order, and the `Chunk`'s `offset` field can be used to determine
170    /// ordering in the caller. Unordered reads are less prone to head-of-line blocking within a
171    /// stream, but require the application to manage reassembling the original data.
172    ///
173    /// Slightly more efficient than `read` due to not copying. Chunk boundaries do not correspond
174    /// to peer writes, and hence cannot be used as framing.
175    pub async fn read_chunk(
176        &mut self,
177        max_length: usize,
178        ordered: bool,
179    ) -> Result<Option<Chunk>, ReadError> {
180        ReadChunk {
181            stream: self,
182            max_length,
183            ordered,
184        }
185        .await
186    }
187
188    /// Foundation of [`Self::read_chunk`]
189    fn poll_read_chunk(
190        &mut self,
191        cx: &mut Context,
192        max_length: usize,
193        ordered: bool,
194    ) -> Poll<Result<Option<Chunk>, ReadError>> {
195        self.poll_read_generic(cx, ordered, |chunks| match chunks.next(max_length) {
196            Ok(Some(chunk)) => ReadStatus::Readable(chunk),
197            res => (None, res.err()).into(),
198        })
199    }
200
201    /// Read the next segments of data
202    ///
203    /// Fills `bufs` with the segments of data beginning immediately after the
204    /// last data yielded by `read` or `read_chunk`, or `None` if the stream was
205    /// finished.
206    ///
207    /// Slightly more efficient than `read` due to not copying. Chunk boundaries
208    /// do not correspond to peer writes, and hence cannot be used as framing.
209    pub async fn read_chunks(&mut self, bufs: &mut [Bytes]) -> Result<Option<usize>, ReadError> {
210        ReadChunks { stream: self, bufs }.await
211    }
212
213    /// Foundation of [`Self::read_chunks`]
214    fn poll_read_chunks(
215        &mut self,
216        cx: &mut Context,
217        bufs: &mut [Bytes],
218    ) -> Poll<Result<Option<usize>, ReadError>> {
219        if bufs.is_empty() {
220            return Poll::Ready(Ok(Some(0)));
221        }
222
223        self.poll_read_generic(cx, true, |chunks| {
224            let mut read = 0;
225            loop {
226                if read >= bufs.len() {
227                    // We know `read > 0` because `bufs` cannot be empty here
228                    return ReadStatus::Readable(read);
229                }
230
231                match chunks.next(usize::MAX) {
232                    Ok(Some(chunk)) => {
233                        bufs[read] = chunk.bytes;
234                        read += 1;
235                    }
236                    res => return (if read == 0 { None } else { Some(read) }, res.err()).into(),
237                }
238            }
239        })
240    }
241
242    /// Convenience method to read all remaining data into a buffer
243    ///
244    /// Fails with [`ReadToEndError::TooLong`] on reading more than `size_limit` bytes, discarding
245    /// all data read. Uses unordered reads to be more efficient than using `AsyncRead` would
246    /// allow. `size_limit` should be set to limit worst-case memory use.
247    ///
248    /// If unordered reads have already been made, the resulting buffer may have gaps containing
249    /// arbitrary data.
250    ///
251    /// [`ReadToEndError::TooLong`]: crate::ReadToEndError::TooLong
252    pub async fn read_to_end(&mut self, size_limit: usize) -> Result<Vec<u8>, ReadToEndError> {
253        ReadToEnd {
254            stream: self,
255            size_limit,
256            read: Vec::new(),
257            start: u64::max_value(),
258            end: 0,
259        }
260        .await
261    }
262
263    /// Stop accepting data
264    ///
265    /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
266    /// attempts to operate on a stream will yield `UnknownStream` errors.
267    pub fn stop(&mut self, error_code: VarInt) -> Result<(), UnknownStream> {
268        let mut conn = self.conn.state.lock("RecvStream::stop");
269        if self.is_0rtt && conn.check_0rtt().is_err() {
270            return Ok(());
271        }
272        conn.inner.recv_stream(self.stream).stop(error_code)?;
273        conn.wake();
274        self.all_data_read = true;
275        Ok(())
276    }
277
    /// Check if this stream has been opened during 0-RTT.
    ///
    /// If so, any non-idempotent request received on it should be considered dangerous at the
    /// application level, because data read from it is subject to replay attacks.
    pub fn is_0rtt(&self) -> bool {
        self.is_0rtt
    }
285
    /// Get the identity of this stream
    ///
    /// Returns the [`StreamId`] this handle was created with.
    pub fn id(&self) -> StreamId {
        self.stream
    }
290
    /// Handle common logic related to reading out of a receive stream
    ///
    /// This takes an `FnMut` closure that takes care of the actual reading process, matching
    /// the detailed read semantics for the calling function with a particular return type.
    /// The closure can read from the passed `&mut Chunks` and has to return the status after
    /// reading: the amount of data read, and the status after the final read call.
    ///
    /// `ordered` is forwarded to the underlying stream read. `U` is the value the closure
    /// produces when it read something.
    ///
    /// Resolves to `Ok(Some(value))` when the closure produced data, `Ok(None)` when the stream
    /// is finished (or was already marked as fully read), and `Err` for a reset, a lost
    /// connection, a rejected 0-RTT stream, or a failure to start the read. Returns
    /// `Poll::Pending` after registering the task's waker when the read is blocked and the
    /// closure produced nothing.
    fn poll_read_generic<T, U>(
        &mut self,
        cx: &mut Context,
        ordered: bool,
        mut read_fn: T,
    ) -> Poll<Result<Option<U>, ReadError>>
    where
        T: FnMut(&mut Chunks) -> ReadStatus<U>,
    {
        use proto::ReadError::*;
        // Once the end was reached (or the stream was stopped), never touch the connection again.
        if self.all_data_read {
            return Poll::Ready(Ok(None));
        }

        let mut conn = self.conn.state.lock("RecvStream::poll_read");
        if self.is_0rtt {
            conn.check_0rtt().map_err(|()| ReadError::ZeroRttRejected)?;
        }

        // If we stored an error during a previous call, return it now. This can happen if a
        // `read_fn` both wants to return data and also returns an error in its final stream status.
        let status = match self.reset.take() {
            Some(code) => ReadStatus::Failed(None, Reset(code)),
            None => {
                let mut recv = conn.inner.recv_stream(self.stream);
                // Failing to start the read is propagated to the caller as a `ReadError`.
                let mut chunks = recv.read(ordered)?;
                let status = read_fn(&mut chunks);
                // Wake the connection when finalizing the read reports something to transmit.
                if chunks.finalize().should_transmit() {
                    conn.wake();
                }
                status
            }
        };

        match status {
            ReadStatus::Readable(read) => Poll::Ready(Ok(Some(read))),
            ReadStatus::Finished(read) => {
                // Remember the end of the stream so later reads short-circuit to `None`.
                self.all_data_read = true;
                Poll::Ready(Ok(read))
            }
            ReadStatus::Failed(read, Blocked) => match read {
                // Data obtained before blocking is handed out right away.
                Some(val) => Poll::Ready(Ok(Some(val))),
                None => {
                    // A recorded connection error takes precedence over waiting.
                    if let Some(ref x) = conn.error {
                        return Poll::Ready(Err(ReadError::ConnectionLost(x.clone())));
                    }
                    // Register this task's waker for the stream before suspending.
                    conn.blocked_readers.insert(self.stream, cx.waker().clone());
                    Poll::Pending
                }
            },
            ReadStatus::Failed(read, Reset(error_code)) => match read {
                None => {
                    self.all_data_read = true;
                    Poll::Ready(Err(ReadError::Reset(error_code)))
                }
                done => {
                    // Hand out the data now and keep the reset code for the next call.
                    self.reset = Some(error_code);
                    Poll::Ready(Ok(done))
                }
            },
        }
    }
359}
360
361enum ReadStatus<T> {
362    Readable(T),
363    Finished(Option<T>),
364    Failed(Option<T>, proto::ReadError),
365}
366
367impl<T> From<(Option<T>, Option<proto::ReadError>)> for ReadStatus<T> {
368    fn from(status: (Option<T>, Option<proto::ReadError>)) -> Self {
369        match status {
370            (read, None) => Self::Finished(read),
371            (read, Some(e)) => Self::Failed(read, e),
372        }
373    }
374}
375
376/// Future produced by [`RecvStream::read_to_end()`].
377///
378/// [`RecvStream::read_to_end()`]: crate::RecvStream::read_to_end
379#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
380struct ReadToEnd<'a> {
381    stream: &'a mut RecvStream,
382    read: Vec<(Bytes, u64)>,
383    start: u64,
384    end: u64,
385    size_limit: usize,
386}
387
388impl Future for ReadToEnd<'_> {
389    type Output = Result<Vec<u8>, ReadToEndError>;
390    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
391        loop {
392            match ready!(self.stream.poll_read_chunk(cx, usize::MAX, false))? {
393                Some(chunk) => {
394                    self.start = self.start.min(chunk.offset);
395                    let end = chunk.bytes.len() as u64 + chunk.offset;
396                    if (end - self.start) > self.size_limit as u64 {
397                        return Poll::Ready(Err(ReadToEndError::TooLong));
398                    }
399                    self.end = self.end.max(end);
400                    self.read.push((chunk.bytes, chunk.offset));
401                }
402                None => {
403                    if self.end == 0 {
404                        // Never received anything
405                        return Poll::Ready(Ok(Vec::new()));
406                    }
407                    let start = self.start;
408                    let mut buffer = vec![0; (self.end - start) as usize];
409                    for (data, offset) in self.read.drain(..) {
410                        let offset = (offset - start) as usize;
411                        buffer[offset..offset + data.len()].copy_from_slice(&data);
412                    }
413                    return Poll::Ready(Ok(buffer));
414                }
415            }
416        }
417    }
418}
419
/// Errors from [`RecvStream::read_to_end`]
///
/// A [`ReadError`] converts into this type through the `Read` variant.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ReadToEndError {
    /// An error occurred during reading
    #[error("read error: {0}")]
    Read(#[from] ReadError),
    /// The stream is larger than the user-supplied limit
    #[error("stream too long")]
    TooLong,
}
430
#[cfg(feature = "futures-io")]
impl futures_io::AsyncRead for RecvStream {
    /// Read into `buf` through the inherent `RecvStream::poll_read`, reporting the number of
    /// bytes written and converting read errors into `io::Error`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let stream = self.get_mut();
        let mut read_buf = ReadBuf::new(buf);
        match RecvStream::poll_read(stream, cx, &mut read_buf) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
            Poll::Ready(Ok(())) => Poll::Ready(Ok(read_buf.filled().len())),
        }
    }
}
443
444impl tokio::io::AsyncRead for RecvStream {
445    fn poll_read(
446        self: Pin<&mut Self>,
447        cx: &mut Context<'_>,
448        buf: &mut ReadBuf<'_>,
449    ) -> Poll<io::Result<()>> {
450        ready!(Self::poll_read(self.get_mut(), cx, buf))?;
451        Poll::Ready(Ok(()))
452    }
453}
454
455impl Drop for RecvStream {
456    fn drop(&mut self) {
457        let mut conn = self.conn.state.lock("RecvStream::drop");
458
459        // clean up any previously registered wakers
460        conn.blocked_readers.remove(&self.stream);
461
462        if conn.error.is_some() || (self.is_0rtt && conn.check_0rtt().is_err()) {
463            return;
464        }
465        if !self.all_data_read {
466            // Ignore UnknownStream errors
467            let _ = conn.inner.recv_stream(self.stream).stop(0u32.into());
468            conn.wake();
469        }
470    }
471}
472
/// Errors that arise from reading from a stream.
///
/// Converts into [`std::io::Error`] so it can be surfaced through the `AsyncRead`
/// implementations of [`RecvStream`].
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ReadError {
    /// The peer abandoned transmitting data on this stream
    ///
    /// Carries an application-defined error code.
    #[error("stream reset by peer: error {0}")]
    Reset(VarInt),
    /// The connection was lost
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
    /// The stream has already been stopped, finished, or reset
    #[error("unknown stream")]
    UnknownStream,
    /// Attempted an ordered read following an unordered read
    ///
    /// Performing an unordered read allows discontinuities to arise in the receive buffer of a
    /// stream which cannot be recovered, making further ordered reads impossible.
    #[error("ordered read after unordered read")]
    IllegalOrderedRead,
    /// This was a 0-RTT stream and the server rejected it
    ///
    /// Can only occur on clients for 0-RTT streams, which can be opened using
    /// [`Connecting::into_0rtt()`].
    ///
    /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
    #[error("0-RTT rejected")]
    ZeroRttRejected,
}
502
503impl From<ReadableError> for ReadError {
504    fn from(e: ReadableError) -> Self {
505        match e {
506            ReadableError::UnknownStream => Self::UnknownStream,
507            ReadableError::IllegalOrderedRead => Self::IllegalOrderedRead,
508        }
509    }
510}
511
512impl From<ReadError> for io::Error {
513    fn from(x: ReadError) -> Self {
514        use self::ReadError::*;
515        let kind = match x {
516            Reset { .. } | ZeroRttRejected => io::ErrorKind::ConnectionReset,
517            ConnectionLost(_) | UnknownStream => io::ErrorKind::NotConnected,
518            IllegalOrderedRead => io::ErrorKind::InvalidInput,
519        };
520        Self::new(kind, x)
521    }
522}
523
524/// Future produced by [`RecvStream::read()`].
525///
526/// [`RecvStream::read()`]: crate::RecvStream::read
527#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
528struct Read<'a> {
529    stream: &'a mut RecvStream,
530    buf: ReadBuf<'a>,
531}
532
533impl<'a> Future for Read<'a> {
534    type Output = Result<Option<usize>, ReadError>;
535
536    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
537        let this = self.get_mut();
538        ready!(this.stream.poll_read(cx, &mut this.buf))?;
539        match this.buf.filled().len() {
540            0 if this.buf.capacity() != 0 => Poll::Ready(Ok(None)),
541            n => Poll::Ready(Ok(Some(n))),
542        }
543    }
544}
545
546/// Future produced by [`RecvStream::read_exact()`].
547///
548/// [`RecvStream::read_exact()`]: crate::RecvStream::read_exact
549#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
550struct ReadExact<'a> {
551    stream: &'a mut RecvStream,
552    buf: ReadBuf<'a>,
553}
554
555impl<'a> Future for ReadExact<'a> {
556    type Output = Result<(), ReadExactError>;
557    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
558        let this = self.get_mut();
559        let total = this.buf.remaining();
560        let mut remaining = total;
561        while remaining > 0 {
562            ready!(this.stream.poll_read(cx, &mut this.buf))?;
563            let new = this.buf.remaining();
564            if new == remaining {
565                let read = total - remaining;
566                return Poll::Ready(Err(ReadExactError::FinishedEarly(read)));
567            }
568            remaining = new;
569        }
570        Poll::Ready(Ok(()))
571    }
572}
573
/// Errors from [`RecvStream::read_exact`]
///
/// A [`ReadError`] converts into this type through the `ReadError` variant.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ReadExactError {
    /// The stream finished before all bytes were read
    ///
    /// Carries a count of bytes that were read before the stream finished.
    #[error("stream finished early ({0} bytes read)")]
    FinishedEarly(usize),
    /// A read error occurred
    #[error(transparent)]
    ReadError(#[from] ReadError),
}
584
585/// Future produced by [`RecvStream::read_chunk()`].
586///
587/// [`RecvStream::read_chunk()`]: crate::RecvStream::read_chunk
588#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
589struct ReadChunk<'a> {
590    stream: &'a mut RecvStream,
591    max_length: usize,
592    ordered: bool,
593}
594
595impl<'a> Future for ReadChunk<'a> {
596    type Output = Result<Option<Chunk>, ReadError>;
597    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
598        let (max_length, ordered) = (self.max_length, self.ordered);
599        self.stream.poll_read_chunk(cx, max_length, ordered)
600    }
601}
602
603/// Future produced by [`RecvStream::read_chunks()`].
604///
605/// [`RecvStream::read_chunks()`]: crate::RecvStream::read_chunks
606#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
607struct ReadChunks<'a> {
608    stream: &'a mut RecvStream,
609    bufs: &'a mut [Bytes],
610}
611
612impl<'a> Future for ReadChunks<'a> {
613    type Output = Result<Option<usize>, ReadError>;
614    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
615        let this = self.get_mut();
616        this.stream.poll_read_chunks(cx, this.bufs)
617    }
618}