ant_quic/high_level/
recv_stream.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::{
9    future::{Future, poll_fn},
10    io,
11    pin::Pin,
12    task::{Context, Poll, ready},
13};
14
15use crate::{Chunk, Chunks, ClosedStream, ConnectionError, ReadableError, StreamId};
16use bytes::Bytes;
17use thiserror::Error;
18use tokio::io::ReadBuf;
19
20use super::connection::ConnectionRef;
21use crate::VarInt;
22
23/// A stream that can only be used to receive data
24///
25/// `stop(0)` is implicitly called on drop unless:
26/// - A variant of [`ReadError`] has been yielded by a read call
27/// - [`stop()`] was called explicitly
28///
29/// # Cancellation
30///
31/// A `read` method is said to be *cancel-safe* when dropping its future before the future becomes
32/// ready cannot lead to loss of stream data. This is true of methods which succeed immediately when
33/// any progress is made, and is not true of methods which might need to perform multiple reads
34/// internally before succeeding. Each `read` method documents whether it is cancel-safe.
35///
36/// # Common issues
37///
38/// ## Data never received on a locally-opened stream
39///
40/// Peers are not notified of streams until they or a later-numbered stream are used to send
41/// data. If a bidirectional stream is locally opened but never used to send, then the peer may
42/// never see it. Application protocols should always arrange for the endpoint which will first
43/// transmit on a stream to be the endpoint responsible for opening it.
44///
45/// ## Data never received on a remotely-opened stream
46///
47/// Verify that the stream you are receiving is the same one that the server is sending on, e.g. by
48/// logging the [`id`] of each. Streams are always accepted in the same order as they are created,
49/// i.e. ascending order by [`StreamId`]. For example, even if a sender first transmits on
50/// bidirectional stream 1, the first stream yielded by Connection's accept_bi method on the receiver
51/// will be bidirectional stream 0.
52///
53/// [`ReadError`]: crate::ReadError
54/// [`stop()`]: RecvStream::stop
55/// [`SendStream::finish`]: crate::SendStream::finish
56/// [`WriteError::Stopped`]: crate::WriteError::Stopped
57/// [`id`]: RecvStream::id
58/// `Connection::accept_bi`: See the Connection's accept_bi method
59#[derive(Debug)]
60pub struct RecvStream {
61    conn: ConnectionRef,
62    stream: StreamId,
63    is_0rtt: bool,
64    all_data_read: bool,
65    reset: Option<VarInt>,
66}
67
68impl RecvStream {
69    pub(crate) fn new(conn: ConnectionRef, stream: StreamId, is_0rtt: bool) -> Self {
70        Self {
71            conn,
72            stream,
73            is_0rtt,
74            all_data_read: false,
75            reset: None,
76        }
77    }
78
79    /// Read data contiguously from the stream.
80    ///
81    /// Yields the number of bytes read into `buf` on success, or `None` if the stream was finished.
82    ///
83    /// This operation is cancel-safe.
84    pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, ReadError> {
85        Read {
86            stream: self,
87            buf: ReadBuf::new(buf),
88        }
89        .await
90    }
91
92    /// Read an exact number of bytes contiguously from the stream.
93    ///
94    /// See [`read()`] for details. This operation is *not* cancel-safe.
95    ///
96    /// [`read()`]: RecvStream::read
97    pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ReadExactError> {
98        ReadExact {
99            stream: self,
100            buf: ReadBuf::new(buf),
101        }
102        .await
103    }
104
105    /// Attempts to read from the stream into the provided buffer
106    ///
107    /// On success, returns `Poll::Ready(Ok(num_bytes_read))` and places data into `buf`. If this
108    /// returns zero bytes read (and `buf` has a non-zero length), that indicates that the remote
109    /// side has [`finish`]ed the stream and the local side has already read all bytes.
110    ///
111    /// If no data is available for reading, this returns `Poll::Pending` and arranges for the
112    /// current task (via `cx.waker()`) to be notified when the stream becomes readable or is
113    /// closed.
114    ///
115    /// [`finish`]: crate::SendStream::finish
116    pub fn poll_read(
117        &mut self,
118        cx: &mut Context,
119        buf: &mut [u8],
120    ) -> Poll<Result<usize, ReadError>> {
121        let mut buf = ReadBuf::new(buf);
122        ready!(self.poll_read_buf(cx, &mut buf))?;
123        Poll::Ready(Ok(buf.filled().len()))
124    }
125
126    /// Attempts to read from the stream into the provided buffer, which may be uninitialized
127    ///
128    /// On success, returns `Poll::Ready(Ok(()))` and places data into the unfilled portion of
129    /// `buf`. If this does not write any bytes to `buf` (and `buf.remaining()` is non-zero), that
130    /// indicates that the remote side has [`finish`]ed the stream and the local side has already
131    /// read all bytes.
132    ///
133    /// If no data is available for reading, this returns `Poll::Pending` and arranges for the
134    /// current task (via `cx.waker()`) to be notified when the stream becomes readable or is
135    /// closed.
136    ///
137    /// [`finish`]: crate::SendStream::finish
138    pub fn poll_read_buf(
139        &mut self,
140        cx: &mut Context,
141        buf: &mut ReadBuf<'_>,
142    ) -> Poll<Result<(), ReadError>> {
143        if buf.remaining() == 0 {
144            return Poll::Ready(Ok(()));
145        }
146
147        self.poll_read_generic(cx, true, |chunks| {
148            let mut read = false;
149            loop {
150                if buf.remaining() == 0 {
151                    // We know `read` is `true` because `buf.remaining()` was not 0 before
152                    return ReadStatus::Readable(());
153                }
154
155                match chunks.next(buf.remaining()) {
156                    Ok(Some(chunk)) => {
157                        buf.put_slice(&chunk.bytes);
158                        read = true;
159                    }
160                    res => return (if read { Some(()) } else { None }, res.err()).into(),
161                }
162            }
163        })
164        .map(|res| res.map(|_| ()))
165    }
166
167    /// Read the next segment of data
168    ///
169    /// Yields `None` if the stream was finished. Otherwise, yields a segment of data and its
170    /// offset in the stream. If `ordered` is `true`, the chunk's offset will be immediately after
171    /// the last data yielded by `read()` or `read_chunk()`. If `ordered` is `false`, segments may
172    /// be received in any order, and the `Chunk`'s `offset` field can be used to determine
173    /// ordering in the caller. Unordered reads are less prone to head-of-line blocking within a
174    /// stream, but require the application to manage reassembling the original data.
175    ///
176    /// Slightly more efficient than `read` due to not copying. Chunk boundaries do not correspond
177    /// to peer writes, and hence cannot be used as framing.
178    ///
179    /// This operation is cancel-safe.
180    pub async fn read_chunk(
181        &mut self,
182        max_length: usize,
183        ordered: bool,
184    ) -> Result<Option<Chunk>, ReadError> {
185        ReadChunk {
186            stream: self,
187            max_length,
188            ordered,
189        }
190        .await
191    }
192
193    /// Attempts to read a chunk from the stream.
194    ///
195    /// On success, returns `Poll::Ready(Ok(Some(chunk)))`. If `Poll::Ready(Ok(None))`
196    /// is returned, it implies that EOF has been reached.
197    ///
198    /// If no data is available for reading, the method returns `Poll::Pending`
199    /// and arranges for the current task (via cx.waker()) to receive a notification
200    /// when the stream becomes readable or is closed.
201    fn poll_read_chunk(
202        &mut self,
203        cx: &mut Context,
204        max_length: usize,
205        ordered: bool,
206    ) -> Poll<Result<Option<Chunk>, ReadError>> {
207        self.poll_read_generic(cx, ordered, |chunks| match chunks.next(max_length) {
208            Ok(Some(chunk)) => ReadStatus::Readable(chunk),
209            res => (None, res.err()).into(),
210        })
211    }
212
213    /// Read the next segments of data
214    ///
215    /// Fills `bufs` with the segments of data beginning immediately after the
216    /// last data yielded by `read` or `read_chunk`, or `None` if the stream was
217    /// finished.
218    ///
219    /// Slightly more efficient than `read` due to not copying. Chunk boundaries
220    /// do not correspond to peer writes, and hence cannot be used as framing.
221    ///
222    /// This operation is cancel-safe.
223    pub async fn read_chunks(&mut self, bufs: &mut [Bytes]) -> Result<Option<usize>, ReadError> {
224        ReadChunks { stream: self, bufs }.await
225    }
226
227    /// Foundation of [`Self::read_chunks`]
228    fn poll_read_chunks(
229        &mut self,
230        cx: &mut Context,
231        bufs: &mut [Bytes],
232    ) -> Poll<Result<Option<usize>, ReadError>> {
233        if bufs.is_empty() {
234            return Poll::Ready(Ok(Some(0)));
235        }
236
237        self.poll_read_generic(cx, true, |chunks| {
238            let mut read = 0;
239            loop {
240                if read >= bufs.len() {
241                    // We know `read > 0` because `bufs` cannot be empty here
242                    return ReadStatus::Readable(read);
243                }
244
245                match chunks.next(usize::MAX) {
246                    Ok(Some(chunk)) => {
247                        bufs[read] = chunk.bytes;
248                        read += 1;
249                    }
250                    res => return (if read == 0 { None } else { Some(read) }, res.err()).into(),
251                }
252            }
253        })
254    }
255
256    /// Convenience method to read all remaining data into a buffer
257    ///
258    /// Fails with [`ReadToEndError::TooLong`] on reading more than `size_limit` bytes, discarding
259    /// all data read. Uses unordered reads to be more efficient than using `AsyncRead` would
260    /// allow. `size_limit` should be set to limit worst-case memory use.
261    ///
262    /// If unordered reads have already been made, the resulting buffer may have gaps containing
263    /// arbitrary data.
264    ///
265    /// This operation is *not* cancel-safe.
266    ///
267    /// `ReadToEndError::TooLong`: Error returned when size limit is exceeded
268    pub async fn read_to_end(&mut self, size_limit: usize) -> Result<Vec<u8>, ReadToEndError> {
269        ReadToEnd {
270            stream: self,
271            size_limit,
272            read: Vec::new(),
273            start: u64::MAX,
274            end: 0,
275        }
276        .await
277    }
278
279    /// Stop accepting data
280    ///
281    /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
282    /// attempts to operate on a stream will yield `ClosedStream` errors.
283    pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
284        let mut conn = self.conn.state.lock("RecvStream::stop");
285        if self.is_0rtt && conn.check_0rtt().is_err() {
286            return Ok(());
287        }
288        conn.inner.recv_stream(self.stream).stop(error_code)?;
289        conn.wake();
290        self.all_data_read = true;
291        Ok(())
292    }
293
294    /// Check if this stream has been opened during 0-RTT.
295    ///
296    /// In which case any non-idempotent request should be considered dangerous at the application
297    /// level. Because read data is subject to replay attacks.
298    pub fn is_0rtt(&self) -> bool {
299        self.is_0rtt
300    }
301
302    /// Get the identity of this stream
303    pub fn id(&self) -> StreamId {
304        self.stream
305    }
306
307    /// Completes when the stream has been reset by the peer or otherwise closed
308    ///
309    /// Yields `Some` with the reset error code when the stream is reset by the peer. Yields `None`
310    /// when the stream was previously [`stop()`](Self::stop)ed, or when the stream was
311    /// [`finish()`](crate::SendStream::finish)ed by the peer and all data has been received, after
312    /// which it is no longer meaningful for the stream to be reset.
313    ///
314    /// This operation is cancel-safe.
315    pub async fn received_reset(&mut self) -> Result<Option<VarInt>, ResetError> {
316        poll_fn(|cx| {
317            let mut conn = self.conn.state.lock("RecvStream::reset");
318            if self.is_0rtt && conn.check_0rtt().is_err() {
319                return Poll::Ready(Err(ResetError::ZeroRttRejected));
320            }
321
322            if let Some(code) = self.reset {
323                return Poll::Ready(Ok(Some(code)));
324            }
325
326            match conn.inner.recv_stream(self.stream).received_reset() {
327                Err(_) => Poll::Ready(Ok(None)),
328                Ok(Some(error_code)) => {
329                    // Stream state has just now been freed, so the connection may need to issue new
330                    // stream ID flow control credit
331                    conn.wake();
332                    Poll::Ready(Ok(Some(error_code)))
333                }
334                Ok(None) => {
335                    if let Some(e) = &conn.error {
336                        return Poll::Ready(Err(e.clone().into()));
337                    }
338                    // Resets always notify readers, since a reset is an immediate read error. We
339                    // could introduce a dedicated channel to reduce the risk of spurious wakeups,
340                    // but that increased complexity is probably not justified, as an application
341                    // that is expecting a reset is not likely to receive large amounts of data.
342                    conn.blocked_readers.insert(self.stream, cx.waker().clone());
343                    Poll::Pending
344                }
345            }
346        })
347        .await
348    }
349
350    /// Handle common logic related to reading out of a receive stream
351    ///
352    /// This takes an `FnMut` closure that takes care of the actual reading process, matching
353    /// the detailed read semantics for the calling function with a particular return type.
354    /// The closure can read from the passed `&mut Chunks` and has to return the status after
355    /// reading: the amount of data read, and the status after the final read call.
356    fn poll_read_generic<T, U>(
357        &mut self,
358        cx: &mut Context,
359        ordered: bool,
360        mut read_fn: T,
361    ) -> Poll<Result<Option<U>, ReadError>>
362    where
363        T: FnMut(&mut Chunks) -> ReadStatus<U>,
364    {
365        use crate::ReadError::*;
366        if self.all_data_read {
367            return Poll::Ready(Ok(None));
368        }
369
370        let mut conn = self.conn.state.lock("RecvStream::poll_read");
371        if self.is_0rtt {
372            conn.check_0rtt().map_err(|()| ReadError::ZeroRttRejected)?;
373        }
374
375        // If we stored an error during a previous call, return it now. This can happen if a
376        // `read_fn` both wants to return data and also returns an error in its final stream status.
377        let status = match self.reset {
378            Some(code) => ReadStatus::Failed(None, Reset(code)),
379            None => {
380                let mut recv = conn.inner.recv_stream(self.stream);
381                let mut chunks = recv.read(ordered)?;
382                let status = read_fn(&mut chunks);
383                if chunks.finalize().should_transmit() {
384                    conn.wake();
385                }
386                status
387            }
388        };
389
390        match status {
391            ReadStatus::Readable(read) => Poll::Ready(Ok(Some(read))),
392            ReadStatus::Finished(read) => {
393                self.all_data_read = true;
394                Poll::Ready(Ok(read))
395            }
396            ReadStatus::Failed(read, Blocked) => match read {
397                Some(val) => Poll::Ready(Ok(Some(val))),
398                None => {
399                    if let Some(ref x) = conn.error {
400                        return Poll::Ready(Err(ReadError::ConnectionLost(x.clone())));
401                    }
402                    conn.blocked_readers.insert(self.stream, cx.waker().clone());
403                    Poll::Pending
404                }
405            },
406            ReadStatus::Failed(read, Reset(error_code)) => match read {
407                None => {
408                    self.all_data_read = true;
409                    self.reset = Some(error_code);
410                    Poll::Ready(Err(ReadError::Reset(error_code)))
411                }
412                done => {
413                    self.reset = Some(error_code);
414                    Poll::Ready(Ok(done))
415                }
416            },
417            ReadStatus::Failed(_read, ConnectionClosed) => {
418                self.all_data_read = true;
419                Poll::Ready(Err(ReadError::ConnectionLost(
420                    ConnectionError::LocallyClosed,
421                )))
422            }
423        }
424    }
425}
426
427enum ReadStatus<T> {
428    Readable(T),
429    Finished(Option<T>),
430    Failed(Option<T>, crate::ReadError),
431}
432
433impl<T> From<(Option<T>, Option<crate::ReadError>)> for ReadStatus<T> {
434    fn from(status: (Option<T>, Option<crate::ReadError>)) -> Self {
435        match status {
436            (read, None) => Self::Finished(read),
437            (read, Some(e)) => Self::Failed(read, e),
438        }
439    }
440}
441
442/// Future produced by [`RecvStream::read_to_end()`].
443///
444/// [`RecvStream::read_to_end()`]: crate::RecvStream::read_to_end
445struct ReadToEnd<'a> {
446    stream: &'a mut RecvStream,
447    read: Vec<(Bytes, u64)>,
448    start: u64,
449    end: u64,
450    size_limit: usize,
451}
452
453impl Future for ReadToEnd<'_> {
454    type Output = Result<Vec<u8>, ReadToEndError>;
455    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
456        loop {
457            match ready!(self.stream.poll_read_chunk(cx, usize::MAX, false))? {
458                Some(chunk) => {
459                    self.start = self.start.min(chunk.offset);
460                    let end = chunk.bytes.len() as u64 + chunk.offset;
461                    if (end - self.start) > self.size_limit as u64 {
462                        return Poll::Ready(Err(ReadToEndError::TooLong));
463                    }
464                    self.end = self.end.max(end);
465                    self.read.push((chunk.bytes, chunk.offset));
466                }
467                None => {
468                    if self.end == 0 {
469                        // Never received anything
470                        return Poll::Ready(Ok(Vec::new()));
471                    }
472                    let start = self.start;
473                    let mut buffer = vec![0; (self.end - start) as usize];
474                    for (data, offset) in self.read.drain(..) {
475                        let offset = (offset - start) as usize;
476                        buffer[offset..offset + data.len()].copy_from_slice(&data);
477                    }
478                    return Poll::Ready(Ok(buffer));
479                }
480            }
481        }
482    }
483}
484
485/// Errors from [`RecvStream::read_to_end`]
486#[derive(Debug, Error, Clone, PartialEq, Eq)]
487pub enum ReadToEndError {
488    /// An error occurred during reading
489    #[error("read error: {0}")]
490    Read(#[from] ReadError),
491    /// The stream is larger than the user-supplied limit
492    #[error("stream too long")]
493    TooLong,
494}
495
496/* TODO: Enable when futures-io feature is added
497#[cfg(feature = "futures-io")]
498impl futures_io::AsyncRead for RecvStream {
499    fn poll_read(
500        self: Pin<&mut Self>,
501        cx: &mut Context,
502        buf: &mut [u8],
503    ) -> Poll<io::Result<usize>> {
504        let mut buf = ReadBuf::new(buf);
505        ready!(Self::poll_read_buf(self.get_mut(), cx, &mut buf))?;
506        Poll::Ready(Ok(buf.filled().len()))
507    }
508}
509*/
510
511impl tokio::io::AsyncRead for RecvStream {
512    fn poll_read(
513        self: Pin<&mut Self>,
514        cx: &mut Context<'_>,
515        buf: &mut ReadBuf<'_>,
516    ) -> Poll<io::Result<()>> {
517        ready!(Self::poll_read_buf(self.get_mut(), cx, buf))?;
518        Poll::Ready(Ok(()))
519    }
520}
521
522impl Drop for RecvStream {
523    fn drop(&mut self) {
524        let mut conn = self.conn.state.lock("RecvStream::drop");
525
526        // clean up any previously registered wakers
527        conn.blocked_readers.remove(&self.stream);
528
529        if conn.error.is_some() || (self.is_0rtt && conn.check_0rtt().is_err()) {
530            return;
531        }
532        if !self.all_data_read {
533            // Ignore ClosedStream errors
534            let _ = conn.inner.recv_stream(self.stream).stop(0u32.into());
535            conn.wake();
536        }
537    }
538}
539
540/// Errors that arise from reading from a stream.
541#[derive(Debug, Error, Clone, PartialEq, Eq)]
542pub enum ReadError {
543    /// The peer abandoned transmitting data on this stream
544    ///
545    /// Carries an application-defined error code.
546    #[error("stream reset by peer: error {0}")]
547    Reset(VarInt),
548    /// The connection was lost
549    #[error("connection lost")]
550    ConnectionLost(#[from] ConnectionError),
551    /// The stream has already been stopped, finished, or reset
552    #[error("closed stream")]
553    ClosedStream,
554    /// Attempted an ordered read following an unordered read
555    ///
556    /// Performing an unordered read allows discontinuities to arise in the receive buffer of a
557    /// stream which cannot be recovered, making further ordered reads impossible.
558    #[error("ordered read after unordered read")]
559    IllegalOrderedRead,
560    /// This was a 0-RTT stream and the server rejected it
561    ///
562    /// Can only occur on clients for 0-RTT streams, which can be opened using
563    /// [`Connecting::into_0rtt()`].
564    ///
565    /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
566    #[error("0-RTT rejected")]
567    ZeroRttRejected,
568}
569
570impl From<ReadableError> for ReadError {
571    fn from(e: ReadableError) -> Self {
572        match e {
573            ReadableError::ClosedStream => Self::ClosedStream,
574            ReadableError::IllegalOrderedRead => Self::IllegalOrderedRead,
575            ReadableError::ConnectionClosed => Self::ConnectionLost(ConnectionError::LocallyClosed),
576        }
577    }
578}
579
580impl From<ResetError> for ReadError {
581    fn from(e: ResetError) -> Self {
582        match e {
583            ResetError::ConnectionLost(e) => Self::ConnectionLost(e),
584            ResetError::ZeroRttRejected => Self::ZeroRttRejected,
585        }
586    }
587}
588
589impl From<ReadError> for io::Error {
590    fn from(x: ReadError) -> Self {
591        use ReadError::*;
592        let kind = match x {
593            Reset { .. } | ZeroRttRejected => io::ErrorKind::ConnectionReset,
594            ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected,
595            IllegalOrderedRead => io::ErrorKind::InvalidInput,
596        };
597        Self::new(kind, x)
598    }
599}
600
601/// Errors that arise while waiting for a stream to be reset
602#[derive(Debug, Error, Clone, PartialEq, Eq)]
603pub enum ResetError {
604    /// The connection was lost
605    #[error("connection lost")]
606    ConnectionLost(#[from] ConnectionError),
607    /// This was a 0-RTT stream and the server rejected it
608    ///
609    /// Can only occur on clients for 0-RTT streams, which can be opened using
610    /// [`Connecting::into_0rtt()`].
611    ///
612    /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
613    #[error("0-RTT rejected")]
614    ZeroRttRejected,
615}
616
617impl From<ResetError> for io::Error {
618    fn from(x: ResetError) -> Self {
619        use ResetError::*;
620        let kind = match x {
621            ZeroRttRejected => io::ErrorKind::ConnectionReset,
622            ConnectionLost(_) => io::ErrorKind::NotConnected,
623        };
624        Self::new(kind, x)
625    }
626}
627
628/// Future produced by [`RecvStream::read()`].
629///
630/// [`RecvStream::read()`]: crate::RecvStream::read
631struct Read<'a> {
632    stream: &'a mut RecvStream,
633    buf: ReadBuf<'a>,
634}
635
636impl Future for Read<'_> {
637    type Output = Result<Option<usize>, ReadError>;
638
639    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
640        let this = self.get_mut();
641        ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
642        match this.buf.filled().len() {
643            0 if this.buf.capacity() != 0 => Poll::Ready(Ok(None)),
644            n => Poll::Ready(Ok(Some(n))),
645        }
646    }
647}
648
649/// Future produced by [`RecvStream::read_exact()`].
650///
651/// [`RecvStream::read_exact()`]: crate::RecvStream::read_exact
652struct ReadExact<'a> {
653    stream: &'a mut RecvStream,
654    buf: ReadBuf<'a>,
655}
656
657impl Future for ReadExact<'_> {
658    type Output = Result<(), ReadExactError>;
659    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
660        let this = self.get_mut();
661        let mut remaining = this.buf.remaining();
662        while remaining > 0 {
663            ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
664            let new = this.buf.remaining();
665            if new == remaining {
666                return Poll::Ready(Err(ReadExactError::FinishedEarly(this.buf.filled().len())));
667            }
668            remaining = new;
669        }
670        Poll::Ready(Ok(()))
671    }
672}
673
674/// Errors that arise from reading from a stream.
675#[derive(Debug, Error, Clone, PartialEq, Eq)]
676pub enum ReadExactError {
677    /// The stream finished before all bytes were read
678    #[error("stream finished early ({0} bytes read)")]
679    FinishedEarly(usize),
680    /// A read error occurred
681    #[error(transparent)]
682    ReadError(#[from] ReadError),
683}
684
685/// Future produced by [`RecvStream::read_chunk()`].
686///
687/// [`RecvStream::read_chunk()`]: crate::RecvStream::read_chunk
688struct ReadChunk<'a> {
689    stream: &'a mut RecvStream,
690    max_length: usize,
691    ordered: bool,
692}
693
694impl Future for ReadChunk<'_> {
695    type Output = Result<Option<Chunk>, ReadError>;
696    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
697        let (max_length, ordered) = (self.max_length, self.ordered);
698        self.stream.poll_read_chunk(cx, max_length, ordered)
699    }
700}
701
702/// Future produced by [`RecvStream::read_chunks()`].
703///
704/// [`RecvStream::read_chunks()`]: crate::RecvStream::read_chunks
705struct ReadChunks<'a> {
706    stream: &'a mut RecvStream,
707    bufs: &'a mut [Bytes],
708}
709
710impl Future for ReadChunks<'_> {
711    type Output = Result<Option<usize>, ReadError>;
712    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
713        let this = self.get_mut();
714        this.stream.poll_read_chunks(cx, this.bufs)
715    }
716}