iroh_quinn/recv_stream.rs
1use std::{
2 future::{poll_fn, Future},
3 io,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use bytes::Bytes;
9use proto::{Chunk, Chunks, ClosedStream, ConnectionError, ReadableError, StreamId};
10use thiserror::Error;
11use tokio::io::ReadBuf;
12
13use crate::{connection::ConnectionRef, VarInt};
14
15/// A stream that can only be used to receive data
16///
17/// `stop(0)` is implicitly called on drop unless:
18/// - A variant of [`ReadError`] has been yielded by a read call
19/// - [`stop()`] was called explicitly
20///
21/// # Cancellation
22///
23/// A `read` method is said to be *cancel-safe* when dropping its future before the future becomes
24/// ready cannot lead to loss of stream data. This is true of methods which succeed immediately when
25/// any progress is made, and is not true of methods which might need to perform multiple reads
26/// internally before succeeding. Each `read` method documents whether it is cancel-safe.
27///
28/// # Common issues
29///
30/// ## Data never received on a locally-opened stream
31///
32/// Peers are not notified of streams until they or a later-numbered stream are used to send
33/// data. If a bidirectional stream is locally opened but never used to send, then the peer may
34/// never see it. Application protocols should always arrange for the endpoint which will first
35/// transmit on a stream to be the endpoint responsible for opening it.
36///
37/// ## Data never received on a remotely-opened stream
38///
39/// Verify that the stream you are receiving is the same one that the server is sending on, e.g. by
40/// logging the [`id`] of each. Streams are always accepted in the same order as they are created,
41/// i.e. ascending order by [`StreamId`]. For example, even if a sender first transmits on
42/// bidirectional stream 1, the first stream yielded by [`Connection::accept_bi`] on the receiver
43/// will be bidirectional stream 0.
44///
45/// [`ReadError`]: crate::ReadError
46/// [`stop()`]: RecvStream::stop
47/// [`SendStream::finish`]: crate::SendStream::finish
48/// [`WriteError::Stopped`]: crate::WriteError::Stopped
49/// [`id`]: RecvStream::id
50/// [`Connection::accept_bi`]: crate::Connection::accept_bi
51#[derive(Debug)]
52pub struct RecvStream {
53 conn: ConnectionRef,
54 stream: StreamId,
55 is_0rtt: bool,
56 all_data_read: bool,
57 reset: Option<VarInt>,
58}
59
60impl RecvStream {
61 pub(crate) fn new(conn: ConnectionRef, stream: StreamId, is_0rtt: bool) -> Self {
62 Self {
63 conn,
64 stream,
65 is_0rtt,
66 all_data_read: false,
67 reset: None,
68 }
69 }
70
71 /// Read data contiguously from the stream.
72 ///
73 /// Yields the number of bytes read into `buf` on success, or `None` if the stream was finished.
74 ///
75 /// This operation is cancel-safe.
76 pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, ReadError> {
77 Read {
78 stream: self,
79 buf: ReadBuf::new(buf),
80 }
81 .await
82 }
83
84 /// Read an exact number of bytes contiguously from the stream.
85 ///
86 /// See [`read()`] for details. This operation is *not* cancel-safe.
87 ///
88 /// [`read()`]: RecvStream::read
89 pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ReadExactError> {
90 ReadExact {
91 stream: self,
92 buf: ReadBuf::new(buf),
93 }
94 .await
95 }
96
97 /// Attempts to read from the stream into buf.
98 ///
99 /// On success, returns Poll::Ready(Ok(num_bytes_read)) and places data in
100 /// the buf. If no data was read, it implies that EOF has been reached.
101 ///
102 /// If no data is available for reading, the method returns Poll::Pending
103 /// and arranges for the current task (via cx.waker()) to receive a notification
104 /// when the stream becomes readable or is closed.
105 pub fn poll_read(
106 &mut self,
107 cx: &mut Context,
108 buf: &mut [u8],
109 ) -> Poll<Result<usize, ReadError>> {
110 let mut buf = ReadBuf::new(buf);
111 ready!(self.poll_read_buf(cx, &mut buf))?;
112 Poll::Ready(Ok(buf.filled().len()))
113 }
114
115 fn poll_read_buf(
116 &mut self,
117 cx: &mut Context,
118 buf: &mut ReadBuf<'_>,
119 ) -> Poll<Result<(), ReadError>> {
120 if buf.remaining() == 0 {
121 return Poll::Ready(Ok(()));
122 }
123
124 self.poll_read_generic(cx, true, |chunks| {
125 let mut read = false;
126 loop {
127 if buf.remaining() == 0 {
128 // We know `read` is `true` because `buf.remaining()` was not 0 before
129 return ReadStatus::Readable(());
130 }
131
132 match chunks.next(buf.remaining()) {
133 Ok(Some(chunk)) => {
134 buf.put_slice(&chunk.bytes);
135 read = true;
136 }
137 res => return (if read { Some(()) } else { None }, res.err()).into(),
138 }
139 }
140 })
141 .map(|res| res.map(|_| ()))
142 }
143
144 /// Read the next segment of data
145 ///
146 /// Yields `None` if the stream was finished. Otherwise, yields a segment of data and its
147 /// offset in the stream. If `ordered` is `true`, the chunk's offset will be immediately after
148 /// the last data yielded by `read()` or `read_chunk()`. If `ordered` is `false`, segments may
149 /// be received in any order, and the `Chunk`'s `offset` field can be used to determine
150 /// ordering in the caller. Unordered reads are less prone to head-of-line blocking within a
151 /// stream, but require the application to manage reassembling the original data.
152 ///
153 /// Slightly more efficient than `read` due to not copying. Chunk boundaries do not correspond
154 /// to peer writes, and hence cannot be used as framing.
155 ///
156 /// This operation is cancel-safe.
157 pub async fn read_chunk(
158 &mut self,
159 max_length: usize,
160 ordered: bool,
161 ) -> Result<Option<Chunk>, ReadError> {
162 ReadChunk {
163 stream: self,
164 max_length,
165 ordered,
166 }
167 .await
168 }
169
170 /// Attempts to read a chunk from the stream.
171 ///
172 /// On success, returns `Poll::Ready(Ok(Some(chunk)))`. If `Poll::Ready(Ok(None))`
173 /// is returned, it implies that EOF has been reached.
174 ///
175 /// If no data is available for reading, the method returns `Poll::Pending`
176 /// and arranges for the current task (via cx.waker()) to receive a notification
177 /// when the stream becomes readable or is closed.
178 fn poll_read_chunk(
179 &mut self,
180 cx: &mut Context,
181 max_length: usize,
182 ordered: bool,
183 ) -> Poll<Result<Option<Chunk>, ReadError>> {
184 self.poll_read_generic(cx, ordered, |chunks| match chunks.next(max_length) {
185 Ok(Some(chunk)) => ReadStatus::Readable(chunk),
186 res => (None, res.err()).into(),
187 })
188 }
189
190 /// Read the next segments of data
191 ///
192 /// Fills `bufs` with the segments of data beginning immediately after the
193 /// last data yielded by `read` or `read_chunk`, or `None` if the stream was
194 /// finished.
195 ///
196 /// Slightly more efficient than `read` due to not copying. Chunk boundaries
197 /// do not correspond to peer writes, and hence cannot be used as framing.
198 ///
199 /// This operation is cancel-safe.
200 pub async fn read_chunks(&mut self, bufs: &mut [Bytes]) -> Result<Option<usize>, ReadError> {
201 ReadChunks { stream: self, bufs }.await
202 }
203
204 /// Foundation of [`Self::read_chunks`]
205 fn poll_read_chunks(
206 &mut self,
207 cx: &mut Context,
208 bufs: &mut [Bytes],
209 ) -> Poll<Result<Option<usize>, ReadError>> {
210 if bufs.is_empty() {
211 return Poll::Ready(Ok(Some(0)));
212 }
213
214 self.poll_read_generic(cx, true, |chunks| {
215 let mut read = 0;
216 loop {
217 if read >= bufs.len() {
218 // We know `read > 0` because `bufs` cannot be empty here
219 return ReadStatus::Readable(read);
220 }
221
222 match chunks.next(usize::MAX) {
223 Ok(Some(chunk)) => {
224 bufs[read] = chunk.bytes;
225 read += 1;
226 }
227 res => return (if read == 0 { None } else { Some(read) }, res.err()).into(),
228 }
229 }
230 })
231 }
232
233 /// Convenience method to read all remaining data into a buffer
234 ///
235 /// Fails with [`ReadToEndError::TooLong`] on reading more than `size_limit` bytes, discarding
236 /// all data read. Uses unordered reads to be more efficient than using `AsyncRead` would
237 /// allow. `size_limit` should be set to limit worst-case memory use.
238 ///
239 /// If unordered reads have already been made, the resulting buffer may have gaps containing
240 /// arbitrary data.
241 ///
242 /// This operation is *not* cancel-safe.
243 ///
244 /// [`ReadToEndError::TooLong`]: crate::ReadToEndError::TooLong
245 pub async fn read_to_end(&mut self, size_limit: usize) -> Result<Vec<u8>, ReadToEndError> {
246 ReadToEnd {
247 stream: self,
248 size_limit,
249 read: Vec::new(),
250 start: u64::MAX,
251 end: 0,
252 }
253 .await
254 }
255
256 /// Stop accepting data
257 ///
258 /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
259 /// attempts to operate on a stream will yield `ClosedStream` errors.
260 pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
261 let mut conn = self.conn.state.lock("RecvStream::stop");
262 if self.is_0rtt && conn.check_0rtt().is_err() {
263 return Ok(());
264 }
265 conn.inner.recv_stream(self.stream).stop(error_code)?;
266 conn.wake();
267 self.all_data_read = true;
268 Ok(())
269 }
270
271 /// Check if this stream has been opened during 0-RTT.
272 ///
273 /// In which case any non-idempotent request should be considered dangerous at the application
274 /// level. Because read data is subject to replay attacks.
275 pub fn is_0rtt(&self) -> bool {
276 self.is_0rtt
277 }
278
279 /// Get the identity of this stream
280 pub fn id(&self) -> StreamId {
281 self.stream
282 }
283
284 /// Completes when the stream has been reset by the peer or otherwise closed
285 ///
286 /// Yields `Some` with the reset error code when the stream is reset by the peer. Yields `None`
287 /// when the stream was previously [`stop()`](Self::stop)ed, or when the stream was
288 /// [`finish()`](crate::SendStream::finish)ed by the peer and all data has been received, after
289 /// which it is no longer meaningful for the stream to be reset.
290 ///
291 /// This operation is cancel-safe.
292 pub async fn received_reset(&mut self) -> Result<Option<VarInt>, ResetError> {
293 poll_fn(|cx| {
294 let mut conn = self.conn.state.lock("RecvStream::reset");
295 if self.is_0rtt && conn.check_0rtt().is_err() {
296 return Poll::Ready(Err(ResetError::ZeroRttRejected));
297 }
298
299 if let Some(code) = self.reset {
300 return Poll::Ready(Ok(Some(code)));
301 }
302
303 match conn.inner.recv_stream(self.stream).received_reset() {
304 Err(_) => Poll::Ready(Ok(None)),
305 Ok(Some(error_code)) => {
306 // Stream state has just now been freed, so the connection may need to issue new
307 // stream ID flow control credit
308 conn.wake();
309 Poll::Ready(Ok(Some(error_code)))
310 }
311 Ok(None) => {
312 if let Some(e) = &conn.error {
313 return Poll::Ready(Err(e.clone().into()));
314 }
315 // Resets always notify readers, since a reset is an immediate read error. We
316 // could introduce a dedicated channel to reduce the risk of spurious wakeups,
317 // but that increased complexity is probably not justified, as an application
318 // that is expecting a reset is not likely to receive large amounts of data.
319 conn.blocked_readers.insert(self.stream, cx.waker().clone());
320 Poll::Pending
321 }
322 }
323 })
324 .await
325 }
326
327 /// Handle common logic related to reading out of a receive stream
328 ///
329 /// This takes an `FnMut` closure that takes care of the actual reading process, matching
330 /// the detailed read semantics for the calling function with a particular return type.
331 /// The closure can read from the passed `&mut Chunks` and has to return the status after
332 /// reading: the amount of data read, and the status after the final read call.
333 fn poll_read_generic<T, U>(
334 &mut self,
335 cx: &mut Context,
336 ordered: bool,
337 mut read_fn: T,
338 ) -> Poll<Result<Option<U>, ReadError>>
339 where
340 T: FnMut(&mut Chunks) -> ReadStatus<U>,
341 {
342 use proto::ReadError::*;
343 if self.all_data_read {
344 return Poll::Ready(Ok(None));
345 }
346
347 let mut conn = self.conn.state.lock("RecvStream::poll_read");
348 if self.is_0rtt {
349 conn.check_0rtt().map_err(|()| ReadError::ZeroRttRejected)?;
350 }
351
352 // If we stored an error during a previous call, return it now. This can happen if a
353 // `read_fn` both wants to return data and also returns an error in its final stream status.
354 let status = match self.reset {
355 Some(code) => ReadStatus::Failed(None, Reset(code)),
356 None => {
357 let mut recv = conn.inner.recv_stream(self.stream);
358 let mut chunks = recv.read(ordered)?;
359 let status = read_fn(&mut chunks);
360 if chunks.finalize().should_transmit() {
361 conn.wake();
362 }
363 status
364 }
365 };
366
367 match status {
368 ReadStatus::Readable(read) => Poll::Ready(Ok(Some(read))),
369 ReadStatus::Finished(read) => {
370 self.all_data_read = true;
371 Poll::Ready(Ok(read))
372 }
373 ReadStatus::Failed(read, Blocked) => match read {
374 Some(val) => Poll::Ready(Ok(Some(val))),
375 None => {
376 if let Some(ref x) = conn.error {
377 return Poll::Ready(Err(ReadError::ConnectionLost(x.clone())));
378 }
379 conn.blocked_readers.insert(self.stream, cx.waker().clone());
380 Poll::Pending
381 }
382 },
383 ReadStatus::Failed(read, Reset(error_code)) => match read {
384 None => {
385 self.all_data_read = true;
386 self.reset = Some(error_code);
387 Poll::Ready(Err(ReadError::Reset(error_code)))
388 }
389 done => {
390 self.reset = Some(error_code);
391 Poll::Ready(Ok(done))
392 }
393 },
394 }
395 }
396}
397
398enum ReadStatus<T> {
399 Readable(T),
400 Finished(Option<T>),
401 Failed(Option<T>, proto::ReadError),
402}
403
404impl<T> From<(Option<T>, Option<proto::ReadError>)> for ReadStatus<T> {
405 fn from(status: (Option<T>, Option<proto::ReadError>)) -> Self {
406 match status {
407 (read, None) => Self::Finished(read),
408 (read, Some(e)) => Self::Failed(read, e),
409 }
410 }
411}
412
413/// Future produced by [`RecvStream::read_to_end()`].
414///
415/// [`RecvStream::read_to_end()`]: crate::RecvStream::read_to_end
416struct ReadToEnd<'a> {
417 stream: &'a mut RecvStream,
418 read: Vec<(Bytes, u64)>,
419 start: u64,
420 end: u64,
421 size_limit: usize,
422}
423
424impl Future for ReadToEnd<'_> {
425 type Output = Result<Vec<u8>, ReadToEndError>;
426 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
427 loop {
428 match ready!(self.stream.poll_read_chunk(cx, usize::MAX, false))? {
429 Some(chunk) => {
430 self.start = self.start.min(chunk.offset);
431 let end = chunk.bytes.len() as u64 + chunk.offset;
432 if (end - self.start) > self.size_limit as u64 {
433 return Poll::Ready(Err(ReadToEndError::TooLong));
434 }
435 self.end = self.end.max(end);
436 self.read.push((chunk.bytes, chunk.offset));
437 }
438 None => {
439 if self.end == 0 {
440 // Never received anything
441 return Poll::Ready(Ok(Vec::new()));
442 }
443 let start = self.start;
444 let mut buffer = vec![0; (self.end - start) as usize];
445 for (data, offset) in self.read.drain(..) {
446 let offset = (offset - start) as usize;
447 buffer[offset..offset + data.len()].copy_from_slice(&data);
448 }
449 return Poll::Ready(Ok(buffer));
450 }
451 }
452 }
453 }
454}
455
456/// Errors from [`RecvStream::read_to_end`]
457#[derive(Debug, Error, Clone, PartialEq, Eq)]
458pub enum ReadToEndError {
459 /// An error occurred during reading
460 #[error("read error: {0}")]
461 Read(#[from] ReadError),
462 /// The stream is larger than the user-supplied limit
463 #[error("stream too long")]
464 TooLong,
465}
466
/// `futures-io` adapter: ordered reads into a byte slice, with errors mapped to `io::Error`.
#[cfg(feature = "futures-io")]
impl futures_io::AsyncRead for RecvStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let mut dest = ReadBuf::new(buf);
        match self.get_mut().poll_read_buf(cx, &mut dest) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::from(e))),
            Poll::Ready(Ok(())) => Poll::Ready(Ok(dest.filled().len())),
        }
    }
}
479
480impl tokio::io::AsyncRead for RecvStream {
481 fn poll_read(
482 self: Pin<&mut Self>,
483 cx: &mut Context<'_>,
484 buf: &mut ReadBuf<'_>,
485 ) -> Poll<io::Result<()>> {
486 ready!(Self::poll_read_buf(self.get_mut(), cx, buf))?;
487 Poll::Ready(Ok(()))
488 }
489}
490
491impl Drop for RecvStream {
492 fn drop(&mut self) {
493 let mut conn = self.conn.state.lock("RecvStream::drop");
494
495 // clean up any previously registered wakers
496 conn.blocked_readers.remove(&self.stream);
497
498 if conn.error.is_some() || (self.is_0rtt && conn.check_0rtt().is_err()) {
499 return;
500 }
501 if !self.all_data_read {
502 // Ignore ClosedStream errors
503 let _ = conn.inner.recv_stream(self.stream).stop(0u32.into());
504 conn.wake();
505 }
506 }
507}
508
509/// Errors that arise from reading from a stream.
510#[derive(Debug, Error, Clone, PartialEq, Eq)]
511pub enum ReadError {
512 /// The peer abandoned transmitting data on this stream
513 ///
514 /// Carries an application-defined error code.
515 #[error("stream reset by peer: error {0}")]
516 Reset(VarInt),
517 /// The connection was lost
518 #[error("connection lost")]
519 ConnectionLost(#[from] ConnectionError),
520 /// The stream has already been stopped, finished, or reset
521 #[error("closed stream")]
522 ClosedStream,
523 /// Attempted an ordered read following an unordered read
524 ///
525 /// Performing an unordered read allows discontinuities to arise in the receive buffer of a
526 /// stream which cannot be recovered, making further ordered reads impossible.
527 #[error("ordered read after unordered read")]
528 IllegalOrderedRead,
529 /// This was a 0-RTT stream and the server rejected it
530 ///
531 /// Can only occur on clients for 0-RTT streams, which can be opened using
532 /// [`Connecting::into_0rtt()`].
533 ///
534 /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
535 #[error("0-RTT rejected")]
536 ZeroRttRejected,
537}
538
539impl From<ReadableError> for ReadError {
540 fn from(e: ReadableError) -> Self {
541 match e {
542 ReadableError::ClosedStream => Self::ClosedStream,
543 ReadableError::IllegalOrderedRead => Self::IllegalOrderedRead,
544 }
545 }
546}
547
548impl From<ResetError> for ReadError {
549 fn from(e: ResetError) -> Self {
550 match e {
551 ResetError::ConnectionLost(e) => Self::ConnectionLost(e),
552 ResetError::ZeroRttRejected => Self::ZeroRttRejected,
553 }
554 }
555}
556
557impl From<ReadError> for io::Error {
558 fn from(x: ReadError) -> Self {
559 use ReadError::*;
560 let kind = match x {
561 Reset { .. } | ZeroRttRejected => io::ErrorKind::ConnectionReset,
562 ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected,
563 IllegalOrderedRead => io::ErrorKind::InvalidInput,
564 };
565 Self::new(kind, x)
566 }
567}
568
569/// Errors that arise while waiting for a stream to be reset
570#[derive(Debug, Error, Clone, PartialEq, Eq)]
571pub enum ResetError {
572 /// The connection was lost
573 #[error("connection lost")]
574 ConnectionLost(#[from] ConnectionError),
575 /// This was a 0-RTT stream and the server rejected it
576 ///
577 /// Can only occur on clients for 0-RTT streams, which can be opened using
578 /// [`Connecting::into_0rtt()`].
579 ///
580 /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
581 #[error("0-RTT rejected")]
582 ZeroRttRejected,
583}
584
585impl From<ResetError> for io::Error {
586 fn from(x: ResetError) -> Self {
587 use ResetError::*;
588 let kind = match x {
589 ZeroRttRejected => io::ErrorKind::ConnectionReset,
590 ConnectionLost(_) => io::ErrorKind::NotConnected,
591 };
592 Self::new(kind, x)
593 }
594}
595
596/// Future produced by [`RecvStream::read()`].
597///
598/// [`RecvStream::read()`]: crate::RecvStream::read
599struct Read<'a> {
600 stream: &'a mut RecvStream,
601 buf: ReadBuf<'a>,
602}
603
604impl Future for Read<'_> {
605 type Output = Result<Option<usize>, ReadError>;
606
607 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
608 let this = self.get_mut();
609 ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
610 match this.buf.filled().len() {
611 0 if this.buf.capacity() != 0 => Poll::Ready(Ok(None)),
612 n => Poll::Ready(Ok(Some(n))),
613 }
614 }
615}
616
617/// Future produced by [`RecvStream::read_exact()`].
618///
619/// [`RecvStream::read_exact()`]: crate::RecvStream::read_exact
620struct ReadExact<'a> {
621 stream: &'a mut RecvStream,
622 buf: ReadBuf<'a>,
623}
624
625impl Future for ReadExact<'_> {
626 type Output = Result<(), ReadExactError>;
627 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
628 let this = self.get_mut();
629 let mut remaining = this.buf.remaining();
630 while remaining > 0 {
631 ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
632 let new = this.buf.remaining();
633 if new == remaining {
634 return Poll::Ready(Err(ReadExactError::FinishedEarly(this.buf.filled().len())));
635 }
636 remaining = new;
637 }
638 Poll::Ready(Ok(()))
639 }
640}
641
642/// Errors that arise from reading from a stream.
643#[derive(Debug, Error, Clone, PartialEq, Eq)]
644pub enum ReadExactError {
645 /// The stream finished before all bytes were read
646 #[error("stream finished early ({0} bytes read)")]
647 FinishedEarly(usize),
648 /// A read error occurred
649 #[error(transparent)]
650 ReadError(#[from] ReadError),
651}
652
653/// Future produced by [`RecvStream::read_chunk()`].
654///
655/// [`RecvStream::read_chunk()`]: crate::RecvStream::read_chunk
656struct ReadChunk<'a> {
657 stream: &'a mut RecvStream,
658 max_length: usize,
659 ordered: bool,
660}
661
662impl Future for ReadChunk<'_> {
663 type Output = Result<Option<Chunk>, ReadError>;
664 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
665 let (max_length, ordered) = (self.max_length, self.ordered);
666 self.stream.poll_read_chunk(cx, max_length, ordered)
667 }
668}
669
670/// Future produced by [`RecvStream::read_chunks()`].
671///
672/// [`RecvStream::read_chunks()`]: crate::RecvStream::read_chunks
673struct ReadChunks<'a> {
674 stream: &'a mut RecvStream,
675 bufs: &'a mut [Bytes],
676}
677
678impl Future for ReadChunks<'_> {
679 type Output = Result<Option<usize>, ReadError>;
680 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
681 let this = self.get_mut();
682 this.stream.poll_read_chunks(cx, this.bufs)
683 }
684}