quinn/
recv_stream.rs

1use std::{
2    future::{Future, poll_fn},
3    io,
4    pin::Pin,
5    task::{Context, Poll, ready},
6};
7
8use bytes::Bytes;
9use proto::{Chunk, Chunks, ClosedStream, ConnectionError, ReadableError, StreamId};
10use thiserror::Error;
11use tokio::io::ReadBuf;
12
13use crate::{VarInt, connection::ConnectionRef};
14
15/// A stream that can only be used to receive data
16///
17/// `stop(0)` is implicitly called on drop unless:
18/// - A variant of [`ReadError`] has been yielded by a read call
19/// - [`stop()`] was called explicitly
20///
21/// # Cancellation
22///
23/// A `read` method is said to be *cancel-safe* when dropping its future before the future becomes
24/// ready cannot lead to loss of stream data. This is true of methods which succeed immediately when
25/// any progress is made, and is not true of methods which might need to perform multiple reads
26/// internally before succeeding. Each `read` method documents whether it is cancel-safe.
27///
28/// # Common issues
29///
30/// ## Data never received on a locally-opened stream
31///
32/// Peers are not notified of streams until they or a later-numbered stream are used to send
33/// data. If a bidirectional stream is locally opened but never used to send, then the peer may
34/// never see it. Application protocols should always arrange for the endpoint which will first
35/// transmit on a stream to be the endpoint responsible for opening it.
36///
37/// ## Data never received on a remotely-opened stream
38///
39/// Verify that the stream you are receiving is the same one that the server is sending on, e.g. by
40/// logging the [`id`] of each. Streams are always accepted in the same order as they are created,
41/// i.e. ascending order by [`StreamId`]. For example, even if a sender first transmits on
42/// bidirectional stream 1, the first stream yielded by [`Connection::accept_bi`] on the receiver
43/// will be bidirectional stream 0.
44///
45/// [`ReadError`]: crate::ReadError
46/// [`stop()`]: RecvStream::stop
47/// [`SendStream::finish`]: crate::SendStream::finish
48/// [`WriteError::Stopped`]: crate::WriteError::Stopped
49/// [`id`]: RecvStream::id
50/// [`Connection::accept_bi`]: crate::Connection::accept_bi
51#[derive(Debug)]
52pub struct RecvStream {
53    conn: ConnectionRef,
54    stream: StreamId,
55    is_0rtt: bool,
56    all_data_read: bool,
57    reset: Option<VarInt>,
58}
59
60impl RecvStream {
61    pub(crate) fn new(conn: ConnectionRef, stream: StreamId, is_0rtt: bool) -> Self {
62        Self {
63            conn,
64            stream,
65            is_0rtt,
66            all_data_read: false,
67            reset: None,
68        }
69    }
70
71    /// Read data contiguously from the stream.
72    ///
73    /// Yields the number of bytes read into `buf` on success, or `None` if the stream was finished.
74    ///
75    /// This operation is cancel-safe.
76    pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, ReadError> {
77        Read {
78            stream: self,
79            buf: ReadBuf::new(buf),
80        }
81        .await
82    }
83
84    /// Read an exact number of bytes contiguously from the stream.
85    ///
86    /// See [`read()`] for details. This operation is *not* cancel-safe.
87    ///
88    /// [`read()`]: RecvStream::read
89    pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ReadExactError> {
90        ReadExact {
91            stream: self,
92            buf: ReadBuf::new(buf),
93        }
94        .await
95    }
96
97    /// Attempts to read from the stream into buf.
98    ///
99    /// On success, returns Poll::Ready(Ok(num_bytes_read)) and places data in
100    /// the buf. If no data was read, it implies that EOF has been reached.
101    ///
102    /// If no data is available for reading, the method returns Poll::Pending
103    /// and arranges for the current task (via cx.waker()) to receive a notification
104    /// when the stream becomes readable or is closed.
105    pub fn poll_read(
106        &mut self,
107        cx: &mut Context,
108        buf: &mut [u8],
109    ) -> Poll<Result<usize, ReadError>> {
110        let mut buf = ReadBuf::new(buf);
111        ready!(self.poll_read_buf(cx, &mut buf))?;
112        Poll::Ready(Ok(buf.filled().len()))
113    }
114
115    /// Attempts to read from the stream into buf.
116    ///
117    /// On success, returns Poll::Ready(Ok(())) and places data in
118    /// the buf. If no data was read and the buffer had a non null capacity,
119    /// it implies that EOF has been reached.
120    ///
121    /// If no data is available for reading, the method returns Poll::Pending
122    /// and arranges for the current task (via cx.waker()) to receive a notification
123    /// when the stream becomes readable or is closed.
124    pub fn poll_read_buf(
125        &mut self,
126        cx: &mut Context,
127        buf: &mut ReadBuf<'_>,
128    ) -> Poll<Result<(), ReadError>> {
129        if buf.remaining() == 0 {
130            return Poll::Ready(Ok(()));
131        }
132
133        self.poll_read_generic(cx, true, |chunks| {
134            let mut read = false;
135            loop {
136                if buf.remaining() == 0 {
137                    // We know `read` is `true` because `buf.remaining()` was not 0 before
138                    return ReadStatus::Readable(());
139                }
140
141                match chunks.next(buf.remaining()) {
142                    Ok(Some(chunk)) => {
143                        buf.put_slice(&chunk.bytes);
144                        read = true;
145                    }
146                    res => return (if read { Some(()) } else { None }, res.err()).into(),
147                }
148            }
149        })
150        .map(|res| res.map(|_| ()))
151    }
152
153    /// Read the next segment of data
154    ///
155    /// Yields `None` if the stream was finished. Otherwise, yields a segment of data and its
156    /// offset in the stream. If `ordered` is `true`, the chunk's offset will be immediately after
157    /// the last data yielded by `read()` or `read_chunk()`. If `ordered` is `false`, segments may
158    /// be received in any order, and the `Chunk`'s `offset` field can be used to determine
159    /// ordering in the caller. Unordered reads are less prone to head-of-line blocking within a
160    /// stream, but require the application to manage reassembling the original data.
161    ///
162    /// Slightly more efficient than `read` due to not copying. Chunk boundaries do not correspond
163    /// to peer writes, and hence cannot be used as framing.
164    ///
165    /// This operation is cancel-safe.
166    pub async fn read_chunk(
167        &mut self,
168        max_length: usize,
169        ordered: bool,
170    ) -> Result<Option<Chunk>, ReadError> {
171        ReadChunk {
172            stream: self,
173            max_length,
174            ordered,
175        }
176        .await
177    }
178
179    /// Attempts to read a chunk from the stream.
180    ///
181    /// On success, returns `Poll::Ready(Ok(Some(chunk)))`. If `Poll::Ready(Ok(None))`
182    /// is returned, it implies that EOF has been reached.
183    ///
184    /// If no data is available for reading, the method returns `Poll::Pending`
185    /// and arranges for the current task (via cx.waker()) to receive a notification
186    /// when the stream becomes readable or is closed.
187    fn poll_read_chunk(
188        &mut self,
189        cx: &mut Context,
190        max_length: usize,
191        ordered: bool,
192    ) -> Poll<Result<Option<Chunk>, ReadError>> {
193        self.poll_read_generic(cx, ordered, |chunks| match chunks.next(max_length) {
194            Ok(Some(chunk)) => ReadStatus::Readable(chunk),
195            res => (None, res.err()).into(),
196        })
197    }
198
199    /// Read the next segments of data
200    ///
201    /// Fills `bufs` with the segments of data beginning immediately after the
202    /// last data yielded by `read` or `read_chunk`, or `None` if the stream was
203    /// finished.
204    ///
205    /// Slightly more efficient than `read` due to not copying. Chunk boundaries
206    /// do not correspond to peer writes, and hence cannot be used as framing.
207    ///
208    /// This operation is cancel-safe.
209    pub async fn read_chunks(&mut self, bufs: &mut [Bytes]) -> Result<Option<usize>, ReadError> {
210        ReadChunks { stream: self, bufs }.await
211    }
212
213    /// Foundation of [`Self::read_chunks`]
214    fn poll_read_chunks(
215        &mut self,
216        cx: &mut Context,
217        bufs: &mut [Bytes],
218    ) -> Poll<Result<Option<usize>, ReadError>> {
219        if bufs.is_empty() {
220            return Poll::Ready(Ok(Some(0)));
221        }
222
223        self.poll_read_generic(cx, true, |chunks| {
224            let mut read = 0;
225            loop {
226                if read >= bufs.len() {
227                    // We know `read > 0` because `bufs` cannot be empty here
228                    return ReadStatus::Readable(read);
229                }
230
231                match chunks.next(usize::MAX) {
232                    Ok(Some(chunk)) => {
233                        bufs[read] = chunk.bytes;
234                        read += 1;
235                    }
236                    res => return (if read == 0 { None } else { Some(read) }, res.err()).into(),
237                }
238            }
239        })
240    }
241
242    /// Convenience method to read all remaining data into a buffer
243    ///
244    /// Fails with [`ReadToEndError::TooLong`] on reading more than `size_limit` bytes, discarding
245    /// all data read. Uses unordered reads to be more efficient than using `AsyncRead` would
246    /// allow. `size_limit` should be set to limit worst-case memory use.
247    ///
248    /// If unordered reads have already been made, the resulting buffer may have gaps containing
249    /// arbitrary data.
250    ///
251    /// This operation is *not* cancel-safe.
252    ///
253    /// [`ReadToEndError::TooLong`]: crate::ReadToEndError::TooLong
254    pub async fn read_to_end(&mut self, size_limit: usize) -> Result<Vec<u8>, ReadToEndError> {
255        ReadToEnd {
256            stream: self,
257            size_limit,
258            read: Vec::new(),
259            start: u64::MAX,
260            end: 0,
261        }
262        .await
263    }
264
265    /// Stop accepting data
266    ///
267    /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
268    /// attempts to operate on a stream will yield `ClosedStream` errors.
269    pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
270        let mut conn = self.conn.state.lock("RecvStream::stop");
271        if self.is_0rtt && conn.check_0rtt().is_err() {
272            return Ok(());
273        }
274        conn.inner.recv_stream(self.stream).stop(error_code)?;
275        conn.wake();
276        self.all_data_read = true;
277        Ok(())
278    }
279
280    /// Check if this stream has been opened during 0-RTT.
281    ///
282    /// In which case any non-idempotent request should be considered dangerous at the application
283    /// level. Because read data is subject to replay attacks.
284    pub fn is_0rtt(&self) -> bool {
285        self.is_0rtt
286    }
287
288    /// Get the identity of this stream
289    pub fn id(&self) -> StreamId {
290        self.stream
291    }
292
293    /// Completes when the stream has been reset by the peer or otherwise closed
294    ///
295    /// Yields `Some` with the reset error code when the stream is reset by the peer. Yields `None`
296    /// when the stream was previously [`stop()`](Self::stop)ed, or when the stream was
297    /// [`finish()`](crate::SendStream::finish)ed by the peer and all data has been received, after
298    /// which it is no longer meaningful for the stream to be reset.
299    ///
300    /// This operation is cancel-safe.
301    pub async fn received_reset(&mut self) -> Result<Option<VarInt>, ResetError> {
302        poll_fn(|cx| {
303            let mut conn = self.conn.state.lock("RecvStream::reset");
304            if self.is_0rtt && conn.check_0rtt().is_err() {
305                return Poll::Ready(Err(ResetError::ZeroRttRejected));
306            }
307
308            if let Some(code) = self.reset {
309                return Poll::Ready(Ok(Some(code)));
310            }
311
312            match conn.inner.recv_stream(self.stream).received_reset() {
313                Err(_) => Poll::Ready(Ok(None)),
314                Ok(Some(error_code)) => {
315                    // Stream state has just now been freed, so the connection may need to issue new
316                    // stream ID flow control credit
317                    conn.wake();
318                    Poll::Ready(Ok(Some(error_code)))
319                }
320                Ok(None) => {
321                    if let Some(e) = &conn.error {
322                        return Poll::Ready(Err(e.clone().into()));
323                    }
324                    // Resets always notify readers, since a reset is an immediate read error. We
325                    // could introduce a dedicated channel to reduce the risk of spurious wakeups,
326                    // but that increased complexity is probably not justified, as an application
327                    // that is expecting a reset is not likely to receive large amounts of data.
328                    conn.blocked_readers.insert(self.stream, cx.waker().clone());
329                    Poll::Pending
330                }
331            }
332        })
333        .await
334    }
335
336    /// Handle common logic related to reading out of a receive stream
337    ///
338    /// This takes an `FnMut` closure that takes care of the actual reading process, matching
339    /// the detailed read semantics for the calling function with a particular return type.
340    /// The closure can read from the passed `&mut Chunks` and has to return the status after
341    /// reading: the amount of data read, and the status after the final read call.
342    fn poll_read_generic<T, U>(
343        &mut self,
344        cx: &mut Context,
345        ordered: bool,
346        mut read_fn: T,
347    ) -> Poll<Result<Option<U>, ReadError>>
348    where
349        T: FnMut(&mut Chunks) -> ReadStatus<U>,
350    {
351        use proto::ReadError::*;
352        if self.all_data_read {
353            return Poll::Ready(Ok(None));
354        }
355
356        let mut conn = self.conn.state.lock("RecvStream::poll_read");
357        if self.is_0rtt {
358            conn.check_0rtt().map_err(|()| ReadError::ZeroRttRejected)?;
359        }
360
361        // If we stored an error during a previous call, return it now. This can happen if a
362        // `read_fn` both wants to return data and also returns an error in its final stream status.
363        let status = match self.reset {
364            Some(code) => ReadStatus::Failed(None, Reset(code)),
365            None => {
366                let mut recv = conn.inner.recv_stream(self.stream);
367                let mut chunks = recv.read(ordered)?;
368                let status = read_fn(&mut chunks);
369                if chunks.finalize().should_transmit() {
370                    conn.wake();
371                }
372                status
373            }
374        };
375
376        match status {
377            ReadStatus::Readable(read) => Poll::Ready(Ok(Some(read))),
378            ReadStatus::Finished(read) => {
379                self.all_data_read = true;
380                Poll::Ready(Ok(read))
381            }
382            ReadStatus::Failed(read, Blocked) => match read {
383                Some(val) => Poll::Ready(Ok(Some(val))),
384                None => {
385                    if let Some(ref x) = conn.error {
386                        return Poll::Ready(Err(ReadError::ConnectionLost(x.clone())));
387                    }
388                    conn.blocked_readers.insert(self.stream, cx.waker().clone());
389                    Poll::Pending
390                }
391            },
392            ReadStatus::Failed(read, Reset(error_code)) => match read {
393                None => {
394                    self.all_data_read = true;
395                    self.reset = Some(error_code);
396                    Poll::Ready(Err(ReadError::Reset(error_code)))
397                }
398                done => {
399                    self.reset = Some(error_code);
400                    Poll::Ready(Ok(done))
401                }
402            },
403        }
404    }
405}
406
407enum ReadStatus<T> {
408    Readable(T),
409    Finished(Option<T>),
410    Failed(Option<T>, proto::ReadError),
411}
412
413impl<T> From<(Option<T>, Option<proto::ReadError>)> for ReadStatus<T> {
414    fn from(status: (Option<T>, Option<proto::ReadError>)) -> Self {
415        match status {
416            (read, None) => Self::Finished(read),
417            (read, Some(e)) => Self::Failed(read, e),
418        }
419    }
420}
421
422/// Future produced by [`RecvStream::read_to_end()`].
423///
424/// [`RecvStream::read_to_end()`]: crate::RecvStream::read_to_end
425struct ReadToEnd<'a> {
426    stream: &'a mut RecvStream,
427    read: Vec<(Bytes, u64)>,
428    start: u64,
429    end: u64,
430    size_limit: usize,
431}
432
433impl Future for ReadToEnd<'_> {
434    type Output = Result<Vec<u8>, ReadToEndError>;
435    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
436        loop {
437            match ready!(self.stream.poll_read_chunk(cx, usize::MAX, false))? {
438                Some(chunk) => {
439                    self.start = self.start.min(chunk.offset);
440                    let end = chunk.bytes.len() as u64 + chunk.offset;
441                    if (end - self.start) > self.size_limit as u64 {
442                        return Poll::Ready(Err(ReadToEndError::TooLong));
443                    }
444                    self.end = self.end.max(end);
445                    self.read.push((chunk.bytes, chunk.offset));
446                }
447                None => {
448                    if self.end == 0 {
449                        // Never received anything
450                        return Poll::Ready(Ok(Vec::new()));
451                    }
452                    let start = self.start;
453                    let mut buffer = vec![0; (self.end - start) as usize];
454                    for (data, offset) in self.read.drain(..) {
455                        let offset = (offset - start) as usize;
456                        buffer[offset..offset + data.len()].copy_from_slice(&data);
457                    }
458                    return Poll::Ready(Ok(buffer));
459                }
460            }
461        }
462    }
463}
464
465/// Errors from [`RecvStream::read_to_end`]
466#[derive(Debug, Error, Clone, PartialEq, Eq)]
467pub enum ReadToEndError {
468    /// An error occurred during reading
469    #[error("read error: {0}")]
470    Read(#[from] ReadError),
471    /// The stream is larger than the user-supplied limit
472    #[error("stream too long")]
473    TooLong,
474}
475
#[cfg(feature = "futures-io")]
impl futures_io::AsyncRead for RecvStream {
    /// Reads into `buf` via `poll_read_buf` and reports the number of bytes filled; read errors
    /// are converted into `io::Error`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let mut read_buf = ReadBuf::new(buf);
        match self.get_mut().poll_read_buf(cx, &mut read_buf) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(())) => Poll::Ready(Ok(read_buf.filled().len())),
            Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
        }
    }
}
488
489impl tokio::io::AsyncRead for RecvStream {
490    fn poll_read(
491        self: Pin<&mut Self>,
492        cx: &mut Context<'_>,
493        buf: &mut ReadBuf<'_>,
494    ) -> Poll<io::Result<()>> {
495        ready!(Self::poll_read_buf(self.get_mut(), cx, buf))?;
496        Poll::Ready(Ok(()))
497    }
498}
499
500impl Drop for RecvStream {
501    fn drop(&mut self) {
502        let mut conn = self.conn.state.lock("RecvStream::drop");
503
504        // clean up any previously registered wakers
505        conn.blocked_readers.remove(&self.stream);
506
507        if conn.error.is_some() || (self.is_0rtt && conn.check_0rtt().is_err()) {
508            return;
509        }
510        if !self.all_data_read {
511            // Ignore ClosedStream errors
512            let _ = conn.inner.recv_stream(self.stream).stop(0u32.into());
513            conn.wake();
514        }
515    }
516}
517
518/// Errors that arise from reading from a stream.
519#[derive(Debug, Error, Clone, PartialEq, Eq)]
520pub enum ReadError {
521    /// The peer abandoned transmitting data on this stream
522    ///
523    /// Carries an application-defined error code.
524    #[error("stream reset by peer: error {0}")]
525    Reset(VarInt),
526    /// The connection was lost
527    #[error("connection lost")]
528    ConnectionLost(#[from] ConnectionError),
529    /// The stream has already been stopped, finished, or reset
530    #[error("closed stream")]
531    ClosedStream,
532    /// Attempted an ordered read following an unordered read
533    ///
534    /// Performing an unordered read allows discontinuities to arise in the receive buffer of a
535    /// stream which cannot be recovered, making further ordered reads impossible.
536    #[error("ordered read after unordered read")]
537    IllegalOrderedRead,
538    /// This was a 0-RTT stream and the server rejected it
539    ///
540    /// Can only occur on clients for 0-RTT streams, which can be opened using
541    /// [`Connecting::into_0rtt()`].
542    ///
543    /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
544    #[error("0-RTT rejected")]
545    ZeroRttRejected,
546}
547
548impl From<ReadableError> for ReadError {
549    fn from(e: ReadableError) -> Self {
550        match e {
551            ReadableError::ClosedStream => Self::ClosedStream,
552            ReadableError::IllegalOrderedRead => Self::IllegalOrderedRead,
553        }
554    }
555}
556
557impl From<ResetError> for ReadError {
558    fn from(e: ResetError) -> Self {
559        match e {
560            ResetError::ConnectionLost(e) => Self::ConnectionLost(e),
561            ResetError::ZeroRttRejected => Self::ZeroRttRejected,
562        }
563    }
564}
565
566impl From<ReadError> for io::Error {
567    fn from(x: ReadError) -> Self {
568        use ReadError::*;
569        let kind = match x {
570            Reset { .. } | ZeroRttRejected => io::ErrorKind::ConnectionReset,
571            ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected,
572            IllegalOrderedRead => io::ErrorKind::InvalidInput,
573        };
574        Self::new(kind, x)
575    }
576}
577
578/// Errors that arise while waiting for a stream to be reset
579#[derive(Debug, Error, Clone, PartialEq, Eq)]
580pub enum ResetError {
581    /// The connection was lost
582    #[error("connection lost")]
583    ConnectionLost(#[from] ConnectionError),
584    /// This was a 0-RTT stream and the server rejected it
585    ///
586    /// Can only occur on clients for 0-RTT streams, which can be opened using
587    /// [`Connecting::into_0rtt()`].
588    ///
589    /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
590    #[error("0-RTT rejected")]
591    ZeroRttRejected,
592}
593
594impl From<ResetError> for io::Error {
595    fn from(x: ResetError) -> Self {
596        use ResetError::*;
597        let kind = match x {
598            ZeroRttRejected => io::ErrorKind::ConnectionReset,
599            ConnectionLost(_) => io::ErrorKind::NotConnected,
600        };
601        Self::new(kind, x)
602    }
603}
604
605/// Future produced by [`RecvStream::read()`].
606///
607/// [`RecvStream::read()`]: crate::RecvStream::read
608struct Read<'a> {
609    stream: &'a mut RecvStream,
610    buf: ReadBuf<'a>,
611}
612
613impl Future for Read<'_> {
614    type Output = Result<Option<usize>, ReadError>;
615
616    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
617        let this = self.get_mut();
618        ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
619        match this.buf.filled().len() {
620            0 if this.buf.capacity() != 0 => Poll::Ready(Ok(None)),
621            n => Poll::Ready(Ok(Some(n))),
622        }
623    }
624}
625
626/// Future produced by [`RecvStream::read_exact()`].
627///
628/// [`RecvStream::read_exact()`]: crate::RecvStream::read_exact
629struct ReadExact<'a> {
630    stream: &'a mut RecvStream,
631    buf: ReadBuf<'a>,
632}
633
634impl Future for ReadExact<'_> {
635    type Output = Result<(), ReadExactError>;
636    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
637        let this = self.get_mut();
638        let mut remaining = this.buf.remaining();
639        while remaining > 0 {
640            ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
641            let new = this.buf.remaining();
642            if new == remaining {
643                return Poll::Ready(Err(ReadExactError::FinishedEarly(this.buf.filled().len())));
644            }
645            remaining = new;
646        }
647        Poll::Ready(Ok(()))
648    }
649}
650
651/// Errors that arise from reading from a stream.
652#[derive(Debug, Error, Clone, PartialEq, Eq)]
653pub enum ReadExactError {
654    /// The stream finished before all bytes were read
655    #[error("stream finished early ({0} bytes read)")]
656    FinishedEarly(usize),
657    /// A read error occurred
658    #[error(transparent)]
659    ReadError(#[from] ReadError),
660}
661
662/// Future produced by [`RecvStream::read_chunk()`].
663///
664/// [`RecvStream::read_chunk()`]: crate::RecvStream::read_chunk
665struct ReadChunk<'a> {
666    stream: &'a mut RecvStream,
667    max_length: usize,
668    ordered: bool,
669}
670
671impl Future for ReadChunk<'_> {
672    type Output = Result<Option<Chunk>, ReadError>;
673    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
674        let (max_length, ordered) = (self.max_length, self.ordered);
675        self.stream.poll_read_chunk(cx, max_length, ordered)
676    }
677}
678
679/// Future produced by [`RecvStream::read_chunks()`].
680///
681/// [`RecvStream::read_chunks()`]: crate::RecvStream::read_chunks
682struct ReadChunks<'a> {
683    stream: &'a mut RecvStream,
684    bufs: &'a mut [Bytes],
685}
686
687impl Future for ReadChunks<'_> {
688    type Output = Result<Option<usize>, ReadError>;
689    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
690        let this = self.get_mut();
691        this.stream.poll_read_chunks(cx, this.bufs)
692    }
693}