kvarn_quinn/recv_stream.rs
1use std::{
2 future::Future,
3 io,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use bytes::Bytes;
9use proto::{Chunk, Chunks, ConnectionError, ReadableError, StreamId};
10use thiserror::Error;
11use tokio::io::ReadBuf;
12
13use crate::{
14 connection::{ConnectionRef, UnknownStream},
15 VarInt,
16};
17
18/// A stream that can only be used to receive data
19///
20/// `stop(0)` is implicitly called on drop unless:
21/// - A variant of [`ReadError`] has been yielded by a read call
22/// - [`stop()`] was called explicitly
23///
24/// # Common issues
25///
26/// ## Data never received on a locally-opened stream
27///
28/// Peers are not notified of streams until they or a later-numbered stream are used to send
29/// data. If a bidirectional stream is locally opened but never used to send, then the peer may
30/// never see it. Application protocols should always arrange for the endpoint which will first
31/// transmit on a stream to be the endpoint responsible for opening it.
32///
33/// ## Data never received on a remotely-opened stream
34///
35/// Verify that the stream you are receiving is the same one that the server is sending on, e.g. by
36/// logging the [`id`] of each. Streams are always accepted in the same order as they are created,
37/// i.e. ascending order by [`StreamId`]. For example, even if a sender first transmits on
38/// bidirectional stream 1, the first stream yielded by [`Connection::accept_bi`] on the receiver
39/// will be bidirectional stream 0.
40///
41/// ## Unexpected [`WriteError::Stopped`] in sender
42///
43/// When a stream is expected to be closed gracefully the sender should call
44/// [`SendStream::finish`]. However there is no guarantee the connected [`RecvStream`] will
45/// receive the "finished" notification in the same QUIC frame as the last frame which
46/// carried data.
47///
48/// Even if the application layer logic already knows it read all the data because it does
49/// its own framing, it should still read until it reaches the end of the [`RecvStream`].
50/// Otherwise it risks inadvertently calling [`RecvStream::stop`] if it drops the stream.
51/// And calling [`RecvStream::stop`] could result in the connected [`SendStream::finish`]
52/// call failing with a [`WriteError::Stopped`] error.
53///
54/// For example if exactly 10 bytes are to be read, you still need to explicitly read the
55/// end of the stream:
56///
57/// ```no_run
58/// # use quinn::{SendStream, RecvStream};
59/// # async fn func(
60/// # mut send_stream: SendStream,
61/// # mut recv_stream: RecvStream,
62/// # ) -> anyhow::Result<()>
63/// # {
64/// // In the sending task
65/// send_stream.write(&b"0123456789"[..]).await?;
66/// send_stream.finish().await?;
67///
68/// // In the receiving task
69/// let mut buf = [0u8; 10];
70/// let data = recv_stream.read_exact(&mut buf).await?;
71/// if recv_stream.read_to_end(0).await.is_err() {
72/// // Discard unexpected data and notify the peer to stop sending it
73/// let _ = recv_stream.stop(0u8.into());
74/// }
75/// # Ok(())
76/// # }
77/// ```
78///
79/// An alternative approach, used in HTTP/3, is to specify a particular error code used with `stop`
80/// that indicates graceful receiver-initiated stream shutdown, rather than a true error condition.
81///
82/// [`RecvStream::read_chunk`] could be used instead which does not take ownership and
83/// allows using an explicit call to [`RecvStream::stop`] with a custom error code.
84///
85/// [`ReadError`]: crate::ReadError
86/// [`stop()`]: RecvStream::stop
87/// [`SendStream::finish`]: crate::SendStream::finish
88/// [`WriteError::Stopped`]: crate::WriteError::Stopped
89/// [`id`]: RecvStream::id
90/// [`Connection::accept_bi`]: crate::Connection::accept_bi
91#[derive(Debug)]
92pub struct RecvStream {
93 conn: ConnectionRef,
94 stream: StreamId,
95 is_0rtt: bool,
96 all_data_read: bool,
97 reset: Option<VarInt>,
98}
99
100impl RecvStream {
101 pub(crate) fn new(conn: ConnectionRef, stream: StreamId, is_0rtt: bool) -> Self {
102 Self {
103 conn,
104 stream,
105 is_0rtt,
106 all_data_read: false,
107 reset: None,
108 }
109 }
110
111 /// Read data contiguously from the stream.
112 ///
113 /// Yields the number of bytes read into `buf` on success, or `None` if the stream was finished.
114 pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, ReadError> {
115 Read {
116 stream: self,
117 buf: ReadBuf::new(buf),
118 }
119 .await
120 }
121
122 /// Read an exact number of bytes contiguously from the stream.
123 ///
124 /// See [`read()`] for details.
125 ///
126 /// [`read()`]: RecvStream::read
127 pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ReadExactError> {
128 ReadExact {
129 stream: self,
130 buf: ReadBuf::new(buf),
131 }
132 .await
133 }
134
135 fn poll_read(
136 &mut self,
137 cx: &mut Context,
138 buf: &mut ReadBuf<'_>,
139 ) -> Poll<Result<(), ReadError>> {
140 if buf.remaining() == 0 {
141 return Poll::Ready(Ok(()));
142 }
143
144 self.poll_read_generic(cx, true, |chunks| {
145 let mut read = false;
146 loop {
147 if buf.remaining() == 0 {
148 // We know `read` is `true` because `buf.remaining()` was not 0 before
149 return ReadStatus::Readable(());
150 }
151
152 match chunks.next(buf.remaining()) {
153 Ok(Some(chunk)) => {
154 buf.put_slice(&chunk.bytes);
155 read = true;
156 }
157 res => return (if read { Some(()) } else { None }, res.err()).into(),
158 }
159 }
160 })
161 .map(|res| res.map(|_| ()))
162 }
163
164 /// Read the next segment of data
165 ///
166 /// Yields `None` if the stream was finished. Otherwise, yields a segment of data and its
167 /// offset in the stream. If `ordered` is `true`, the chunk's offset will be immediately after
168 /// the last data yielded by `read()` or `read_chunk()`. If `ordered` is `false`, segments may
169 /// be received in any order, and the `Chunk`'s `offset` field can be used to determine
170 /// ordering in the caller. Unordered reads are less prone to head-of-line blocking within a
171 /// stream, but require the application to manage reassembling the original data.
172 ///
173 /// Slightly more efficient than `read` due to not copying. Chunk boundaries do not correspond
174 /// to peer writes, and hence cannot be used as framing.
175 pub async fn read_chunk(
176 &mut self,
177 max_length: usize,
178 ordered: bool,
179 ) -> Result<Option<Chunk>, ReadError> {
180 ReadChunk {
181 stream: self,
182 max_length,
183 ordered,
184 }
185 .await
186 }
187
188 /// Foundation of [`Self::read_chunk`]
189 fn poll_read_chunk(
190 &mut self,
191 cx: &mut Context,
192 max_length: usize,
193 ordered: bool,
194 ) -> Poll<Result<Option<Chunk>, ReadError>> {
195 self.poll_read_generic(cx, ordered, |chunks| match chunks.next(max_length) {
196 Ok(Some(chunk)) => ReadStatus::Readable(chunk),
197 res => (None, res.err()).into(),
198 })
199 }
200
201 /// Read the next segments of data
202 ///
203 /// Fills `bufs` with the segments of data beginning immediately after the
204 /// last data yielded by `read` or `read_chunk`, or `None` if the stream was
205 /// finished.
206 ///
207 /// Slightly more efficient than `read` due to not copying. Chunk boundaries
208 /// do not correspond to peer writes, and hence cannot be used as framing.
209 pub async fn read_chunks(&mut self, bufs: &mut [Bytes]) -> Result<Option<usize>, ReadError> {
210 ReadChunks { stream: self, bufs }.await
211 }
212
213 /// Foundation of [`Self::read_chunks`]
214 fn poll_read_chunks(
215 &mut self,
216 cx: &mut Context,
217 bufs: &mut [Bytes],
218 ) -> Poll<Result<Option<usize>, ReadError>> {
219 if bufs.is_empty() {
220 return Poll::Ready(Ok(Some(0)));
221 }
222
223 self.poll_read_generic(cx, true, |chunks| {
224 let mut read = 0;
225 loop {
226 if read >= bufs.len() {
227 // We know `read > 0` because `bufs` cannot be empty here
228 return ReadStatus::Readable(read);
229 }
230
231 match chunks.next(usize::MAX) {
232 Ok(Some(chunk)) => {
233 bufs[read] = chunk.bytes;
234 read += 1;
235 }
236 res => return (if read == 0 { None } else { Some(read) }, res.err()).into(),
237 }
238 }
239 })
240 }
241
242 /// Convenience method to read all remaining data into a buffer
243 ///
244 /// Fails with [`ReadToEndError::TooLong`] on reading more than `size_limit` bytes, discarding
245 /// all data read. Uses unordered reads to be more efficient than using `AsyncRead` would
246 /// allow. `size_limit` should be set to limit worst-case memory use.
247 ///
248 /// If unordered reads have already been made, the resulting buffer may have gaps containing
249 /// arbitrary data.
250 ///
251 /// [`ReadToEndError::TooLong`]: crate::ReadToEndError::TooLong
252 pub async fn read_to_end(&mut self, size_limit: usize) -> Result<Vec<u8>, ReadToEndError> {
253 ReadToEnd {
254 stream: self,
255 size_limit,
256 read: Vec::new(),
257 start: u64::max_value(),
258 end: 0,
259 }
260 .await
261 }
262
263 /// Stop accepting data
264 ///
265 /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
266 /// attempts to operate on a stream will yield `UnknownStream` errors.
267 pub fn stop(&mut self, error_code: VarInt) -> Result<(), UnknownStream> {
268 let mut conn = self.conn.state.lock("RecvStream::stop");
269 if self.is_0rtt && conn.check_0rtt().is_err() {
270 return Ok(());
271 }
272 conn.inner.recv_stream(self.stream).stop(error_code)?;
273 conn.wake();
274 self.all_data_read = true;
275 Ok(())
276 }
277
278 /// Check if this stream has been opened during 0-RTT.
279 ///
280 /// In which case any non-idempotent request should be considered dangerous at the application
281 /// level. Because read data is subject to replay attacks.
282 pub fn is_0rtt(&self) -> bool {
283 self.is_0rtt
284 }
285
286 /// Get the identity of this stream
287 pub fn id(&self) -> StreamId {
288 self.stream
289 }
290
291 /// Handle common logic related to reading out of a receive stream
292 ///
293 /// This takes an `FnMut` closure that takes care of the actual reading process, matching
294 /// the detailed read semantics for the calling function with a particular return type.
295 /// The closure can read from the passed `&mut Chunks` and has to return the status after
296 /// reading: the amount of data read, and the status after the final read call.
297 fn poll_read_generic<T, U>(
298 &mut self,
299 cx: &mut Context,
300 ordered: bool,
301 mut read_fn: T,
302 ) -> Poll<Result<Option<U>, ReadError>>
303 where
304 T: FnMut(&mut Chunks) -> ReadStatus<U>,
305 {
306 use proto::ReadError::*;
307 if self.all_data_read {
308 return Poll::Ready(Ok(None));
309 }
310
311 let mut conn = self.conn.state.lock("RecvStream::poll_read");
312 if self.is_0rtt {
313 conn.check_0rtt().map_err(|()| ReadError::ZeroRttRejected)?;
314 }
315
316 // If we stored an error during a previous call, return it now. This can happen if a
317 // `read_fn` both wants to return data and also returns an error in its final stream status.
318 let status = match self.reset.take() {
319 Some(code) => ReadStatus::Failed(None, Reset(code)),
320 None => {
321 let mut recv = conn.inner.recv_stream(self.stream);
322 let mut chunks = recv.read(ordered)?;
323 let status = read_fn(&mut chunks);
324 if chunks.finalize().should_transmit() {
325 conn.wake();
326 }
327 status
328 }
329 };
330
331 match status {
332 ReadStatus::Readable(read) => Poll::Ready(Ok(Some(read))),
333 ReadStatus::Finished(read) => {
334 self.all_data_read = true;
335 Poll::Ready(Ok(read))
336 }
337 ReadStatus::Failed(read, Blocked) => match read {
338 Some(val) => Poll::Ready(Ok(Some(val))),
339 None => {
340 if let Some(ref x) = conn.error {
341 return Poll::Ready(Err(ReadError::ConnectionLost(x.clone())));
342 }
343 conn.blocked_readers.insert(self.stream, cx.waker().clone());
344 Poll::Pending
345 }
346 },
347 ReadStatus::Failed(read, Reset(error_code)) => match read {
348 None => {
349 self.all_data_read = true;
350 Poll::Ready(Err(ReadError::Reset(error_code)))
351 }
352 done => {
353 self.reset = Some(error_code);
354 Poll::Ready(Ok(done))
355 }
356 },
357 }
358 }
359}
360
361enum ReadStatus<T> {
362 Readable(T),
363 Finished(Option<T>),
364 Failed(Option<T>, proto::ReadError),
365}
366
367impl<T> From<(Option<T>, Option<proto::ReadError>)> for ReadStatus<T> {
368 fn from(status: (Option<T>, Option<proto::ReadError>)) -> Self {
369 match status {
370 (read, None) => Self::Finished(read),
371 (read, Some(e)) => Self::Failed(read, e),
372 }
373 }
374}
375
376/// Future produced by [`RecvStream::read_to_end()`].
377///
378/// [`RecvStream::read_to_end()`]: crate::RecvStream::read_to_end
379#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
380struct ReadToEnd<'a> {
381 stream: &'a mut RecvStream,
382 read: Vec<(Bytes, u64)>,
383 start: u64,
384 end: u64,
385 size_limit: usize,
386}
387
388impl Future for ReadToEnd<'_> {
389 type Output = Result<Vec<u8>, ReadToEndError>;
390 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
391 loop {
392 match ready!(self.stream.poll_read_chunk(cx, usize::MAX, false))? {
393 Some(chunk) => {
394 self.start = self.start.min(chunk.offset);
395 let end = chunk.bytes.len() as u64 + chunk.offset;
396 if (end - self.start) > self.size_limit as u64 {
397 return Poll::Ready(Err(ReadToEndError::TooLong));
398 }
399 self.end = self.end.max(end);
400 self.read.push((chunk.bytes, chunk.offset));
401 }
402 None => {
403 if self.end == 0 {
404 // Never received anything
405 return Poll::Ready(Ok(Vec::new()));
406 }
407 let start = self.start;
408 let mut buffer = vec![0; (self.end - start) as usize];
409 for (data, offset) in self.read.drain(..) {
410 let offset = (offset - start) as usize;
411 buffer[offset..offset + data.len()].copy_from_slice(&data);
412 }
413 return Poll::Ready(Ok(buffer));
414 }
415 }
416 }
417 }
418}
419
420/// Errors from [`RecvStream::read_to_end`]
421#[derive(Debug, Error, Clone, PartialEq, Eq)]
422pub enum ReadToEndError {
423 /// An error occurred during reading
424 #[error("read error: {0}")]
425 Read(#[from] ReadError),
426 /// The stream is larger than the user-supplied limit
427 #[error("stream too long")]
428 TooLong,
429}
430
#[cfg(feature = "futures-io")]
impl futures_io::AsyncRead for RecvStream {
    /// Reads into `buf` through the inherent `RecvStream::poll_read` and reports the number of
    /// bytes written. Read errors are converted into `io::Error`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let mut window = ReadBuf::new(buf);
        match RecvStream::poll_read(self.get_mut(), cx, &mut window) {
            Poll::Ready(Ok(())) => Poll::Ready(Ok(window.filled().len())),
            Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
            Poll::Pending => Poll::Pending,
        }
    }
}
443
444impl tokio::io::AsyncRead for RecvStream {
445 fn poll_read(
446 self: Pin<&mut Self>,
447 cx: &mut Context<'_>,
448 buf: &mut ReadBuf<'_>,
449 ) -> Poll<io::Result<()>> {
450 ready!(Self::poll_read(self.get_mut(), cx, buf))?;
451 Poll::Ready(Ok(()))
452 }
453}
454
455impl Drop for RecvStream {
456 fn drop(&mut self) {
457 let mut conn = self.conn.state.lock("RecvStream::drop");
458
459 // clean up any previously registered wakers
460 conn.blocked_readers.remove(&self.stream);
461
462 if conn.error.is_some() || (self.is_0rtt && conn.check_0rtt().is_err()) {
463 return;
464 }
465 if !self.all_data_read {
466 // Ignore UnknownStream errors
467 let _ = conn.inner.recv_stream(self.stream).stop(0u32.into());
468 conn.wake();
469 }
470 }
471}
472
473/// Errors that arise from reading from a stream.
474#[derive(Debug, Error, Clone, PartialEq, Eq)]
475pub enum ReadError {
476 /// The peer abandoned transmitting data on this stream
477 ///
478 /// Carries an application-defined error code.
479 #[error("stream reset by peer: error {0}")]
480 Reset(VarInt),
481 /// The connection was lost
482 #[error("connection lost")]
483 ConnectionLost(#[from] ConnectionError),
484 /// The stream has already been stopped, finished, or reset
485 #[error("unknown stream")]
486 UnknownStream,
487 /// Attempted an ordered read following an unordered read
488 ///
489 /// Performing an unordered read allows discontinuities to arise in the receive buffer of a
490 /// stream which cannot be recovered, making further ordered reads impossible.
491 #[error("ordered read after unordered read")]
492 IllegalOrderedRead,
493 /// This was a 0-RTT stream and the server rejected it
494 ///
495 /// Can only occur on clients for 0-RTT streams, which can be opened using
496 /// [`Connecting::into_0rtt()`].
497 ///
498 /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
499 #[error("0-RTT rejected")]
500 ZeroRttRejected,
501}
502
503impl From<ReadableError> for ReadError {
504 fn from(e: ReadableError) -> Self {
505 match e {
506 ReadableError::UnknownStream => Self::UnknownStream,
507 ReadableError::IllegalOrderedRead => Self::IllegalOrderedRead,
508 }
509 }
510}
511
512impl From<ReadError> for io::Error {
513 fn from(x: ReadError) -> Self {
514 use self::ReadError::*;
515 let kind = match x {
516 Reset { .. } | ZeroRttRejected => io::ErrorKind::ConnectionReset,
517 ConnectionLost(_) | UnknownStream => io::ErrorKind::NotConnected,
518 IllegalOrderedRead => io::ErrorKind::InvalidInput,
519 };
520 Self::new(kind, x)
521 }
522}
523
524/// Future produced by [`RecvStream::read()`].
525///
526/// [`RecvStream::read()`]: crate::RecvStream::read
527#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
528struct Read<'a> {
529 stream: &'a mut RecvStream,
530 buf: ReadBuf<'a>,
531}
532
533impl<'a> Future for Read<'a> {
534 type Output = Result<Option<usize>, ReadError>;
535
536 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
537 let this = self.get_mut();
538 ready!(this.stream.poll_read(cx, &mut this.buf))?;
539 match this.buf.filled().len() {
540 0 if this.buf.capacity() != 0 => Poll::Ready(Ok(None)),
541 n => Poll::Ready(Ok(Some(n))),
542 }
543 }
544}
545
546/// Future produced by [`RecvStream::read_exact()`].
547///
548/// [`RecvStream::read_exact()`]: crate::RecvStream::read_exact
549#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
550struct ReadExact<'a> {
551 stream: &'a mut RecvStream,
552 buf: ReadBuf<'a>,
553}
554
555impl<'a> Future for ReadExact<'a> {
556 type Output = Result<(), ReadExactError>;
557 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
558 let this = self.get_mut();
559 let total = this.buf.remaining();
560 let mut remaining = total;
561 while remaining > 0 {
562 ready!(this.stream.poll_read(cx, &mut this.buf))?;
563 let new = this.buf.remaining();
564 if new == remaining {
565 let read = total - remaining;
566 return Poll::Ready(Err(ReadExactError::FinishedEarly(read)));
567 }
568 remaining = new;
569 }
570 Poll::Ready(Ok(()))
571 }
572}
573
574/// Errors that arise from reading from a stream.
575#[derive(Debug, Error, Clone, PartialEq, Eq)]
576pub enum ReadExactError {
577 /// The stream finished before all bytes were read
578 #[error("stream finished early ({0} bytes read)")]
579 FinishedEarly(usize),
580 /// A read error occurred
581 #[error(transparent)]
582 ReadError(#[from] ReadError),
583}
584
585/// Future produced by [`RecvStream::read_chunk()`].
586///
587/// [`RecvStream::read_chunk()`]: crate::RecvStream::read_chunk
588#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
589struct ReadChunk<'a> {
590 stream: &'a mut RecvStream,
591 max_length: usize,
592 ordered: bool,
593}
594
595impl<'a> Future for ReadChunk<'a> {
596 type Output = Result<Option<Chunk>, ReadError>;
597 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
598 let (max_length, ordered) = (self.max_length, self.ordered);
599 self.stream.poll_read_chunk(cx, max_length, ordered)
600 }
601}
602
603/// Future produced by [`RecvStream::read_chunks()`].
604///
605/// [`RecvStream::read_chunks()`]: crate::RecvStream::read_chunks
606#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
607struct ReadChunks<'a> {
608 stream: &'a mut RecvStream,
609 bufs: &'a mut [Bytes],
610}
611
612impl<'a> Future for ReadChunks<'a> {
613 type Output = Result<Option<usize>, ReadError>;
614 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
615 let this = self.get_mut();
616 this.stream.poll_read_chunks(cx, this.bufs)
617 }
618}