ant_quic/high_level/recv_stream.rs
1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::{
9 future::{Future, poll_fn},
10 io,
11 pin::Pin,
12 task::{Context, Poll, ready},
13};
14
15use crate::{Chunk, Chunks, ClosedStream, ConnectionError, ReadableError, StreamId};
16use bytes::Bytes;
17use thiserror::Error;
18use tokio::io::ReadBuf;
19
20use super::connection::ConnectionRef;
21use crate::VarInt;
22
/// A stream that can only be used to receive data
///
/// `stop(0)` is implicitly called on drop unless:
/// - A variant of [`ReadError`] has been yielded by a read call
/// - [`stop()`] was called explicitly
///
/// # Cancellation
///
/// A `read` method is said to be *cancel-safe* when dropping its future before the future becomes
/// ready cannot lead to loss of stream data. This is true of methods which succeed immediately when
/// any progress is made, and is not true of methods which might need to perform multiple reads
/// internally before succeeding. Each `read` method documents whether it is cancel-safe.
///
/// # Common issues
///
/// ## Data never received on a locally-opened stream
///
/// Peers are not notified of streams until they or a later-numbered stream are used to send
/// data. If a bidirectional stream is locally opened but never used to send, then the peer may
/// never see it. Application protocols should always arrange for the endpoint which will first
/// transmit on a stream to be the endpoint responsible for opening it.
///
/// ## Data never received on a remotely-opened stream
///
/// Verify that the stream you are receiving is the same one that the server is sending on, e.g. by
/// logging the [`id`] of each. Streams are always accepted in the same order as they are created,
/// i.e. ascending order by [`StreamId`]. For example, even if a sender first transmits on
/// bidirectional stream 1, the first stream yielded by Connection's accept_bi method on the receiver
/// will be bidirectional stream 0.
///
/// [`ReadError`]: crate::ReadError
/// [`stop()`]: RecvStream::stop
/// [`SendStream::finish`]: crate::SendStream::finish
/// [`WriteError::Stopped`]: crate::WriteError::Stopped
/// [`id`]: RecvStream::id
#[derive(Debug)]
pub struct RecvStream {
    /// Handle to the owning connection; its `state` is locked by every operation on the stream
    conn: ConnectionRef,
    /// Identifier of this stream, as returned by [`RecvStream::id`]
    stream: StreamId,
    /// Whether the stream was opened during 0-RTT; when set, operations first call `check_0rtt()`
    is_0rtt: bool,
    /// Set once the stream was finished, stopped, or failed with a reset or closed connection.
    /// While set, reads yield `None` immediately and dropping does not issue the implicit `stop(0)`.
    all_data_read: bool,
    /// Reset error code observed by a read call, kept so later calls can report it
    reset: Option<VarInt>,
}
67
68impl RecvStream {
69 pub(crate) fn new(conn: ConnectionRef, stream: StreamId, is_0rtt: bool) -> Self {
70 Self {
71 conn,
72 stream,
73 is_0rtt,
74 all_data_read: false,
75 reset: None,
76 }
77 }
78
79 /// Read data contiguously from the stream.
80 ///
81 /// Yields the number of bytes read into `buf` on success, or `None` if the stream was finished.
82 ///
83 /// This operation is cancel-safe.
84 pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, ReadError> {
85 Read {
86 stream: self,
87 buf: ReadBuf::new(buf),
88 }
89 .await
90 }
91
92 /// Read an exact number of bytes contiguously from the stream.
93 ///
94 /// See [`read()`] for details. This operation is *not* cancel-safe.
95 ///
96 /// [`read()`]: RecvStream::read
97 pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ReadExactError> {
98 ReadExact {
99 stream: self,
100 buf: ReadBuf::new(buf),
101 }
102 .await
103 }
104
105 /// Attempts to read from the stream into the provided buffer
106 ///
107 /// On success, returns `Poll::Ready(Ok(num_bytes_read))` and places data into `buf`. If this
108 /// returns zero bytes read (and `buf` has a non-zero length), that indicates that the remote
109 /// side has [`finish`]ed the stream and the local side has already read all bytes.
110 ///
111 /// If no data is available for reading, this returns `Poll::Pending` and arranges for the
112 /// current task (via `cx.waker()`) to be notified when the stream becomes readable or is
113 /// closed.
114 ///
115 /// [`finish`]: crate::SendStream::finish
116 pub fn poll_read(
117 &mut self,
118 cx: &mut Context,
119 buf: &mut [u8],
120 ) -> Poll<Result<usize, ReadError>> {
121 let mut buf = ReadBuf::new(buf);
122 ready!(self.poll_read_buf(cx, &mut buf))?;
123 Poll::Ready(Ok(buf.filled().len()))
124 }
125
126 /// Attempts to read from the stream into the provided buffer, which may be uninitialized
127 ///
128 /// On success, returns `Poll::Ready(Ok(()))` and places data into the unfilled portion of
129 /// `buf`. If this does not write any bytes to `buf` (and `buf.remaining()` is non-zero), that
130 /// indicates that the remote side has [`finish`]ed the stream and the local side has already
131 /// read all bytes.
132 ///
133 /// If no data is available for reading, this returns `Poll::Pending` and arranges for the
134 /// current task (via `cx.waker()`) to be notified when the stream becomes readable or is
135 /// closed.
136 ///
137 /// [`finish`]: crate::SendStream::finish
138 pub fn poll_read_buf(
139 &mut self,
140 cx: &mut Context,
141 buf: &mut ReadBuf<'_>,
142 ) -> Poll<Result<(), ReadError>> {
143 if buf.remaining() == 0 {
144 return Poll::Ready(Ok(()));
145 }
146
147 self.poll_read_generic(cx, true, |chunks| {
148 let mut read = false;
149 loop {
150 if buf.remaining() == 0 {
151 // We know `read` is `true` because `buf.remaining()` was not 0 before
152 return ReadStatus::Readable(());
153 }
154
155 match chunks.next(buf.remaining()) {
156 Ok(Some(chunk)) => {
157 buf.put_slice(&chunk.bytes);
158 read = true;
159 }
160 res => return (if read { Some(()) } else { None }, res.err()).into(),
161 }
162 }
163 })
164 .map(|res| res.map(|_| ()))
165 }
166
167 /// Read the next segment of data
168 ///
169 /// Yields `None` if the stream was finished. Otherwise, yields a segment of data and its
170 /// offset in the stream. If `ordered` is `true`, the chunk's offset will be immediately after
171 /// the last data yielded by `read()` or `read_chunk()`. If `ordered` is `false`, segments may
172 /// be received in any order, and the `Chunk`'s `offset` field can be used to determine
173 /// ordering in the caller. Unordered reads are less prone to head-of-line blocking within a
174 /// stream, but require the application to manage reassembling the original data.
175 ///
176 /// Slightly more efficient than `read` due to not copying. Chunk boundaries do not correspond
177 /// to peer writes, and hence cannot be used as framing.
178 ///
179 /// This operation is cancel-safe.
180 pub async fn read_chunk(
181 &mut self,
182 max_length: usize,
183 ordered: bool,
184 ) -> Result<Option<Chunk>, ReadError> {
185 ReadChunk {
186 stream: self,
187 max_length,
188 ordered,
189 }
190 .await
191 }
192
193 /// Attempts to read a chunk from the stream.
194 ///
195 /// On success, returns `Poll::Ready(Ok(Some(chunk)))`. If `Poll::Ready(Ok(None))`
196 /// is returned, it implies that EOF has been reached.
197 ///
198 /// If no data is available for reading, the method returns `Poll::Pending`
199 /// and arranges for the current task (via cx.waker()) to receive a notification
200 /// when the stream becomes readable or is closed.
201 fn poll_read_chunk(
202 &mut self,
203 cx: &mut Context,
204 max_length: usize,
205 ordered: bool,
206 ) -> Poll<Result<Option<Chunk>, ReadError>> {
207 self.poll_read_generic(cx, ordered, |chunks| match chunks.next(max_length) {
208 Ok(Some(chunk)) => ReadStatus::Readable(chunk),
209 res => (None, res.err()).into(),
210 })
211 }
212
213 /// Read the next segments of data
214 ///
215 /// Fills `bufs` with the segments of data beginning immediately after the
216 /// last data yielded by `read` or `read_chunk`, or `None` if the stream was
217 /// finished.
218 ///
219 /// Slightly more efficient than `read` due to not copying. Chunk boundaries
220 /// do not correspond to peer writes, and hence cannot be used as framing.
221 ///
222 /// This operation is cancel-safe.
223 pub async fn read_chunks(&mut self, bufs: &mut [Bytes]) -> Result<Option<usize>, ReadError> {
224 ReadChunks { stream: self, bufs }.await
225 }
226
227 /// Foundation of [`Self::read_chunks`]
228 fn poll_read_chunks(
229 &mut self,
230 cx: &mut Context,
231 bufs: &mut [Bytes],
232 ) -> Poll<Result<Option<usize>, ReadError>> {
233 if bufs.is_empty() {
234 return Poll::Ready(Ok(Some(0)));
235 }
236
237 self.poll_read_generic(cx, true, |chunks| {
238 let mut read = 0;
239 loop {
240 if read >= bufs.len() {
241 // We know `read > 0` because `bufs` cannot be empty here
242 return ReadStatus::Readable(read);
243 }
244
245 match chunks.next(usize::MAX) {
246 Ok(Some(chunk)) => {
247 bufs[read] = chunk.bytes;
248 read += 1;
249 }
250 res => return (if read == 0 { None } else { Some(read) }, res.err()).into(),
251 }
252 }
253 })
254 }
255
256 /// Convenience method to read all remaining data into a buffer
257 ///
258 /// Fails with [`ReadToEndError::TooLong`] on reading more than `size_limit` bytes, discarding
259 /// all data read. Uses unordered reads to be more efficient than using `AsyncRead` would
260 /// allow. `size_limit` should be set to limit worst-case memory use.
261 ///
262 /// If unordered reads have already been made, the resulting buffer may have gaps containing
263 /// arbitrary data.
264 ///
265 /// This operation is *not* cancel-safe.
266 ///
267 /// `ReadToEndError::TooLong`: Error returned when size limit is exceeded
268 pub async fn read_to_end(&mut self, size_limit: usize) -> Result<Vec<u8>, ReadToEndError> {
269 ReadToEnd {
270 stream: self,
271 size_limit,
272 read: Vec::new(),
273 start: u64::MAX,
274 end: 0,
275 }
276 .await
277 }
278
279 /// Stop accepting data
280 ///
281 /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
282 /// attempts to operate on a stream will yield `ClosedStream` errors.
283 pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
284 let mut conn = self.conn.state.lock("RecvStream::stop");
285 if self.is_0rtt && conn.check_0rtt().is_err() {
286 return Ok(());
287 }
288 conn.inner.recv_stream(self.stream).stop(error_code)?;
289 conn.wake();
290 self.all_data_read = true;
291 Ok(())
292 }
293
    /// Check if this stream has been opened during 0-RTT.
    ///
    /// If so, any non-idempotent request should be considered dangerous at the application
    /// level, because read data is subject to replay attacks.
    pub fn is_0rtt(&self) -> bool {
        self.is_0rtt
    }
301
    /// Get the identity of this stream
    ///
    /// Returns the [`StreamId`] this handle was created with.
    pub fn id(&self) -> StreamId {
        self.stream
    }
306
    /// Completes when the stream has been reset by the peer or otherwise closed
    ///
    /// Yields `Some` with the reset error code when the stream is reset by the peer. Yields `None`
    /// when the stream was previously [`stop()`](Self::stop)ed, or when the stream was
    /// [`finish()`](crate::SendStream::finish)ed by the peer and all data has been received, after
    /// which it is no longer meaningful for the stream to be reset.
    ///
    /// # Errors
    ///
    /// Fails with [`ResetError::ZeroRttRejected`] if this is a 0-RTT stream and the 0-RTT check
    /// fails, and with [`ResetError::ConnectionLost`] if the connection has recorded an error
    /// while no reset has been received.
    ///
    /// This operation is cancel-safe.
    pub async fn received_reset(&mut self) -> Result<Option<VarInt>, ResetError> {
        poll_fn(|cx| {
            let mut conn = self.conn.state.lock("RecvStream::reset");
            if self.is_0rtt && conn.check_0rtt().is_err() {
                return Poll::Ready(Err(ResetError::ZeroRttRejected));
            }

            // A reset already observed by a read call is reported without consulting the
            // connection again
            if let Some(code) = self.reset {
                return Poll::Ready(Ok(Some(code)));
            }

            match conn.inner.recv_stream(self.stream).received_reset() {
                // Any error from the stream lookup is reported as "no reset"
                Err(_) => Poll::Ready(Ok(None)),
                Ok(Some(error_code)) => {
                    // Stream state has just now been freed, so the connection may need to issue new
                    // stream ID flow control credit
                    conn.wake();
                    Poll::Ready(Ok(Some(error_code)))
                }
                Ok(None) => {
                    // No reset yet: fail if the connection has an error, otherwise wait
                    if let Some(e) = &conn.error {
                        return Poll::Ready(Err(e.clone().into()));
                    }
                    // Resets always notify readers, since a reset is an immediate read error. We
                    // could introduce a dedicated channel to reduce the risk of spurious wakeups,
                    // but that increased complexity is probably not justified, as an application
                    // that is expecting a reset is not likely to receive large amounts of data.
                    conn.blocked_readers.insert(self.stream, cx.waker().clone());
                    Poll::Pending
                }
            }
        })
        .await
    }
349
    /// Handle common logic related to reading out of a receive stream
    ///
    /// This takes an `FnMut` closure that takes care of the actual reading process, matching
    /// the detailed read semantics for the calling function with a particular return type.
    /// The closure can read from the passed `&mut Chunks` and has to return the status after
    /// reading: the amount of data read, and the status after the final read call.
    ///
    /// `ordered` is forwarded to the underlying stream's `read` call. The result is
    /// `Ok(Some(_))` when the closure produced data, `Ok(None)` when the stream has ended,
    /// `Poll::Pending` when the read is blocked (the waker from `cx` is registered for this
    /// stream), and `Err(_)` otherwise.
    fn poll_read_generic<T, U>(
        &mut self,
        cx: &mut Context,
        ordered: bool,
        mut read_fn: T,
    ) -> Poll<Result<Option<U>, ReadError>>
    where
        T: FnMut(&mut Chunks) -> ReadStatus<U>,
    {
        // Brings the variants `Blocked`, `Reset` and `ConnectionClosed` of `crate::ReadError` into
        // scope; the name `ReadError` itself still refers to this module's error type.
        use crate::ReadError::*;
        // Once the stream has ended, every further read reports end-of-stream without touching
        // the connection
        if self.all_data_read {
            return Poll::Ready(Ok(None));
        }

        let mut conn = self.conn.state.lock("RecvStream::poll_read");
        if self.is_0rtt {
            conn.check_0rtt().map_err(|()| ReadError::ZeroRttRejected)?;
        }

        // If we stored an error during a previous call, return it now. This can happen if a
        // `read_fn` both wants to return data and also returns an error in its final stream status.
        let status = match self.reset {
            Some(code) => ReadStatus::Failed(None, Reset(code)),
            None => {
                let mut recv = conn.inner.recv_stream(self.stream);
                let mut chunks = recv.read(ordered)?;
                let status = read_fn(&mut chunks);
                // Finishing the read may require the connection to transmit; wake it if so
                if chunks.finalize().should_transmit() {
                    conn.wake();
                }
                status
            }
        };

        match status {
            ReadStatus::Readable(read) => Poll::Ready(Ok(Some(read))),
            ReadStatus::Finished(read) => {
                self.all_data_read = true;
                Poll::Ready(Ok(read))
            }
            ReadStatus::Failed(read, Blocked) => match read {
                // Data gathered before blocking is delivered right away
                Some(val) => Poll::Ready(Ok(Some(val))),
                None => {
                    // Nothing was read: fail if the connection has an error, otherwise register
                    // the waker and wait for the stream to become readable
                    if let Some(ref x) = conn.error {
                        return Poll::Ready(Err(ReadError::ConnectionLost(x.clone())));
                    }
                    conn.blocked_readers.insert(self.stream, cx.waker().clone());
                    Poll::Pending
                }
            },
            ReadStatus::Failed(read, Reset(error_code)) => match read {
                None => {
                    // Setting `all_data_read` makes later calls return `Ok(None)` via the check
                    // at the top of this function
                    self.all_data_read = true;
                    self.reset = Some(error_code);
                    Poll::Ready(Err(ReadError::Reset(error_code)))
                }
                done => {
                    // Deliver the data now; the stored code is reported by the next call
                    self.reset = Some(error_code);
                    Poll::Ready(Ok(done))
                }
            },
            // NOTE(review): any data in `_read` is discarded on this path — confirm that
            // `read_fn` cannot return data together with `ConnectionClosed`.
            ReadStatus::Failed(_read, ConnectionClosed) => {
                self.all_data_read = true;
                Poll::Ready(Err(ReadError::ConnectionLost(
                    ConnectionError::LocallyClosed,
                )))
            }
        }
    }
425}
426
427enum ReadStatus<T> {
428 Readable(T),
429 Finished(Option<T>),
430 Failed(Option<T>, crate::ReadError),
431}
432
433impl<T> From<(Option<T>, Option<crate::ReadError>)> for ReadStatus<T> {
434 fn from(status: (Option<T>, Option<crate::ReadError>)) -> Self {
435 match status {
436 (read, None) => Self::Finished(read),
437 (read, Some(e)) => Self::Failed(read, e),
438 }
439 }
440}
441
442/// Future produced by `RecvStream::read_to_end()`.
443struct ReadToEnd<'a> {
444 stream: &'a mut RecvStream,
445 read: Vec<(Bytes, u64)>,
446 start: u64,
447 end: u64,
448 size_limit: usize,
449}
450
451impl Future for ReadToEnd<'_> {
452 type Output = Result<Vec<u8>, ReadToEndError>;
453 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
454 loop {
455 match ready!(self.stream.poll_read_chunk(cx, usize::MAX, false))? {
456 Some(chunk) => {
457 self.start = self.start.min(chunk.offset);
458 let end = chunk.bytes.len() as u64 + chunk.offset;
459 if (end - self.start) > self.size_limit as u64 {
460 return Poll::Ready(Err(ReadToEndError::TooLong));
461 }
462 self.end = self.end.max(end);
463 self.read.push((chunk.bytes, chunk.offset));
464 }
465 None => {
466 if self.end == 0 {
467 // Never received anything
468 return Poll::Ready(Ok(Vec::new()));
469 }
470 let start = self.start;
471 let mut buffer = vec![0; (self.end - start) as usize];
472 for (data, offset) in self.read.drain(..) {
473 let offset = (offset - start) as usize;
474 buffer[offset..offset + data.len()].copy_from_slice(&data);
475 }
476 return Poll::Ready(Ok(buffer));
477 }
478 }
479 }
480 }
481}
482
/// Errors from [`RecvStream::read_to_end`]
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ReadToEndError {
    /// An error occurred during reading
    ///
    /// Wraps the underlying [`ReadError`].
    #[error("read error: {0}")]
    Read(#[from] ReadError),
    /// The stream is larger than the user-supplied limit
    #[error("stream too long")]
    TooLong,
}
493
494/* TODO: Enable when futures-io feature is added
495#[cfg(feature = "futures-io")]
496impl futures_io::AsyncRead for RecvStream {
497 fn poll_read(
498 self: Pin<&mut Self>,
499 cx: &mut Context,
500 buf: &mut [u8],
501 ) -> Poll<io::Result<usize>> {
502 let mut buf = ReadBuf::new(buf);
503 ready!(Self::poll_read_buf(self.get_mut(), cx, &mut buf))?;
504 Poll::Ready(Ok(buf.filled().len()))
505 }
506}
507*/
508
509impl tokio::io::AsyncRead for RecvStream {
510 fn poll_read(
511 self: Pin<&mut Self>,
512 cx: &mut Context<'_>,
513 buf: &mut ReadBuf<'_>,
514 ) -> Poll<io::Result<()>> {
515 ready!(Self::poll_read_buf(self.get_mut(), cx, buf))?;
516 Poll::Ready(Ok(()))
517 }
518}
519
520impl Drop for RecvStream {
521 fn drop(&mut self) {
522 let mut conn = self.conn.state.lock("RecvStream::drop");
523
524 // clean up any previously registered wakers
525 conn.blocked_readers.remove(&self.stream);
526
527 if conn.error.is_some() || (self.is_0rtt && conn.check_0rtt().is_err()) {
528 return;
529 }
530 if !self.all_data_read {
531 // Ignore ClosedStream errors
532 let _ = conn.inner.recv_stream(self.stream).stop(0u32.into());
533 conn.wake();
534 }
535 }
536}
537
/// Errors that arise from reading from a stream.
///
/// Converts into [`io::Error`] so it can be surfaced through I/O traits.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ReadError {
    /// The peer abandoned transmitting data on this stream
    ///
    /// Carries an application-defined error code.
    #[error("stream reset by peer: error {0}")]
    Reset(VarInt),
    /// The connection was lost
    ///
    /// Carries the [`ConnectionError`] describing why.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
    /// The stream has already been stopped, finished, or reset
    #[error("closed stream")]
    ClosedStream,
    /// Attempted an ordered read following an unordered read
    ///
    /// Performing an unordered read allows discontinuities to arise in the receive buffer of a
    /// stream which cannot be recovered, making further ordered reads impossible.
    #[error("ordered read after unordered read")]
    IllegalOrderedRead,
    /// This was a 0-RTT stream and the server rejected it
    ///
    /// Can only occur on clients for 0-RTT streams, which can be opened using
    /// [`Connecting::into_0rtt()`].
    ///
    /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
    #[error("0-RTT rejected")]
    ZeroRttRejected,
}
567
568impl From<ReadableError> for ReadError {
569 fn from(e: ReadableError) -> Self {
570 match e {
571 ReadableError::ClosedStream => Self::ClosedStream,
572 ReadableError::IllegalOrderedRead => Self::IllegalOrderedRead,
573 ReadableError::ConnectionClosed => Self::ConnectionLost(ConnectionError::LocallyClosed),
574 }
575 }
576}
577
578impl From<ResetError> for ReadError {
579 fn from(e: ResetError) -> Self {
580 match e {
581 ResetError::ConnectionLost(e) => Self::ConnectionLost(e),
582 ResetError::ZeroRttRejected => Self::ZeroRttRejected,
583 }
584 }
585}
586
587impl From<ReadError> for io::Error {
588 fn from(x: ReadError) -> Self {
589 use ReadError::*;
590 let kind = match x {
591 Reset { .. } | ZeroRttRejected => io::ErrorKind::ConnectionReset,
592 ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected,
593 IllegalOrderedRead => io::ErrorKind::InvalidInput,
594 };
595 Self::new(kind, x)
596 }
597}
598
/// Errors that arise while waiting for a stream to be reset
///
/// Returned by [`RecvStream::received_reset`].
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ResetError {
    /// The connection was lost
    ///
    /// Carries the [`ConnectionError`] describing why.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
    /// This was a 0-RTT stream and the server rejected it
    ///
    /// Can only occur on clients for 0-RTT streams, which can be opened using
    /// [`Connecting::into_0rtt()`].
    ///
    /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
    #[error("0-RTT rejected")]
    ZeroRttRejected,
}
614
615impl From<ResetError> for io::Error {
616 fn from(x: ResetError) -> Self {
617 use ResetError::*;
618 let kind = match x {
619 ZeroRttRejected => io::ErrorKind::ConnectionReset,
620 ConnectionLost(_) => io::ErrorKind::NotConnected,
621 };
622 Self::new(kind, x)
623 }
624}
625
626/// Future produced by [`RecvStream::read()`].
627///
628/// [`RecvStream::read()`]: crate::RecvStream::read
629struct Read<'a> {
630 stream: &'a mut RecvStream,
631 buf: ReadBuf<'a>,
632}
633
634impl Future for Read<'_> {
635 type Output = Result<Option<usize>, ReadError>;
636
637 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
638 let this = self.get_mut();
639 ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
640 match this.buf.filled().len() {
641 0 if this.buf.capacity() != 0 => Poll::Ready(Ok(None)),
642 n => Poll::Ready(Ok(Some(n))),
643 }
644 }
645}
646
647/// Future produced by `RecvStream::read_exact()`.
648struct ReadExact<'a> {
649 stream: &'a mut RecvStream,
650 buf: ReadBuf<'a>,
651}
652
653impl Future for ReadExact<'_> {
654 type Output = Result<(), ReadExactError>;
655 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
656 let this = self.get_mut();
657 let mut remaining = this.buf.remaining();
658 while remaining > 0 {
659 ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
660 let new = this.buf.remaining();
661 if new == remaining {
662 return Poll::Ready(Err(ReadExactError::FinishedEarly(this.buf.filled().len())));
663 }
664 remaining = new;
665 }
666 Poll::Ready(Ok(()))
667 }
668}
669
/// Errors from [`RecvStream::read_exact`]
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ReadExactError {
    /// The stream finished before all bytes were read
    ///
    /// Carries the number of bytes that had been read into the buffer.
    #[error("stream finished early ({0} bytes read)")]
    FinishedEarly(usize),
    /// A read error occurred
    #[error(transparent)]
    ReadError(#[from] ReadError),
}
680
681/// Future produced by `RecvStream::read_chunk()`.
682struct ReadChunk<'a> {
683 stream: &'a mut RecvStream,
684 max_length: usize,
685 ordered: bool,
686}
687
688impl Future for ReadChunk<'_> {
689 type Output = Result<Option<Chunk>, ReadError>;
690 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
691 let (max_length, ordered) = (self.max_length, self.ordered);
692 self.stream.poll_read_chunk(cx, max_length, ordered)
693 }
694}
695
696/// Future produced by `RecvStream::read_chunks()`.
697struct ReadChunks<'a> {
698 stream: &'a mut RecvStream,
699 bufs: &'a mut [Bytes],
700}
701
702impl Future for ReadChunks<'_> {
703 type Output = Result<Option<usize>, ReadError>;
704 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
705 let this = self.get_mut();
706 this.stream.poll_read_chunks(cx, this.bufs)
707 }
708}