compio_quic/recv_stream.rs
1use std::{
2 collections::BTreeMap,
3 io,
4 sync::Arc,
5 task::{Context, Poll},
6};
7
8use compio_buf::{
9 BufResult, IoBufMut,
10 bytes::{BufMut, Bytes},
11};
12use compio_io::AsyncRead;
13use futures_util::{future::poll_fn, ready};
14use quinn_proto::{Chunk, Chunks, ClosedStream, ReadableError, StreamId, VarInt};
15use thiserror::Error;
16
17use crate::{ConnectionError, ConnectionInner, StoppedError};
18
/// A stream that can only be used to receive data
///
/// `stop(0)` is implicitly called on drop unless:
/// - A variant of [`ReadError`] has been yielded by a read call
/// - [`stop()`] was called explicitly
///
/// # Cancellation
///
/// A `read` method is said to be *cancel-safe* when dropping its future before
/// the future becomes ready cannot lead to loss of stream data. This is true of
/// methods which succeed immediately when any progress is made, and is not true
/// of methods which might need to perform multiple reads internally before
/// succeeding. Each `read` method documents whether it is cancel-safe.
///
/// # Common issues
///
/// ## Data never received on a locally-opened stream
///
/// Peers are not notified of streams until they or a later-numbered stream are
/// used to send data. If a bidirectional stream is locally opened but never
/// used to send, then the peer may never see it. Application protocols should
/// always arrange for the endpoint which will first transmit on a stream to be
/// the endpoint responsible for opening it.
///
/// ## Data never received on a remotely-opened stream
///
/// Verify that the stream you are receiving is the same one that the server is
/// sending on, e.g. by logging the [`id`] of each. Streams are always accepted
/// in the same order as they are created, i.e. ascending order by [`StreamId`].
/// For example, even if a sender first transmits on bidirectional stream 1, the
/// first stream yielded by [`Connection::accept_bi`] on the receiver
/// will be bidirectional stream 0.
///
/// [`stop()`]: RecvStream::stop
/// [`id`]: RecvStream::id
/// [`Connection::accept_bi`]: crate::Connection::accept_bi
#[derive(Debug)]
pub struct RecvStream {
    /// Shared connection; its state is locked via `conn.state()` for every
    /// operation on this stream.
    conn: Arc<ConnectionInner>,
    /// Identifier of the stream this handle reads from.
    stream: StreamId,
    /// Whether the stream was opened during 0-RTT; when set, every operation
    /// first consults `check_0rtt()` on the connection state.
    is_0rtt: bool,
    /// Set once the stream finished, was stopped, or a reset error was
    /// yielded. Further reads then return `Ok(None)` and drop skips the
    /// implicit `stop(0)`.
    all_data_read: bool,
    /// Reset code observed by a read that also returned data; it is reported
    /// by the next read and by [`stopped()`](Self::stopped).
    reset: Option<VarInt>,
}
63
64impl RecvStream {
65 pub(crate) fn new(conn: Arc<ConnectionInner>, stream: StreamId, is_0rtt: bool) -> Self {
66 Self {
67 conn,
68 stream,
69 is_0rtt,
70 all_data_read: false,
71 reset: None,
72 }
73 }
74
    /// Get the identity of this stream
    ///
    /// Returns the [`StreamId`] this handle was created with.
    pub fn id(&self) -> StreamId {
        self.stream
    }
79
    /// Check if this stream has been opened during 0-RTT.
    ///
    /// If so, any non-idempotent request should be considered dangerous at
    /// the application level, because data read from the stream is subject
    /// to replay attacks.
    pub fn is_0rtt(&self) -> bool {
        self.is_0rtt
    }
88
89 /// Stop accepting data
90 ///
91 /// Discards unread data and notifies the peer to stop transmitting. Once
92 /// stopped, further attempts to operate on a stream will yield
93 /// `ClosedStream` errors.
94 pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
95 let mut state = self.conn.state();
96 if self.is_0rtt && !state.check_0rtt() {
97 return Ok(());
98 }
99 state.conn.recv_stream(self.stream).stop(error_code)?;
100 state.wake();
101 self.all_data_read = true;
102 Ok(())
103 }
104
105 /// Completes when the stream has been reset by the peer or otherwise
106 /// closed.
107 ///
108 /// Yields `Some` with the reset error code when the stream is reset by the
109 /// peer. Yields `None` when the stream was previously
110 /// [`stop()`](Self::stop)ed, or when the stream was
111 /// [`finish()`](crate::SendStream::finish)ed by the peer and all data has
112 /// been received, after which it is no longer meaningful for the stream to
113 /// be reset.
114 ///
115 /// This operation is cancel-safe.
116 pub async fn stopped(&mut self) -> Result<Option<VarInt>, StoppedError> {
117 poll_fn(|cx| {
118 let mut state = self.conn.state();
119
120 if self.is_0rtt && !state.check_0rtt() {
121 return Poll::Ready(Err(StoppedError::ZeroRttRejected));
122 }
123 if let Some(code) = self.reset {
124 return Poll::Ready(Ok(Some(code)));
125 }
126
127 match state.conn.recv_stream(self.stream).received_reset() {
128 Err(_) => Poll::Ready(Ok(None)),
129 Ok(Some(error_code)) => {
130 // Stream state has just now been freed, so the connection may need to issue new
131 // stream ID flow control credit
132 state.wake();
133 Poll::Ready(Ok(Some(error_code)))
134 }
135 Ok(None) => {
136 if let Some(e) = &state.error {
137 return Poll::Ready(Err(e.clone().into()));
138 }
139 // Resets always notify readers, since a reset is an immediate read error. We
140 // could introduce a dedicated channel to reduce the risk of spurious wakeups,
141 // but that increased complexity is probably not justified, as an application
142 // that is expecting a reset is not likely to receive large amounts of data.
143 state.readable.insert(self.stream, cx.waker().clone());
144 Poll::Pending
145 }
146 }
147 })
148 .await
149 }
150
151 /// Handle common logic related to reading out of a receive stream.
152 ///
153 /// This takes an `FnMut` closure that takes care of the actual reading
154 /// process, matching the detailed read semantics for the calling
155 /// function with a particular return type. The closure can read from
156 /// the passed `&mut Chunks` and has to return the status after reading:
157 /// the amount of data read, and the status after the final read call.
158 fn execute_poll_read<F, T>(
159 &mut self,
160 cx: &mut Context,
161 ordered: bool,
162 mut read_fn: F,
163 ) -> Poll<Result<Option<T>, ReadError>>
164 where
165 F: FnMut(&mut Chunks) -> ReadStatus<T>,
166 {
167 use quinn_proto::ReadError::*;
168
169 if self.all_data_read {
170 return Poll::Ready(Ok(None));
171 }
172
173 let mut state = self.conn.state();
174 if self.is_0rtt && !state.check_0rtt() {
175 return Poll::Ready(Err(ReadError::ZeroRttRejected));
176 }
177
178 // If we stored an error during a previous call, return it now. This can happen
179 // if a `read_fn` both wants to return data and also returns an error in
180 // its final stream status.
181 let status = match self.reset {
182 Some(code) => ReadStatus::Failed(None, Reset(code)),
183 None => {
184 let mut recv = state.conn.recv_stream(self.stream);
185 let mut chunks = recv.read(ordered)?;
186 let status = read_fn(&mut chunks);
187 if chunks.finalize().should_transmit() {
188 state.wake();
189 }
190 status
191 }
192 };
193
194 match status {
195 ReadStatus::Readable(read) => Poll::Ready(Ok(Some(read))),
196 ReadStatus::Finished(read) => {
197 self.all_data_read = true;
198 Poll::Ready(Ok(read))
199 }
200 ReadStatus::Failed(read, Blocked) => match read {
201 Some(val) => Poll::Ready(Ok(Some(val))),
202 None => {
203 if let Some(error) = &state.error {
204 return Poll::Ready(Err(error.clone().into()));
205 }
206 state.readable.insert(self.stream, cx.waker().clone());
207 Poll::Pending
208 }
209 },
210 ReadStatus::Failed(read, Reset(error_code)) => match read {
211 None => {
212 self.all_data_read = true;
213 self.reset = Some(error_code);
214 Poll::Ready(Err(ReadError::Reset(error_code)))
215 }
216 done => {
217 self.reset = Some(error_code);
218 Poll::Ready(Ok(done))
219 }
220 },
221 }
222 }
223
224 fn poll_read(
225 &mut self,
226 cx: &mut Context,
227 mut buf: impl BufMut,
228 ) -> Poll<Result<Option<usize>, ReadError>> {
229 if !buf.has_remaining_mut() {
230 return Poll::Ready(Ok(Some(0)));
231 }
232
233 self.execute_poll_read(cx, true, |chunks| {
234 let mut read = 0;
235 loop {
236 if !buf.has_remaining_mut() {
237 // We know `read` is `true` because `buf.remaining()` was not 0 before
238 return ReadStatus::Readable(read);
239 }
240
241 match chunks.next(buf.remaining_mut()) {
242 Ok(Some(chunk)) => {
243 read += chunk.bytes.len();
244 buf.put(chunk.bytes);
245 }
246 res => {
247 return (if read == 0 { None } else { Some(read) }, res.err()).into();
248 }
249 }
250 }
251 })
252 }
253
254 /// Read data contiguously from the stream.
255 ///
256 /// Yields the number of bytes read into `buf` on success, or `None` if the
257 /// stream was finished.
258 ///
259 /// This operation is cancel-safe.
260 pub async fn read(&mut self, mut buf: impl BufMut) -> Result<Option<usize>, ReadError> {
261 poll_fn(|cx| self.poll_read(cx, &mut buf)).await
262 }
263
264 /// Read an exact number of bytes contiguously from the stream.
265 ///
266 /// See [`read()`] for details. This operation is *not* cancel-safe.
267 ///
268 /// [`read()`]: RecvStream::read
269 pub async fn read_exact(&mut self, mut buf: impl BufMut) -> Result<(), ReadExactError> {
270 poll_fn(|cx| {
271 while buf.has_remaining_mut() {
272 if ready!(self.poll_read(cx, &mut buf))?.is_none() {
273 return Poll::Ready(Err(ReadExactError::FinishedEarly(buf.remaining_mut())));
274 }
275 }
276 Poll::Ready(Ok(()))
277 })
278 .await
279 }
280
281 /// Read the next segment of data.
282 ///
283 /// Yields `None` if the stream was finished. Otherwise, yields a segment of
284 /// data and its offset in the stream. If `ordered` is `true`, the chunk's
285 /// offset will be immediately after the last data yielded by
286 /// [`read()`](Self::read) or [`read_chunk()`](Self::read_chunk). If
287 /// `ordered` is `false`, segments may be received in any order, and the
288 /// `Chunk`'s `offset` field can be used to determine ordering in the
289 /// caller. Unordered reads are less prone to head-of-line blocking within a
290 /// stream, but require the application to manage reassembling the original
291 /// data.
292 ///
293 /// Slightly more efficient than `read` due to not copying. Chunk boundaries
294 /// do not correspond to peer writes, and hence cannot be used as framing.
295 ///
296 /// This operation is cancel-safe.
297 pub async fn read_chunk(
298 &mut self,
299 max_length: usize,
300 ordered: bool,
301 ) -> Result<Option<Chunk>, ReadError> {
302 poll_fn(|cx| {
303 self.execute_poll_read(cx, ordered, |chunks| match chunks.next(max_length) {
304 Ok(Some(chunk)) => ReadStatus::Readable(chunk),
305 res => (None, res.err()).into(),
306 })
307 })
308 .await
309 }
310
311 /// Read the next segments of data.
312 ///
313 /// Fills `bufs` with the segments of data beginning immediately after the
314 /// last data yielded by `read` or `read_chunk`, or `None` if the stream was
315 /// finished.
316 ///
317 /// Slightly more efficient than `read` due to not copying. Chunk boundaries
318 /// do not correspond to peer writes, and hence cannot be used as framing.
319 ///
320 /// This operation is cancel-safe.
321 pub async fn read_chunks(&mut self, bufs: &mut [Bytes]) -> Result<Option<usize>, ReadError> {
322 if bufs.is_empty() {
323 return Ok(Some(0));
324 }
325
326 poll_fn(|cx| {
327 self.execute_poll_read(cx, true, |chunks| {
328 let mut read = 0;
329 loop {
330 if read >= bufs.len() {
331 // We know `read > 0` because `bufs` cannot be empty here
332 return ReadStatus::Readable(read);
333 }
334
335 match chunks.next(usize::MAX) {
336 Ok(Some(chunk)) => {
337 bufs[read] = chunk.bytes;
338 read += 1;
339 }
340 res => {
341 return (if read == 0 { None } else { Some(read) }, res.err()).into();
342 }
343 }
344 }
345 })
346 })
347 .await
348 }
349
350 /// Convenience method to read all remaining data into a buffer.
351 ///
352 /// Uses unordered reads to be more efficient than using [`AsyncRead`]. If
353 /// unordered reads have already been made, the resulting buffer may have
354 /// gaps containing zero.
355 ///
356 /// Depending on [`BufMut`] implementation, this method may fail with
357 /// [`ReadError::BufferTooShort`] if the buffer is not large enough to
358 /// hold the entire stream. For example when using a `&mut [u8]` it will
359 /// never receive bytes more than the length of the slice, but when using a
360 /// `&mut Vec<u8>` it will allocate more memory as needed.
361 ///
362 /// This operation is *not* cancel-safe.
363 pub async fn read_to_end(&mut self, mut buf: impl BufMut) -> Result<usize, ReadError> {
364 let mut start = u64::MAX;
365 let mut end = 0;
366 let mut chunks = BTreeMap::new();
367 loop {
368 let Some(chunk) = self.read_chunk(usize::MAX, false).await? else {
369 break;
370 };
371 start = start.min(chunk.offset);
372 end = end.max(chunk.offset + chunk.bytes.len() as u64);
373 if end - start > buf.remaining_mut() as u64 {
374 return Err(ReadError::BufferTooShort);
375 }
376 chunks.insert(chunk.offset, chunk.bytes);
377 }
378 let mut last = 0;
379 for (offset, bytes) in chunks {
380 let offset = (offset - start) as usize;
381 if offset > last {
382 buf.put_bytes(0, offset - last);
383 }
384 last = offset + bytes.len();
385 buf.put(bytes);
386 }
387 Ok((end - start) as usize)
388 }
389}
390
391impl Drop for RecvStream {
392 fn drop(&mut self) {
393 let mut state = self.conn.state();
394
395 // clean up any previously registered wakers
396 state.readable.remove(&self.stream);
397
398 if state.error.is_some() || (self.is_0rtt && !state.check_0rtt()) {
399 return;
400 }
401 if !self.all_data_read {
402 // Ignore ClosedStream errors
403 let _ = state.conn.recv_stream(self.stream).stop(0u32.into());
404 state.wake();
405 }
406 }
407}
408
/// Outcome reported by the read closure handed to
/// `RecvStream::execute_poll_read`.
///
/// `T` is whatever the particular read flavour produces (a byte count, a
/// chunk, ...).
enum ReadStatus<T> {
    /// Data was read and the read stopped without hitting a terminal
    /// condition; the value is returned to the caller as `Some`.
    Readable(T),
    /// The read ended without an error, optionally after producing data;
    /// the stream is marked as fully read.
    Finished(Option<T>),
    /// The read ended on a `quinn_proto::ReadError` (blocked or reset),
    /// optionally after producing data.
    Failed(Option<T>, quinn_proto::ReadError),
}
414
415impl<T> From<(Option<T>, Option<quinn_proto::ReadError>)> for ReadStatus<T> {
416 fn from(status: (Option<T>, Option<quinn_proto::ReadError>)) -> Self {
417 match status {
418 (read, None) => Self::Finished(read),
419 (read, Some(e)) => Self::Failed(read, e),
420 }
421 }
422}
423
/// Errors that arise from reading from a stream.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ReadError {
    /// The peer abandoned transmitting data on this stream.
    ///
    /// Carries an application-defined error code.
    #[error("stream reset by peer: error {0}")]
    Reset(VarInt),
    /// The connection was lost.
    ///
    /// Carries the [`ConnectionError`] recorded on the connection.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
    /// The stream has already been stopped, finished, or reset.
    #[error("closed stream")]
    ClosedStream,
    /// Attempted an ordered read following an unordered read.
    ///
    /// Performing an unordered read allows discontinuities to arise in the
    /// receive buffer of a stream which cannot be recovered, making further
    /// ordered reads impossible.
    #[error("ordered read after unordered read")]
    IllegalOrderedRead,
    /// This was a 0-RTT stream and the server rejected it.
    ///
    /// Can only occur on clients for 0-RTT streams, which can be opened using
    /// [`Connecting::into_0rtt()`].
    ///
    /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
    #[error("0-RTT rejected")]
    ZeroRttRejected,
    /// The stream is larger than the user-supplied buffer capacity.
    ///
    /// Can only occur when using [`read_to_end()`](RecvStream::read_to_end).
    #[error("buffer too short")]
    BufferTooShort,
}
459
460impl From<ReadableError> for ReadError {
461 fn from(e: ReadableError) -> Self {
462 match e {
463 ReadableError::ClosedStream => Self::ClosedStream,
464 ReadableError::IllegalOrderedRead => Self::IllegalOrderedRead,
465 }
466 }
467}
468
469impl From<StoppedError> for ReadError {
470 fn from(e: StoppedError) -> Self {
471 match e {
472 StoppedError::ConnectionLost(e) => Self::ConnectionLost(e),
473 StoppedError::ZeroRttRejected => Self::ZeroRttRejected,
474 }
475 }
476}
477
478impl From<ReadError> for io::Error {
479 fn from(x: ReadError) -> Self {
480 use self::ReadError::*;
481 let kind = match x {
482 Reset { .. } | ZeroRttRejected => io::ErrorKind::ConnectionReset,
483 ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected,
484 IllegalOrderedRead | BufferTooShort => io::ErrorKind::InvalidInput,
485 };
486 Self::new(kind, x)
487 }
488}
489
/// Errors that arise from reading an exact number of bytes from a stream
/// with [`read_exact()`](RecvStream::read_exact).
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ReadExactError {
    /// The stream finished before all bytes were read
    ///
    /// Carries the capacity that was still left in the buffer when the
    /// stream finished.
    #[error("stream finished early (expected {0} bytes more)")]
    FinishedEarly(usize),
    /// A read error occurred
    #[error(transparent)]
    ReadError(#[from] ReadError),
}
500
501impl AsyncRead for RecvStream {
502 async fn read<B: IoBufMut>(&mut self, mut buf: B) -> BufResult<usize, B> {
503 let res = self
504 .read(buf.as_mut_slice())
505 .await
506 .map(|n| {
507 let n = n.unwrap_or_default();
508 unsafe { buf.set_buf_init(n) }
509 n
510 })
511 .map_err(Into::into);
512 BufResult(res, buf)
513 }
514}
515
#[cfg(feature = "io-compat")]
impl futures_util::AsyncRead for RecvStream {
    /// Polls the inherent ordered read, copying stream data into `buf`.
    ///
    /// A finished stream is reported as a read of `0` bytes, and read errors
    /// are converted into [`io::Error`].
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        match this.poll_read(cx, buf) {
            Poll::Ready(Ok(read)) => Poll::Ready(Ok(read.unwrap_or_default())),
            Poll::Ready(Err(e)) => Poll::Ready(Err(io::Error::from(e))),
            Poll::Pending => Poll::Pending,
        }
    }
}
529
#[cfg(feature = "h3")]
pub(crate) mod h3_impl {
    use h3::quic::{self, StreamErrorIncoming};

    use super::*;

    impl From<ReadError> for StreamErrorIncoming {
        /// Translates a read error into the error type `h3` expects.
        ///
        /// Resets and lost connections map to their dedicated variants;
        /// the other errors are boxed into `Unknown`.
        ///
        /// # Panics
        ///
        /// Panics on [`ReadError::IllegalOrderedRead`], which is treated as
        /// unreachable here.
        fn from(e: ReadError) -> Self {
            match e {
                ReadError::Reset(code) => Self::StreamTerminated {
                    error_code: code.into_inner(),
                },
                ReadError::ConnectionLost(cause) => Self::ConnectionErrorIncoming {
                    connection_error: cause.into(),
                },
                ReadError::IllegalOrderedRead => unreachable!("illegal ordered read"),
                other @ (ReadError::ClosedStream
                | ReadError::ZeroRttRejected
                | ReadError::BufferTooShort) => Self::Unknown(Box::new(other)),
            }
        }
    }

    impl quic::RecvStream for RecvStream {
        type Buf = Bytes;

        /// Polls for the next ordered chunk of stream data; yields `None`
        /// once the stream is finished.
        fn poll_data(
            &mut self,
            cx: &mut Context<'_>,
        ) -> Poll<Result<Option<Self::Buf>, StreamErrorIncoming>> {
            let polled = self.execute_poll_read(cx, true, |chunks| {
                match chunks.next(usize::MAX) {
                    Ok(Some(chunk)) => ReadStatus::Readable(chunk.bytes),
                    Ok(None) => ReadStatus::Finished(None),
                    Err(e) => ReadStatus::Failed(None, e),
                }
            });
            polled.map_err(StreamErrorIncoming::from)
        }

        /// Stops the stream with `error_code`, ignoring a `ClosedStream`
        /// failure.
        ///
        /// # Panics
        ///
        /// Panics if `error_code` cannot be converted into a [`VarInt`].
        fn stop_sending(&mut self, error_code: u64) {
            let code = VarInt::try_from(error_code).expect("invalid error_code");
            let _ = self.stop(code);
        }

        /// Returns this stream's id converted to the `h3` stream id type.
        ///
        /// # Panics
        ///
        /// Panics if the conversion fails.
        fn recv_id(&self) -> quic::StreamId {
            let raw = u64::from(self.stream);
            quic::StreamId::try_from(raw).unwrap()
        }
    }
}
575}