ant_quic/high_level/
recv_stream.rs

1use std::{
2    future::{Future, poll_fn},
3    io,
4    pin::Pin,
5    task::{Context, Poll, ready},
6};
7
8use crate::{Chunk, Chunks, ClosedStream, ConnectionError, ReadableError, StreamId};
9use bytes::Bytes;
10use thiserror::Error;
11use tokio::io::ReadBuf;
12
13use super::connection::ConnectionRef;
14use crate::VarInt;
15
16/// A stream that can only be used to receive data
17///
18/// `stop(0)` is implicitly called on drop unless:
19/// - A variant of [`ReadError`] has been yielded by a read call
20/// - [`stop()`] was called explicitly
21///
22/// # Cancellation
23///
24/// A `read` method is said to be *cancel-safe* when dropping its future before the future becomes
25/// ready cannot lead to loss of stream data. This is true of methods which succeed immediately when
26/// any progress is made, and is not true of methods which might need to perform multiple reads
27/// internally before succeeding. Each `read` method documents whether it is cancel-safe.
28///
29/// # Common issues
30///
31/// ## Data never received on a locally-opened stream
32///
33/// Peers are not notified of streams until they or a later-numbered stream are used to send
34/// data. If a bidirectional stream is locally opened but never used to send, then the peer may
35/// never see it. Application protocols should always arrange for the endpoint which will first
36/// transmit on a stream to be the endpoint responsible for opening it.
37///
38/// ## Data never received on a remotely-opened stream
39///
40/// Verify that the stream you are receiving is the same one that the server is sending on, e.g. by
41/// logging the [`id`] of each. Streams are always accepted in the same order as they are created,
42/// i.e. ascending order by [`StreamId`]. For example, even if a sender first transmits on
43/// bidirectional stream 1, the first stream yielded by [`Connection::accept_bi`] on the receiver
44/// will be bidirectional stream 0.
45///
46/// [`ReadError`]: crate::ReadError
47/// [`stop()`]: RecvStream::stop
48/// [`SendStream::finish`]: crate::SendStream::finish
49/// [`WriteError::Stopped`]: crate::WriteError::Stopped
50/// [`id`]: RecvStream::id
51/// [`Connection::accept_bi`]: crate::Connection::accept_bi
52#[derive(Debug)]
53pub struct RecvStream {
54    conn: ConnectionRef,
55    stream: StreamId,
56    is_0rtt: bool,
57    all_data_read: bool,
58    reset: Option<VarInt>,
59}
60
61impl RecvStream {
62    pub(crate) fn new(conn: ConnectionRef, stream: StreamId, is_0rtt: bool) -> Self {
63        Self {
64            conn,
65            stream,
66            is_0rtt,
67            all_data_read: false,
68            reset: None,
69        }
70    }
71
72    /// Read data contiguously from the stream.
73    ///
74    /// Yields the number of bytes read into `buf` on success, or `None` if the stream was finished.
75    ///
76    /// This operation is cancel-safe.
77    pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, ReadError> {
78        Read {
79            stream: self,
80            buf: ReadBuf::new(buf),
81        }
82        .await
83    }
84
85    /// Read an exact number of bytes contiguously from the stream.
86    ///
87    /// See [`read()`] for details. This operation is *not* cancel-safe.
88    ///
89    /// [`read()`]: RecvStream::read
90    pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ReadExactError> {
91        ReadExact {
92            stream: self,
93            buf: ReadBuf::new(buf),
94        }
95        .await
96    }
97
98    /// Attempts to read from the stream into the provided buffer
99    ///
100    /// On success, returns `Poll::Ready(Ok(num_bytes_read))` and places data into `buf`. If this
101    /// returns zero bytes read (and `buf` has a non-zero length), that indicates that the remote
102    /// side has [`finish`]ed the stream and the local side has already read all bytes.
103    ///
104    /// If no data is available for reading, this returns `Poll::Pending` and arranges for the
105    /// current task (via `cx.waker()`) to be notified when the stream becomes readable or is
106    /// closed.
107    ///
108    /// [`finish`]: crate::SendStream::finish
109    pub fn poll_read(
110        &mut self,
111        cx: &mut Context,
112        buf: &mut [u8],
113    ) -> Poll<Result<usize, ReadError>> {
114        let mut buf = ReadBuf::new(buf);
115        ready!(self.poll_read_buf(cx, &mut buf))?;
116        Poll::Ready(Ok(buf.filled().len()))
117    }
118
119    /// Attempts to read from the stream into the provided buffer, which may be uninitialized
120    ///
121    /// On success, returns `Poll::Ready(Ok(()))` and places data into the unfilled portion of
122    /// `buf`. If this does not write any bytes to `buf` (and `buf.remaining()` is non-zero), that
123    /// indicates that the remote side has [`finish`]ed the stream and the local side has already
124    /// read all bytes.
125    ///
126    /// If no data is available for reading, this returns `Poll::Pending` and arranges for the
127    /// current task (via `cx.waker()`) to be notified when the stream becomes readable or is
128    /// closed.
129    ///
130    /// [`finish`]: crate::SendStream::finish
131    pub fn poll_read_buf(
132        &mut self,
133        cx: &mut Context,
134        buf: &mut ReadBuf<'_>,
135    ) -> Poll<Result<(), ReadError>> {
136        if buf.remaining() == 0 {
137            return Poll::Ready(Ok(()));
138        }
139
140        self.poll_read_generic(cx, true, |chunks| {
141            let mut read = false;
142            loop {
143                if buf.remaining() == 0 {
144                    // We know `read` is `true` because `buf.remaining()` was not 0 before
145                    return ReadStatus::Readable(());
146                }
147
148                match chunks.next(buf.remaining()) {
149                    Ok(Some(chunk)) => {
150                        buf.put_slice(&chunk.bytes);
151                        read = true;
152                    }
153                    res => return (if read { Some(()) } else { None }, res.err()).into(),
154                }
155            }
156        })
157        .map(|res| res.map(|_| ()))
158    }
159
160    /// Read the next segment of data
161    ///
162    /// Yields `None` if the stream was finished. Otherwise, yields a segment of data and its
163    /// offset in the stream. If `ordered` is `true`, the chunk's offset will be immediately after
164    /// the last data yielded by `read()` or `read_chunk()`. If `ordered` is `false`, segments may
165    /// be received in any order, and the `Chunk`'s `offset` field can be used to determine
166    /// ordering in the caller. Unordered reads are less prone to head-of-line blocking within a
167    /// stream, but require the application to manage reassembling the original data.
168    ///
169    /// Slightly more efficient than `read` due to not copying. Chunk boundaries do not correspond
170    /// to peer writes, and hence cannot be used as framing.
171    ///
172    /// This operation is cancel-safe.
173    pub async fn read_chunk(
174        &mut self,
175        max_length: usize,
176        ordered: bool,
177    ) -> Result<Option<Chunk>, ReadError> {
178        ReadChunk {
179            stream: self,
180            max_length,
181            ordered,
182        }
183        .await
184    }
185
186    /// Attempts to read a chunk from the stream.
187    ///
188    /// On success, returns `Poll::Ready(Ok(Some(chunk)))`. If `Poll::Ready(Ok(None))`
189    /// is returned, it implies that EOF has been reached.
190    ///
191    /// If no data is available for reading, the method returns `Poll::Pending`
192    /// and arranges for the current task (via cx.waker()) to receive a notification
193    /// when the stream becomes readable or is closed.
194    fn poll_read_chunk(
195        &mut self,
196        cx: &mut Context,
197        max_length: usize,
198        ordered: bool,
199    ) -> Poll<Result<Option<Chunk>, ReadError>> {
200        self.poll_read_generic(cx, ordered, |chunks| match chunks.next(max_length) {
201            Ok(Some(chunk)) => ReadStatus::Readable(chunk),
202            res => (None, res.err()).into(),
203        })
204    }
205
206    /// Read the next segments of data
207    ///
208    /// Fills `bufs` with the segments of data beginning immediately after the
209    /// last data yielded by `read` or `read_chunk`, or `None` if the stream was
210    /// finished.
211    ///
212    /// Slightly more efficient than `read` due to not copying. Chunk boundaries
213    /// do not correspond to peer writes, and hence cannot be used as framing.
214    ///
215    /// This operation is cancel-safe.
216    pub async fn read_chunks(&mut self, bufs: &mut [Bytes]) -> Result<Option<usize>, ReadError> {
217        ReadChunks { stream: self, bufs }.await
218    }
219
220    /// Foundation of [`Self::read_chunks`]
221    fn poll_read_chunks(
222        &mut self,
223        cx: &mut Context,
224        bufs: &mut [Bytes],
225    ) -> Poll<Result<Option<usize>, ReadError>> {
226        if bufs.is_empty() {
227            return Poll::Ready(Ok(Some(0)));
228        }
229
230        self.poll_read_generic(cx, true, |chunks| {
231            let mut read = 0;
232            loop {
233                if read >= bufs.len() {
234                    // We know `read > 0` because `bufs` cannot be empty here
235                    return ReadStatus::Readable(read);
236                }
237
238                match chunks.next(usize::MAX) {
239                    Ok(Some(chunk)) => {
240                        bufs[read] = chunk.bytes;
241                        read += 1;
242                    }
243                    res => return (if read == 0 { None } else { Some(read) }, res.err()).into(),
244                }
245            }
246        })
247    }
248
249    /// Convenience method to read all remaining data into a buffer
250    ///
251    /// Fails with [`ReadToEndError::TooLong`] on reading more than `size_limit` bytes, discarding
252    /// all data read. Uses unordered reads to be more efficient than using `AsyncRead` would
253    /// allow. `size_limit` should be set to limit worst-case memory use.
254    ///
255    /// If unordered reads have already been made, the resulting buffer may have gaps containing
256    /// arbitrary data.
257    ///
258    /// This operation is *not* cancel-safe.
259    ///
260    /// [`ReadToEndError::TooLong`]: crate::ReadToEndError::TooLong
261    pub async fn read_to_end(&mut self, size_limit: usize) -> Result<Vec<u8>, ReadToEndError> {
262        ReadToEnd {
263            stream: self,
264            size_limit,
265            read: Vec::new(),
266            start: u64::MAX,
267            end: 0,
268        }
269        .await
270    }
271
272    /// Stop accepting data
273    ///
274    /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
275    /// attempts to operate on a stream will yield `ClosedStream` errors.
276    pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
277        let mut conn = self.conn.state.lock("RecvStream::stop");
278        if self.is_0rtt && conn.check_0rtt().is_err() {
279            return Ok(());
280        }
281        conn.inner.recv_stream(self.stream).stop(error_code)?;
282        conn.wake();
283        self.all_data_read = true;
284        Ok(())
285    }
286
287    /// Check if this stream has been opened during 0-RTT.
288    ///
289    /// In which case any non-idempotent request should be considered dangerous at the application
290    /// level. Because read data is subject to replay attacks.
291    pub fn is_0rtt(&self) -> bool {
292        self.is_0rtt
293    }
294
295    /// Get the identity of this stream
296    pub fn id(&self) -> StreamId {
297        self.stream
298    }
299
300    /// Completes when the stream has been reset by the peer or otherwise closed
301    ///
302    /// Yields `Some` with the reset error code when the stream is reset by the peer. Yields `None`
303    /// when the stream was previously [`stop()`](Self::stop)ed, or when the stream was
304    /// [`finish()`](crate::SendStream::finish)ed by the peer and all data has been received, after
305    /// which it is no longer meaningful for the stream to be reset.
306    ///
307    /// This operation is cancel-safe.
308    pub async fn received_reset(&mut self) -> Result<Option<VarInt>, ResetError> {
309        poll_fn(|cx| {
310            let mut conn = self.conn.state.lock("RecvStream::reset");
311            if self.is_0rtt && conn.check_0rtt().is_err() {
312                return Poll::Ready(Err(ResetError::ZeroRttRejected));
313            }
314
315            if let Some(code) = self.reset {
316                return Poll::Ready(Ok(Some(code)));
317            }
318
319            match conn.inner.recv_stream(self.stream).received_reset() {
320                Err(_) => Poll::Ready(Ok(None)),
321                Ok(Some(error_code)) => {
322                    // Stream state has just now been freed, so the connection may need to issue new
323                    // stream ID flow control credit
324                    conn.wake();
325                    Poll::Ready(Ok(Some(error_code)))
326                }
327                Ok(None) => {
328                    if let Some(e) = &conn.error {
329                        return Poll::Ready(Err(e.clone().into()));
330                    }
331                    // Resets always notify readers, since a reset is an immediate read error. We
332                    // could introduce a dedicated channel to reduce the risk of spurious wakeups,
333                    // but that increased complexity is probably not justified, as an application
334                    // that is expecting a reset is not likely to receive large amounts of data.
335                    conn.blocked_readers.insert(self.stream, cx.waker().clone());
336                    Poll::Pending
337                }
338            }
339        })
340        .await
341    }
342
343    /// Handle common logic related to reading out of a receive stream
344    ///
345    /// This takes an `FnMut` closure that takes care of the actual reading process, matching
346    /// the detailed read semantics for the calling function with a particular return type.
347    /// The closure can read from the passed `&mut Chunks` and has to return the status after
348    /// reading: the amount of data read, and the status after the final read call.
349    fn poll_read_generic<T, U>(
350        &mut self,
351        cx: &mut Context,
352        ordered: bool,
353        mut read_fn: T,
354    ) -> Poll<Result<Option<U>, ReadError>>
355    where
356        T: FnMut(&mut Chunks) -> ReadStatus<U>,
357    {
358        use crate::ReadError::*;
359        if self.all_data_read {
360            return Poll::Ready(Ok(None));
361        }
362
363        let mut conn = self.conn.state.lock("RecvStream::poll_read");
364        if self.is_0rtt {
365            conn.check_0rtt().map_err(|()| ReadError::ZeroRttRejected)?;
366        }
367
368        // If we stored an error during a previous call, return it now. This can happen if a
369        // `read_fn` both wants to return data and also returns an error in its final stream status.
370        let status = match self.reset {
371            Some(code) => ReadStatus::Failed(None, Reset(code)),
372            None => {
373                let mut recv = conn.inner.recv_stream(self.stream);
374                let mut chunks = recv.read(ordered)?;
375                let status = read_fn(&mut chunks);
376                if chunks.finalize().should_transmit() {
377                    conn.wake();
378                }
379                status
380            }
381        };
382
383        match status {
384            ReadStatus::Readable(read) => Poll::Ready(Ok(Some(read))),
385            ReadStatus::Finished(read) => {
386                self.all_data_read = true;
387                Poll::Ready(Ok(read))
388            }
389            ReadStatus::Failed(read, Blocked) => match read {
390                Some(val) => Poll::Ready(Ok(Some(val))),
391                None => {
392                    if let Some(ref x) = conn.error {
393                        return Poll::Ready(Err(ReadError::ConnectionLost(x.clone())));
394                    }
395                    conn.blocked_readers.insert(self.stream, cx.waker().clone());
396                    Poll::Pending
397                }
398            },
399            ReadStatus::Failed(read, Reset(error_code)) => match read {
400                None => {
401                    self.all_data_read = true;
402                    self.reset = Some(error_code);
403                    Poll::Ready(Err(ReadError::Reset(error_code)))
404                }
405                done => {
406                    self.reset = Some(error_code);
407                    Poll::Ready(Ok(done))
408                }
409            },
410            ReadStatus::Failed(_read, ConnectionClosed) => {
411                self.all_data_read = true;
412                Poll::Ready(Err(ReadError::ConnectionLost(
413                    ConnectionError::LocallyClosed,
414                )))
415            }
416        }
417    }
418}
419
420enum ReadStatus<T> {
421    Readable(T),
422    Finished(Option<T>),
423    Failed(Option<T>, crate::ReadError),
424}
425
426impl<T> From<(Option<T>, Option<crate::ReadError>)> for ReadStatus<T> {
427    fn from(status: (Option<T>, Option<crate::ReadError>)) -> Self {
428        match status {
429            (read, None) => Self::Finished(read),
430            (read, Some(e)) => Self::Failed(read, e),
431        }
432    }
433}
434
435/// Future produced by [`RecvStream::read_to_end()`].
436///
437/// [`RecvStream::read_to_end()`]: crate::RecvStream::read_to_end
438struct ReadToEnd<'a> {
439    stream: &'a mut RecvStream,
440    read: Vec<(Bytes, u64)>,
441    start: u64,
442    end: u64,
443    size_limit: usize,
444}
445
446impl Future for ReadToEnd<'_> {
447    type Output = Result<Vec<u8>, ReadToEndError>;
448    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
449        loop {
450            match ready!(self.stream.poll_read_chunk(cx, usize::MAX, false))? {
451                Some(chunk) => {
452                    self.start = self.start.min(chunk.offset);
453                    let end = chunk.bytes.len() as u64 + chunk.offset;
454                    if (end - self.start) > self.size_limit as u64 {
455                        return Poll::Ready(Err(ReadToEndError::TooLong));
456                    }
457                    self.end = self.end.max(end);
458                    self.read.push((chunk.bytes, chunk.offset));
459                }
460                None => {
461                    if self.end == 0 {
462                        // Never received anything
463                        return Poll::Ready(Ok(Vec::new()));
464                    }
465                    let start = self.start;
466                    let mut buffer = vec![0; (self.end - start) as usize];
467                    for (data, offset) in self.read.drain(..) {
468                        let offset = (offset - start) as usize;
469                        buffer[offset..offset + data.len()].copy_from_slice(&data);
470                    }
471                    return Poll::Ready(Ok(buffer));
472                }
473            }
474        }
475    }
476}
477
478/// Errors from [`RecvStream::read_to_end`]
479#[derive(Debug, Error, Clone, PartialEq, Eq)]
480pub enum ReadToEndError {
481    /// An error occurred during reading
482    #[error("read error: {0}")]
483    Read(#[from] ReadError),
484    /// The stream is larger than the user-supplied limit
485    #[error("stream too long")]
486    TooLong,
487}
488
489/* TODO: Enable when futures-io feature is added
490#[cfg(feature = "futures-io")]
491impl futures_io::AsyncRead for RecvStream {
492    fn poll_read(
493        self: Pin<&mut Self>,
494        cx: &mut Context,
495        buf: &mut [u8],
496    ) -> Poll<io::Result<usize>> {
497        let mut buf = ReadBuf::new(buf);
498        ready!(Self::poll_read_buf(self.get_mut(), cx, &mut buf))?;
499        Poll::Ready(Ok(buf.filled().len()))
500    }
501}
502*/
503
504impl tokio::io::AsyncRead for RecvStream {
505    fn poll_read(
506        self: Pin<&mut Self>,
507        cx: &mut Context<'_>,
508        buf: &mut ReadBuf<'_>,
509    ) -> Poll<io::Result<()>> {
510        ready!(Self::poll_read_buf(self.get_mut(), cx, buf))?;
511        Poll::Ready(Ok(()))
512    }
513}
514
515impl Drop for RecvStream {
516    fn drop(&mut self) {
517        let mut conn = self.conn.state.lock("RecvStream::drop");
518
519        // clean up any previously registered wakers
520        conn.blocked_readers.remove(&self.stream);
521
522        if conn.error.is_some() || (self.is_0rtt && conn.check_0rtt().is_err()) {
523            return;
524        }
525        if !self.all_data_read {
526            // Ignore ClosedStream errors
527            let _ = conn.inner.recv_stream(self.stream).stop(0u32.into());
528            conn.wake();
529        }
530    }
531}
532
533/// Errors that arise from reading from a stream.
534#[derive(Debug, Error, Clone, PartialEq, Eq)]
535pub enum ReadError {
536    /// The peer abandoned transmitting data on this stream
537    ///
538    /// Carries an application-defined error code.
539    #[error("stream reset by peer: error {0}")]
540    Reset(VarInt),
541    /// The connection was lost
542    #[error("connection lost")]
543    ConnectionLost(#[from] ConnectionError),
544    /// The stream has already been stopped, finished, or reset
545    #[error("closed stream")]
546    ClosedStream,
547    /// Attempted an ordered read following an unordered read
548    ///
549    /// Performing an unordered read allows discontinuities to arise in the receive buffer of a
550    /// stream which cannot be recovered, making further ordered reads impossible.
551    #[error("ordered read after unordered read")]
552    IllegalOrderedRead,
553    /// This was a 0-RTT stream and the server rejected it
554    ///
555    /// Can only occur on clients for 0-RTT streams, which can be opened using
556    /// [`Connecting::into_0rtt()`].
557    ///
558    /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
559    #[error("0-RTT rejected")]
560    ZeroRttRejected,
561}
562
563impl From<ReadableError> for ReadError {
564    fn from(e: ReadableError) -> Self {
565        match e {
566            ReadableError::ClosedStream => Self::ClosedStream,
567            ReadableError::IllegalOrderedRead => Self::IllegalOrderedRead,
568            ReadableError::ConnectionClosed => Self::ConnectionLost(ConnectionError::LocallyClosed),
569        }
570    }
571}
572
573impl From<ResetError> for ReadError {
574    fn from(e: ResetError) -> Self {
575        match e {
576            ResetError::ConnectionLost(e) => Self::ConnectionLost(e),
577            ResetError::ZeroRttRejected => Self::ZeroRttRejected,
578        }
579    }
580}
581
582impl From<ReadError> for io::Error {
583    fn from(x: ReadError) -> Self {
584        use ReadError::*;
585        let kind = match x {
586            Reset { .. } | ZeroRttRejected => io::ErrorKind::ConnectionReset,
587            ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected,
588            IllegalOrderedRead => io::ErrorKind::InvalidInput,
589        };
590        Self::new(kind, x)
591    }
592}
593
594/// Errors that arise while waiting for a stream to be reset
595#[derive(Debug, Error, Clone, PartialEq, Eq)]
596pub enum ResetError {
597    /// The connection was lost
598    #[error("connection lost")]
599    ConnectionLost(#[from] ConnectionError),
600    /// This was a 0-RTT stream and the server rejected it
601    ///
602    /// Can only occur on clients for 0-RTT streams, which can be opened using
603    /// [`Connecting::into_0rtt()`].
604    ///
605    /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
606    #[error("0-RTT rejected")]
607    ZeroRttRejected,
608}
609
610impl From<ResetError> for io::Error {
611    fn from(x: ResetError) -> Self {
612        use ResetError::*;
613        let kind = match x {
614            ZeroRttRejected => io::ErrorKind::ConnectionReset,
615            ConnectionLost(_) => io::ErrorKind::NotConnected,
616        };
617        Self::new(kind, x)
618    }
619}
620
621/// Future produced by [`RecvStream::read()`].
622///
623/// [`RecvStream::read()`]: crate::RecvStream::read
624struct Read<'a> {
625    stream: &'a mut RecvStream,
626    buf: ReadBuf<'a>,
627}
628
629impl Future for Read<'_> {
630    type Output = Result<Option<usize>, ReadError>;
631
632    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
633        let this = self.get_mut();
634        ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
635        match this.buf.filled().len() {
636            0 if this.buf.capacity() != 0 => Poll::Ready(Ok(None)),
637            n => Poll::Ready(Ok(Some(n))),
638        }
639    }
640}
641
642/// Future produced by [`RecvStream::read_exact()`].
643///
644/// [`RecvStream::read_exact()`]: crate::RecvStream::read_exact
645struct ReadExact<'a> {
646    stream: &'a mut RecvStream,
647    buf: ReadBuf<'a>,
648}
649
650impl Future for ReadExact<'_> {
651    type Output = Result<(), ReadExactError>;
652    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
653        let this = self.get_mut();
654        let mut remaining = this.buf.remaining();
655        while remaining > 0 {
656            ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
657            let new = this.buf.remaining();
658            if new == remaining {
659                return Poll::Ready(Err(ReadExactError::FinishedEarly(this.buf.filled().len())));
660            }
661            remaining = new;
662        }
663        Poll::Ready(Ok(()))
664    }
665}
666
667/// Errors that arise from reading from a stream.
668#[derive(Debug, Error, Clone, PartialEq, Eq)]
669pub enum ReadExactError {
670    /// The stream finished before all bytes were read
671    #[error("stream finished early ({0} bytes read)")]
672    FinishedEarly(usize),
673    /// A read error occurred
674    #[error(transparent)]
675    ReadError(#[from] ReadError),
676}
677
678/// Future produced by [`RecvStream::read_chunk()`].
679///
680/// [`RecvStream::read_chunk()`]: crate::RecvStream::read_chunk
681struct ReadChunk<'a> {
682    stream: &'a mut RecvStream,
683    max_length: usize,
684    ordered: bool,
685}
686
687impl Future for ReadChunk<'_> {
688    type Output = Result<Option<Chunk>, ReadError>;
689    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
690        let (max_length, ordered) = (self.max_length, self.ordered);
691        self.stream.poll_read_chunk(cx, max_length, ordered)
692    }
693}
694
695/// Future produced by [`RecvStream::read_chunks()`].
696///
697/// [`RecvStream::read_chunks()`]: crate::RecvStream::read_chunks
698struct ReadChunks<'a> {
699    stream: &'a mut RecvStream,
700    bufs: &'a mut [Bytes],
701}
702
703impl Future for ReadChunks<'_> {
704    type Output = Result<Option<usize>, ReadError>;
705    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
706        let this = self.get_mut();
707        this.stream.poll_read_chunks(cx, this.bufs)
708    }
709}