quinn/recv_stream.rs
1use std::{
2 future::{Future, poll_fn},
3 io,
4 pin::Pin,
5 task::{Context, Poll, ready},
6};
7
8use bytes::Bytes;
9use proto::{Chunk, Chunks, ClosedStream, ConnectionError, ReadableError, StreamId};
10use thiserror::Error;
11use tokio::io::ReadBuf;
12
13use crate::{VarInt, connection::ConnectionRef};
14
/// A stream that can only be used to receive data
///
/// `stop(0)` is implicitly called on drop unless:
/// - A variant of [`ReadError`] has been yielded by a read call
/// - [`stop()`] was called explicitly
///
/// # Cancellation
///
/// A `read` method is said to be *cancel-safe* when dropping its future before the future becomes
/// ready cannot lead to loss of stream data. This is true of methods which succeed immediately when
/// any progress is made, and is not true of methods which might need to perform multiple reads
/// internally before succeeding. Each `read` method documents whether it is cancel-safe.
///
/// # Common issues
///
/// ## Data never received on a locally-opened stream
///
/// Peers are not notified of streams until they or a later-numbered stream are used to send
/// data. If a bidirectional stream is locally opened but never used to send, then the peer may
/// never see it. Application protocols should always arrange for the endpoint which will first
/// transmit on a stream to be the endpoint responsible for opening it.
///
/// ## Data never received on a remotely-opened stream
///
/// Verify that the stream you are receiving is the same one that the peer is sending on, e.g. by
/// logging the [`id`] of each. Streams are always accepted in the same order as they are created,
/// i.e. ascending order by [`StreamId`]. For example, even if a sender first transmits on
/// bidirectional stream 1, the first stream yielded by [`Connection::accept_bi`] on the receiver
/// will be bidirectional stream 0.
///
/// [`ReadError`]: crate::ReadError
/// [`stop()`]: RecvStream::stop
/// [`SendStream::finish`]: crate::SendStream::finish
/// [`WriteError::Stopped`]: crate::WriteError::Stopped
/// [`id`]: RecvStream::id
/// [`Connection::accept_bi`]: crate::Connection::accept_bi
#[derive(Debug)]
pub struct RecvStream {
    /// Handle to the connection that owns this stream
    conn: ConnectionRef,
    /// Identifier of this stream
    stream: StreamId,
    /// Whether this stream was opened during 0-RTT
    is_0rtt: bool,
    /// Set once a read has observed the end of the stream, a reset has been reported to the
    /// caller, or the stream has been stopped; when set, reads yield `None` and drop does not
    /// issue the implicit `stop(0)`
    all_data_read: bool,
    /// Reset error code observed by a previous read, retained so later calls can report it
    reset: Option<VarInt>,
}
59
60impl RecvStream {
61 pub(crate) fn new(conn: ConnectionRef, stream: StreamId, is_0rtt: bool) -> Self {
62 Self {
63 conn,
64 stream,
65 is_0rtt,
66 all_data_read: false,
67 reset: None,
68 }
69 }
70
71 /// Read data contiguously from the stream.
72 ///
73 /// Yields the number of bytes read into `buf` on success, or `None` if the stream was finished.
74 ///
75 /// This operation is cancel-safe.
76 pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, ReadError> {
77 Read {
78 stream: self,
79 buf: ReadBuf::new(buf),
80 }
81 .await
82 }
83
84 /// Read an exact number of bytes contiguously from the stream.
85 ///
86 /// See [`read()`] for details. This operation is *not* cancel-safe.
87 ///
88 /// [`read()`]: RecvStream::read
89 pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ReadExactError> {
90 ReadExact {
91 stream: self,
92 buf: ReadBuf::new(buf),
93 }
94 .await
95 }
96
97 /// Attempts to read from the stream into buf.
98 ///
99 /// On success, returns Poll::Ready(Ok(num_bytes_read)) and places data in
100 /// the buf. If no data was read, it implies that EOF has been reached.
101 ///
102 /// If no data is available for reading, the method returns Poll::Pending
103 /// and arranges for the current task (via cx.waker()) to receive a notification
104 /// when the stream becomes readable or is closed.
105 pub fn poll_read(
106 &mut self,
107 cx: &mut Context,
108 buf: &mut [u8],
109 ) -> Poll<Result<usize, ReadError>> {
110 let mut buf = ReadBuf::new(buf);
111 ready!(self.poll_read_buf(cx, &mut buf))?;
112 Poll::Ready(Ok(buf.filled().len()))
113 }
114
115 /// Attempts to read from the stream into buf.
116 ///
117 /// On success, returns Poll::Ready(Ok(())) and places data in
118 /// the buf. If no data was read and the buffer had a non null capacity,
119 /// it implies that EOF has been reached.
120 ///
121 /// If no data is available for reading, the method returns Poll::Pending
122 /// and arranges for the current task (via cx.waker()) to receive a notification
123 /// when the stream becomes readable or is closed.
124 pub fn poll_read_buf(
125 &mut self,
126 cx: &mut Context,
127 buf: &mut ReadBuf<'_>,
128 ) -> Poll<Result<(), ReadError>> {
129 if buf.remaining() == 0 {
130 return Poll::Ready(Ok(()));
131 }
132
133 self.poll_read_generic(cx, true, |chunks| {
134 let mut read = false;
135 loop {
136 if buf.remaining() == 0 {
137 // We know `read` is `true` because `buf.remaining()` was not 0 before
138 return ReadStatus::Readable(());
139 }
140
141 match chunks.next(buf.remaining()) {
142 Ok(Some(chunk)) => {
143 buf.put_slice(&chunk.bytes);
144 read = true;
145 }
146 res => return (if read { Some(()) } else { None }, res.err()).into(),
147 }
148 }
149 })
150 .map(|res| res.map(|_| ()))
151 }
152
153 /// Read the next segment of data
154 ///
155 /// Yields `None` if the stream was finished. Otherwise, yields a segment of data and its
156 /// offset in the stream. If `ordered` is `true`, the chunk's offset will be immediately after
157 /// the last data yielded by `read()` or `read_chunk()`. If `ordered` is `false`, segments may
158 /// be received in any order, and the `Chunk`'s `offset` field can be used to determine
159 /// ordering in the caller. Unordered reads are less prone to head-of-line blocking within a
160 /// stream, but require the application to manage reassembling the original data.
161 ///
162 /// Slightly more efficient than `read` due to not copying. Chunk boundaries do not correspond
163 /// to peer writes, and hence cannot be used as framing.
164 ///
165 /// This operation is cancel-safe.
166 pub async fn read_chunk(
167 &mut self,
168 max_length: usize,
169 ordered: bool,
170 ) -> Result<Option<Chunk>, ReadError> {
171 ReadChunk {
172 stream: self,
173 max_length,
174 ordered,
175 }
176 .await
177 }
178
179 /// Attempts to read a chunk from the stream.
180 ///
181 /// On success, returns `Poll::Ready(Ok(Some(chunk)))`. If `Poll::Ready(Ok(None))`
182 /// is returned, it implies that EOF has been reached.
183 ///
184 /// If no data is available for reading, the method returns `Poll::Pending`
185 /// and arranges for the current task (via cx.waker()) to receive a notification
186 /// when the stream becomes readable or is closed.
187 fn poll_read_chunk(
188 &mut self,
189 cx: &mut Context,
190 max_length: usize,
191 ordered: bool,
192 ) -> Poll<Result<Option<Chunk>, ReadError>> {
193 self.poll_read_generic(cx, ordered, |chunks| match chunks.next(max_length) {
194 Ok(Some(chunk)) => ReadStatus::Readable(chunk),
195 res => (None, res.err()).into(),
196 })
197 }
198
199 /// Read the next segments of data
200 ///
201 /// Fills `bufs` with the segments of data beginning immediately after the
202 /// last data yielded by `read` or `read_chunk`, or `None` if the stream was
203 /// finished.
204 ///
205 /// Slightly more efficient than `read` due to not copying. Chunk boundaries
206 /// do not correspond to peer writes, and hence cannot be used as framing.
207 ///
208 /// This operation is cancel-safe.
209 pub async fn read_chunks(&mut self, bufs: &mut [Bytes]) -> Result<Option<usize>, ReadError> {
210 ReadChunks { stream: self, bufs }.await
211 }
212
213 /// Foundation of [`Self::read_chunks`]
214 fn poll_read_chunks(
215 &mut self,
216 cx: &mut Context,
217 bufs: &mut [Bytes],
218 ) -> Poll<Result<Option<usize>, ReadError>> {
219 if bufs.is_empty() {
220 return Poll::Ready(Ok(Some(0)));
221 }
222
223 self.poll_read_generic(cx, true, |chunks| {
224 let mut read = 0;
225 loop {
226 if read >= bufs.len() {
227 // We know `read > 0` because `bufs` cannot be empty here
228 return ReadStatus::Readable(read);
229 }
230
231 match chunks.next(usize::MAX) {
232 Ok(Some(chunk)) => {
233 bufs[read] = chunk.bytes;
234 read += 1;
235 }
236 res => return (if read == 0 { None } else { Some(read) }, res.err()).into(),
237 }
238 }
239 })
240 }
241
242 /// Convenience method to read all remaining data into a buffer
243 ///
244 /// Fails with [`ReadToEndError::TooLong`] on reading more than `size_limit` bytes, discarding
245 /// all data read. Uses unordered reads to be more efficient than using `AsyncRead` would
246 /// allow. `size_limit` should be set to limit worst-case memory use.
247 ///
248 /// If unordered reads have already been made, the resulting buffer may have gaps containing
249 /// arbitrary data.
250 ///
251 /// This operation is *not* cancel-safe.
252 ///
253 /// [`ReadToEndError::TooLong`]: crate::ReadToEndError::TooLong
254 pub async fn read_to_end(&mut self, size_limit: usize) -> Result<Vec<u8>, ReadToEndError> {
255 ReadToEnd {
256 stream: self,
257 size_limit,
258 read: Vec::new(),
259 start: u64::MAX,
260 end: 0,
261 }
262 .await
263 }
264
265 /// Stop accepting data
266 ///
267 /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
268 /// attempts to operate on a stream will yield `ClosedStream` errors.
269 pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
270 let mut conn = self.conn.state.lock("RecvStream::stop");
271 if self.is_0rtt && conn.check_0rtt().is_err() {
272 return Ok(());
273 }
274 conn.inner.recv_stream(self.stream).stop(error_code)?;
275 conn.wake();
276 self.all_data_read = true;
277 Ok(())
278 }
279
    /// Check if this stream has been opened during 0-RTT.
    ///
    /// If so, any non-idempotent request should be considered dangerous at the application
    /// level, because read data is subject to replay attacks.
    pub fn is_0rtt(&self) -> bool {
        self.is_0rtt
    }
287
    /// Get the identity of this stream
    ///
    /// Returns the [`StreamId`] this handle was created with.
    pub fn id(&self) -> StreamId {
        self.stream
    }
292
293 /// Completes when the stream has been reset by the peer or otherwise closed
294 ///
295 /// Yields `Some` with the reset error code when the stream is reset by the peer. Yields `None`
296 /// when the stream was previously [`stop()`](Self::stop)ed, or when the stream was
297 /// [`finish()`](crate::SendStream::finish)ed by the peer and all data has been received, after
298 /// which it is no longer meaningful for the stream to be reset.
299 ///
300 /// This operation is cancel-safe.
301 pub async fn received_reset(&mut self) -> Result<Option<VarInt>, ResetError> {
302 poll_fn(|cx| {
303 let mut conn = self.conn.state.lock("RecvStream::reset");
304 if self.is_0rtt && conn.check_0rtt().is_err() {
305 return Poll::Ready(Err(ResetError::ZeroRttRejected));
306 }
307
308 if let Some(code) = self.reset {
309 return Poll::Ready(Ok(Some(code)));
310 }
311
312 match conn.inner.recv_stream(self.stream).received_reset() {
313 Err(_) => Poll::Ready(Ok(None)),
314 Ok(Some(error_code)) => {
315 // Stream state has just now been freed, so the connection may need to issue new
316 // stream ID flow control credit
317 conn.wake();
318 Poll::Ready(Ok(Some(error_code)))
319 }
320 Ok(None) => {
321 if let Some(e) = &conn.error {
322 return Poll::Ready(Err(e.clone().into()));
323 }
324 // Resets always notify readers, since a reset is an immediate read error. We
325 // could introduce a dedicated channel to reduce the risk of spurious wakeups,
326 // but that increased complexity is probably not justified, as an application
327 // that is expecting a reset is not likely to receive large amounts of data.
328 conn.blocked_readers.insert(self.stream, cx.waker().clone());
329 Poll::Pending
330 }
331 }
332 })
333 .await
334 }
335
    /// Handle common logic related to reading out of a receive stream
    ///
    /// This takes an `FnMut` closure that takes care of the actual reading process, matching
    /// the detailed read semantics for the calling function with a particular return type.
    /// The closure can read from the passed `&mut Chunks` and has to return the status after
    /// reading: the amount of data read, and the status after the final read call.
    ///
    /// `ordered` is forwarded to the underlying stream's `read` call. The result is
    /// `Ok(Some(_))` when the closure produced data, `Ok(None)` once the stream is exhausted,
    /// and `Poll::Pending` (with the task's waker registered) when no data is available yet.
    fn poll_read_generic<T, U>(
        &mut self,
        cx: &mut Context,
        ordered: bool,
        mut read_fn: T,
    ) -> Poll<Result<Option<U>, ReadError>>
    where
        T: FnMut(&mut Chunks) -> ReadStatus<U>,
    {
        use proto::ReadError::*;
        // Once the end of the stream has been reported, keep reporting it without locking
        if self.all_data_read {
            return Poll::Ready(Ok(None));
        }

        let mut conn = self.conn.state.lock("RecvStream::poll_read");
        if self.is_0rtt {
            conn.check_0rtt().map_err(|()| ReadError::ZeroRttRejected)?;
        }

        // If we stored an error during a previous call, return it now. This can happen if a
        // `read_fn` both wants to return data and also returns an error in its final stream status.
        let status = match self.reset {
            Some(code) => ReadStatus::Failed(None, Reset(code)),
            None => {
                let mut recv = conn.inner.recv_stream(self.stream);
                let mut chunks = recv.read(ordered)?;
                let status = read_fn(&mut chunks);
                // Finalizing the read tells us whether the connection has something to send
                if chunks.finalize().should_transmit() {
                    conn.wake();
                }
                status
            }
        };

        match status {
            ReadStatus::Readable(read) => Poll::Ready(Ok(Some(read))),
            ReadStatus::Finished(read) => {
                self.all_data_read = true;
                Poll::Ready(Ok(read))
            }
            ReadStatus::Failed(read, Blocked) => match read {
                // Some data was read before blocking: hand it over rather than waiting
                Some(val) => Poll::Ready(Ok(Some(val))),
                None => {
                    if let Some(ref x) = conn.error {
                        return Poll::Ready(Err(ReadError::ConnectionLost(x.clone())));
                    }
                    // Register for a wakeup when the stream becomes readable
                    conn.blocked_readers.insert(self.stream, cx.waker().clone());
                    Poll::Pending
                }
            },
            ReadStatus::Failed(read, Reset(error_code)) => match read {
                None => {
                    self.all_data_read = true;
                    self.reset = Some(error_code);
                    Poll::Ready(Err(ReadError::Reset(error_code)))
                }
                // Data was read before the reset was seen: return the data now and remember
                // the reset code so the next call reports it
                done => {
                    self.reset = Some(error_code);
                    Poll::Ready(Ok(done))
                }
            },
        }
    }
405}
406
/// Outcome reported by the reading closure passed to `RecvStream::poll_read_generic`
enum ReadStatus<T> {
    /// Data was read and no end-of-stream or error was encountered
    Readable(T),
    /// The end of the stream was reached, possibly after reading some data
    Finished(Option<T>),
    /// A read error was encountered, possibly after reading some data
    Failed(Option<T>, proto::ReadError),
}
412
413impl<T> From<(Option<T>, Option<proto::ReadError>)> for ReadStatus<T> {
414 fn from(status: (Option<T>, Option<proto::ReadError>)) -> Self {
415 match status {
416 (read, None) => Self::Finished(read),
417 (read, Some(e)) => Self::Failed(read, e),
418 }
419 }
420}
421
422/// Future produced by [`RecvStream::read_to_end()`].
423///
424/// [`RecvStream::read_to_end()`]: crate::RecvStream::read_to_end
425struct ReadToEnd<'a> {
426 stream: &'a mut RecvStream,
427 read: Vec<(Bytes, u64)>,
428 start: u64,
429 end: u64,
430 size_limit: usize,
431}
432
433impl Future for ReadToEnd<'_> {
434 type Output = Result<Vec<u8>, ReadToEndError>;
435 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
436 loop {
437 match ready!(self.stream.poll_read_chunk(cx, usize::MAX, false))? {
438 Some(chunk) => {
439 self.start = self.start.min(chunk.offset);
440 let end = chunk.bytes.len() as u64 + chunk.offset;
441 if (end - self.start) > self.size_limit as u64 {
442 return Poll::Ready(Err(ReadToEndError::TooLong));
443 }
444 self.end = self.end.max(end);
445 self.read.push((chunk.bytes, chunk.offset));
446 }
447 None => {
448 if self.end == 0 {
449 // Never received anything
450 return Poll::Ready(Ok(Vec::new()));
451 }
452 let start = self.start;
453 let mut buffer = vec![0; (self.end - start) as usize];
454 for (data, offset) in self.read.drain(..) {
455 let offset = (offset - start) as usize;
456 buffer[offset..offset + data.len()].copy_from_slice(&data);
457 }
458 return Poll::Ready(Ok(buffer));
459 }
460 }
461 }
462 }
463}
464
/// Errors from [`RecvStream::read_to_end`]
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ReadToEndError {
    /// An error occurred during reading; wraps the underlying [`ReadError`]
    #[error("read error: {0}")]
    Read(#[from] ReadError),
    /// The stream is larger than the user-supplied limit
    #[error("stream too long")]
    TooLong,
}
475
/// `futures-io` adapter: reads into `buf` and reports the number of bytes filled.
#[cfg(feature = "futures-io")]
impl futures_io::AsyncRead for RecvStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let mut dst = ReadBuf::new(buf);
        match self.get_mut().poll_read_buf(cx, &mut dst) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::from(e))),
            Poll::Ready(Ok(())) => Poll::Ready(Ok(dst.filled().len())),
        }
    }
}
488
489impl tokio::io::AsyncRead for RecvStream {
490 fn poll_read(
491 self: Pin<&mut Self>,
492 cx: &mut Context<'_>,
493 buf: &mut ReadBuf<'_>,
494 ) -> Poll<io::Result<()>> {
495 ready!(Self::poll_read_buf(self.get_mut(), cx, buf))?;
496 Poll::Ready(Ok(()))
497 }
498}
499
500impl Drop for RecvStream {
501 fn drop(&mut self) {
502 let mut conn = self.conn.state.lock("RecvStream::drop");
503
504 // clean up any previously registered wakers
505 conn.blocked_readers.remove(&self.stream);
506
507 if conn.error.is_some() || (self.is_0rtt && conn.check_0rtt().is_err()) {
508 return;
509 }
510 if !self.all_data_read {
511 // Ignore ClosedStream errors
512 let _ = conn.inner.recv_stream(self.stream).stop(0u32.into());
513 conn.wake();
514 }
515 }
516}
517
/// Errors that arise from reading from a stream.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ReadError {
    /// The peer abandoned transmitting data on this stream
    ///
    /// Carries an application-defined error code.
    #[error("stream reset by peer: error {0}")]
    Reset(VarInt),
    /// The connection was lost
    ///
    /// Carries the error that terminated the connection.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
    /// The stream has already been stopped, finished, or reset
    #[error("closed stream")]
    ClosedStream,
    /// Attempted an ordered read following an unordered read
    ///
    /// Performing an unordered read allows discontinuities to arise in the receive buffer of a
    /// stream which cannot be recovered, making further ordered reads impossible.
    #[error("ordered read after unordered read")]
    IllegalOrderedRead,
    /// This was a 0-RTT stream and the server rejected it
    ///
    /// Can only occur on clients for 0-RTT streams, which can be opened using
    /// [`Connecting::into_0rtt()`].
    ///
    /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
    #[error("0-RTT rejected")]
    ZeroRttRejected,
}
547
548impl From<ReadableError> for ReadError {
549 fn from(e: ReadableError) -> Self {
550 match e {
551 ReadableError::ClosedStream => Self::ClosedStream,
552 ReadableError::IllegalOrderedRead => Self::IllegalOrderedRead,
553 }
554 }
555}
556
557impl From<ResetError> for ReadError {
558 fn from(e: ResetError) -> Self {
559 match e {
560 ResetError::ConnectionLost(e) => Self::ConnectionLost(e),
561 ResetError::ZeroRttRejected => Self::ZeroRttRejected,
562 }
563 }
564}
565
566impl From<ReadError> for io::Error {
567 fn from(x: ReadError) -> Self {
568 use ReadError::*;
569 let kind = match x {
570 Reset { .. } | ZeroRttRejected => io::ErrorKind::ConnectionReset,
571 ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected,
572 IllegalOrderedRead => io::ErrorKind::InvalidInput,
573 };
574 Self::new(kind, x)
575 }
576}
577
/// Errors that arise while waiting for a stream to be reset
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ResetError {
    /// The connection was lost
    ///
    /// Carries the error that terminated the connection.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
    /// This was a 0-RTT stream and the server rejected it
    ///
    /// Can only occur on clients for 0-RTT streams, which can be opened using
    /// [`Connecting::into_0rtt()`].
    ///
    /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
    #[error("0-RTT rejected")]
    ZeroRttRejected,
}
593
594impl From<ResetError> for io::Error {
595 fn from(x: ResetError) -> Self {
596 use ResetError::*;
597 let kind = match x {
598 ZeroRttRejected => io::ErrorKind::ConnectionReset,
599 ConnectionLost(_) => io::ErrorKind::NotConnected,
600 };
601 Self::new(kind, x)
602 }
603}
604
605/// Future produced by [`RecvStream::read()`].
606///
607/// [`RecvStream::read()`]: crate::RecvStream::read
608struct Read<'a> {
609 stream: &'a mut RecvStream,
610 buf: ReadBuf<'a>,
611}
612
613impl Future for Read<'_> {
614 type Output = Result<Option<usize>, ReadError>;
615
616 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
617 let this = self.get_mut();
618 ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
619 match this.buf.filled().len() {
620 0 if this.buf.capacity() != 0 => Poll::Ready(Ok(None)),
621 n => Poll::Ready(Ok(Some(n))),
622 }
623 }
624}
625
626/// Future produced by [`RecvStream::read_exact()`].
627///
628/// [`RecvStream::read_exact()`]: crate::RecvStream::read_exact
629struct ReadExact<'a> {
630 stream: &'a mut RecvStream,
631 buf: ReadBuf<'a>,
632}
633
634impl Future for ReadExact<'_> {
635 type Output = Result<(), ReadExactError>;
636 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
637 let this = self.get_mut();
638 let mut remaining = this.buf.remaining();
639 while remaining > 0 {
640 ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
641 let new = this.buf.remaining();
642 if new == remaining {
643 return Poll::Ready(Err(ReadExactError::FinishedEarly(this.buf.filled().len())));
644 }
645 remaining = new;
646 }
647 Poll::Ready(Ok(()))
648 }
649}
650
/// Errors from [`RecvStream::read_exact`]
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ReadExactError {
    /// The stream finished before all bytes were read
    ///
    /// Carries the number of bytes that were read into the buffer before the stream ended.
    #[error("stream finished early ({0} bytes read)")]
    FinishedEarly(usize),
    /// A read error occurred
    #[error(transparent)]
    ReadError(#[from] ReadError),
}
661
662/// Future produced by [`RecvStream::read_chunk()`].
663///
664/// [`RecvStream::read_chunk()`]: crate::RecvStream::read_chunk
665struct ReadChunk<'a> {
666 stream: &'a mut RecvStream,
667 max_length: usize,
668 ordered: bool,
669}
670
671impl Future for ReadChunk<'_> {
672 type Output = Result<Option<Chunk>, ReadError>;
673 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
674 let (max_length, ordered) = (self.max_length, self.ordered);
675 self.stream.poll_read_chunk(cx, max_length, ordered)
676 }
677}
678
679/// Future produced by [`RecvStream::read_chunks()`].
680///
681/// [`RecvStream::read_chunks()`]: crate::RecvStream::read_chunks
682struct ReadChunks<'a> {
683 stream: &'a mut RecvStream,
684 bufs: &'a mut [Bytes],
685}
686
687impl Future for ReadChunks<'_> {
688 type Output = Result<Option<usize>, ReadError>;
689 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
690 let this = self.get_mut();
691 this.stream.poll_read_chunks(cx, this.bufs)
692 }
693}