1use std::collections::hash_map::Entry;
9use std::mem;
10
11use thiserror::Error;
12use tracing::debug;
13
14use super::state::get_or_insert_recv;
15use super::{ClosedStream, Retransmits, ShouldTransmit, StreamId, StreamsState};
16use crate::connection::assembler::{Assembler, Chunk, IllegalOrderedRead};
17use crate::connection::streams::state::StreamRecv;
18use crate::{TransportError, VarInt, frame};
19
20#[derive(Debug, Default)]
21pub(super) struct Recv {
22 state: RecvState,
24 pub(super) assembler: Assembler,
25 sent_max_stream_data: u64,
26 pub(super) end: u64,
27 pub(super) stopped: bool,
28}
29
30impl Recv {
31 pub(super) fn new(initial_max_data: u64) -> Box<Self> {
32 Box::new(Self {
33 state: RecvState::default(),
34 assembler: Assembler::new(),
35 sent_max_stream_data: initial_max_data,
36 end: 0,
37 stopped: false,
38 })
39 }
40
41 pub(super) fn reinit(&mut self, initial_max_data: u64) {
43 self.state = RecvState::default();
44 self.assembler.reinit();
45 self.sent_max_stream_data = initial_max_data;
46 self.end = 0;
47 self.stopped = false;
48 }
49
50 pub(super) fn ingest(
54 &mut self,
55 frame: frame::Stream,
56 payload_len: usize,
57 received: u64,
58 max_data: u64,
59 ) -> Result<(u64, bool), TransportError> {
60 let end = frame.offset + frame.data.len() as u64;
61 if end >= 2u64.pow(62) {
62 return Err(TransportError::FLOW_CONTROL_ERROR(
63 "maximum stream offset too large",
64 ));
65 }
66
67 if let Some(final_offset) = self.final_offset() {
68 if end > final_offset || (frame.fin && end != final_offset) {
69 debug!(end, final_offset, "final size error");
70 return Err(TransportError::FINAL_SIZE_ERROR(""));
71 }
72 }
73
74 let new_bytes = self.credit_consumed_by(end, received, max_data)?;
75
76 if frame.fin && !self.stopped {
79 if let RecvState::Recv { ref mut size } = self.state {
80 *size = Some(end);
81 }
82 }
83
84 self.end = self.end.max(end);
85 if !self.stopped {
88 self.assembler.insert(frame.offset, frame.data, payload_len);
89 }
90
91 Ok((new_bytes, frame.fin && self.stopped))
92 }
93
94 pub(super) fn stop(&mut self) -> Result<(u64, ShouldTransmit), ClosedStream> {
95 if self.stopped {
96 return Err(ClosedStream { _private: () });
97 }
98
99 self.stopped = true;
100 self.assembler.clear();
101 let read_credits = self.end - self.assembler.bytes_read();
103 Ok((read_credits, ShouldTransmit(self.is_receiving())))
108 }
109
110 pub(super) fn max_stream_data(&mut self, stream_receive_window: u64) -> (u64, ShouldTransmit) {
118 let max_stream_data = self.assembler.bytes_read() + stream_receive_window;
119
120 let diff = max_stream_data - self.sent_max_stream_data;
128 let transmit = self.can_send_flow_control() && diff >= (stream_receive_window / 8);
129 (max_stream_data, ShouldTransmit(transmit))
130 }
131
132 pub(super) fn record_sent_max_stream_data(&mut self, sent_value: u64) {
138 if sent_value > self.sent_max_stream_data {
139 self.sent_max_stream_data = sent_value;
140 }
141 }
142
143 pub(super) fn final_offset_unknown(&self) -> bool {
151 matches!(self.state, RecvState::Recv { size: None })
152 }
153
154 pub(super) fn can_send_flow_control(&self) -> bool {
156 self.final_offset_unknown() && !self.stopped
159 }
160
161 pub(super) fn is_receiving(&self) -> bool {
163 matches!(self.state, RecvState::Recv { .. })
164 }
165
166 fn final_offset(&self) -> Option<u64> {
167 match self.state {
168 RecvState::Recv { size } => size,
169 RecvState::ResetRecvd { size, .. } => Some(size),
170 }
171 }
172
173 pub(super) fn reset(
175 &mut self,
176 error_code: VarInt,
177 final_offset: VarInt,
178 received: u64,
179 max_data: u64,
180 ) -> Result<bool, TransportError> {
181 if let Some(offset) = self.final_offset() {
183 if offset != final_offset.into_inner() {
184 return Err(TransportError::FINAL_SIZE_ERROR("inconsistent value"));
185 }
186 } else if self.end > u64::from(final_offset) {
187 return Err(TransportError::FINAL_SIZE_ERROR(
188 "lower than high water mark",
189 ));
190 }
191 self.credit_consumed_by(final_offset.into(), received, max_data)?;
192
193 if matches!(self.state, RecvState::ResetRecvd { .. }) {
194 return Ok(false);
195 }
196 self.state = RecvState::ResetRecvd {
197 size: final_offset.into(),
198 error_code,
199 };
200 self.assembler.clear();
205 Ok(true)
206 }
207
208 pub(super) fn reset_code(&self) -> Option<VarInt> {
209 match self.state {
210 RecvState::ResetRecvd { error_code, .. } => Some(error_code),
211 _ => None,
212 }
213 }
214
215 fn credit_consumed_by(
218 &self,
219 offset: u64,
220 received: u64,
221 max_data: u64,
222 ) -> Result<u64, TransportError> {
223 let prev_end = self.end;
224 let new_bytes = offset.saturating_sub(prev_end);
225 if offset > self.sent_max_stream_data || received + new_bytes > max_data {
226 debug!(
227 received,
228 new_bytes,
229 max_data,
230 offset,
231 stream_max_data = self.sent_max_stream_data,
232 "flow control error"
233 );
234 return Err(TransportError::FLOW_CONTROL_ERROR(""));
235 }
236
237 Ok(new_bytes)
238 }
239}
240
241pub struct Chunks<'a> {
253 id: StreamId,
254 ordered: bool,
255 streams: &'a mut StreamsState,
256 pending: &'a mut Retransmits,
257 state: ChunksState,
258 read: u64,
259}
260
261impl<'a> Chunks<'a> {
262 pub(super) fn new(
263 id: StreamId,
264 ordered: bool,
265 streams: &'a mut StreamsState,
266 pending: &'a mut Retransmits,
267 ) -> Result<Self, ReadableError> {
268 let mut entry = match streams.recv.entry(id) {
269 Entry::Occupied(entry) => entry,
270 Entry::Vacant(_) => return Err(ReadableError::ClosedStream),
271 };
272
273 let mut recv =
274 match get_or_insert_recv(streams.stream_receive_window)(entry.get_mut()).stopped {
275 true => return Err(ReadableError::ClosedStream),
276 false => entry.remove().unwrap().into_inner(), };
278
279 recv.assembler.ensure_ordering(ordered)?;
280 Ok(Self {
281 id,
282 ordered,
283 streams,
284 pending,
285 state: ChunksState::Readable(recv),
286 read: 0,
287 })
288 }
289
290 pub fn next(&mut self, max_length: usize) -> Result<Option<Chunk>, ReadError> {
294 let rs = match self.state {
295 ChunksState::Readable(ref mut rs) => rs,
296 ChunksState::Reset(error_code) => {
297 return Err(ReadError::Reset(error_code));
298 }
299 ChunksState::Finished => {
300 return Ok(None);
301 }
302 ChunksState::Finalized => panic!("must not call next() after finalize()"),
303 };
304
305 if let Some(chunk) = rs.assembler.read(max_length, self.ordered) {
306 self.read += chunk.bytes.len() as u64;
307 return Ok(Some(chunk));
308 }
309
310 match rs.state {
311 RecvState::ResetRecvd { error_code, .. } => {
312 debug_assert_eq!(self.read, 0, "reset streams have empty buffers");
313 let state = mem::replace(&mut self.state, ChunksState::Reset(error_code));
314 let recv = match state {
316 ChunksState::Readable(recv) => StreamRecv::Open(recv),
317 _ => unreachable!("state must be ChunkState::Readable"),
318 };
319 self.streams.stream_recv_freed(self.id, recv);
320 Err(ReadError::Reset(error_code))
321 }
322 RecvState::Recv { size } => {
323 if size == Some(rs.end) && rs.assembler.bytes_read() == rs.end {
324 let state = mem::replace(&mut self.state, ChunksState::Finished);
325 let recv = match state {
327 ChunksState::Readable(recv) => StreamRecv::Open(recv),
328 _ => unreachable!("state must be ChunkState::Readable"),
329 };
330 self.streams.stream_recv_freed(self.id, recv);
331 Ok(None)
332 } else {
333 Err(ReadError::Blocked)
338 }
339 }
340 }
341 }
342
343 pub fn finalize(mut self) -> ShouldTransmit {
353 self.finalize_inner()
354 }
355
356 fn finalize_inner(&mut self) -> ShouldTransmit {
357 let state = mem::replace(&mut self.state, ChunksState::Finalized);
358 if let ChunksState::Finalized = state {
359 return ShouldTransmit(false);
361 }
362
363 let mut should_transmit = self.streams.queue_max_stream_id(self.pending);
367
368 if let ChunksState::Readable(mut rs) = state {
370 let (_, max_stream_data) = rs.max_stream_data(self.streams.stream_receive_window);
371 should_transmit |= max_stream_data.0;
372 if max_stream_data.0 {
373 self.pending.max_stream_data.insert(self.id);
374 }
375 self.streams
377 .recv
378 .insert(self.id, Some(StreamRecv::Open(rs)));
379 }
380
381 let max_data = self.streams.add_read_credits(self.read);
383 self.pending.max_data |= max_data.0;
384 should_transmit |= max_data.0;
385 ShouldTransmit(should_transmit)
386 }
387}
388
389impl Drop for Chunks<'_> {
390 fn drop(&mut self) {
391 let _ = self.finalize_inner();
392 }
393}
394
395enum ChunksState {
396 Readable(Box<Recv>),
397 Reset(VarInt),
398 Finished,
399 Finalized,
400}
401
402#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
404pub enum ReadError {
405 #[error("blocked")]
410 Blocked,
411 #[error("reset by peer: code {0}")]
415 Reset(VarInt),
416 #[error("stream closed due to connection error")]
418 ConnectionClosed,
419}
420
421#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
423pub enum ReadableError {
424 #[error("closed stream")]
426 ClosedStream,
427 #[error("ordered read after unordered read")]
432 IllegalOrderedRead,
433 #[error("stream closed due to connection error")]
435 ConnectionClosed,
436}
437
438impl From<IllegalOrderedRead> for ReadableError {
439 fn from(_: IllegalOrderedRead) -> Self {
440 Self::IllegalOrderedRead
441 }
442}
443
444#[derive(Debug, Copy, Clone, Eq, PartialEq)]
445enum RecvState {
446 Recv { size: Option<u64> },
447 ResetRecvd { size: u64, error_code: VarInt },
448}
449
450impl Default for RecvState {
451 fn default() -> Self {
452 Self::Recv { size: None }
453 }
454}
455
456#[cfg(test)]
457mod tests {
458 use bytes::Bytes;
459
460 use crate::{Dir, Side};
461
462 use super::*;
463
464 #[test]
465 fn reordered_frames_while_stopped() {
466 const INITIAL_BYTES: u64 = 3;
467 const INITIAL_OFFSET: u64 = 3;
468 const RECV_WINDOW: u64 = 8;
469 let mut s = Recv::new(RECV_WINDOW);
470 let mut data_recvd = 0;
471 let (new_bytes, is_closed) = s
473 .ingest(
474 frame::Stream {
475 id: StreamId::new(Side::Client, Dir::Uni, 0),
476 offset: INITIAL_OFFSET,
477 fin: false,
478 data: Bytes::from_static(&[0; INITIAL_BYTES as usize]),
479 },
480 123,
481 data_recvd,
482 data_recvd + 1024,
483 )
484 .unwrap();
485 data_recvd += new_bytes;
486 assert_eq!(new_bytes, INITIAL_OFFSET + INITIAL_BYTES);
487 assert!(!is_closed);
488
489 let (credits, transmit) = s.stop().unwrap();
490 assert!(transmit.should_transmit());
491 assert_eq!(
492 credits,
493 INITIAL_OFFSET + INITIAL_BYTES,
494 "full connection flow control credit is issued by stop"
495 );
496
497 let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
498 assert!(!transmit.should_transmit());
499 assert_eq!(
500 max_stream_data, RECV_WINDOW,
501 "stream flow control credit isn't issued by stop"
502 );
503
504 let (new_bytes, is_closed) = s
506 .ingest(
507 frame::Stream {
508 id: StreamId::new(Side::Client, Dir::Uni, 0),
509 offset: RECV_WINDOW - 1,
510 fin: false,
511 data: Bytes::from_static(&[0; 1]),
512 },
513 123,
514 data_recvd,
515 data_recvd + 1024,
516 )
517 .unwrap();
518 data_recvd += new_bytes;
519 assert_eq!(new_bytes, RECV_WINDOW - (INITIAL_OFFSET + INITIAL_BYTES));
520 assert!(!is_closed);
521
522 let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
523 assert!(!transmit.should_transmit());
524 assert_eq!(
525 max_stream_data, RECV_WINDOW,
526 "stream flow control credit isn't issued after stop"
527 );
528
529 let (new_bytes, is_closed) = s
531 .ingest(
532 frame::Stream {
533 id: StreamId::new(Side::Client, Dir::Uni, 0),
534 offset: 0,
535 fin: false,
536 data: Bytes::from_static(&[0; INITIAL_OFFSET as usize]),
537 },
538 123,
539 data_recvd,
540 data_recvd + 1024,
541 )
542 .unwrap();
543 assert_eq!(
544 new_bytes, 0,
545 "reordered frames don't issue connection-level flow control for stopped streams"
546 );
547 assert!(!is_closed);
548
549 let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
550 assert!(!transmit.should_transmit());
551 assert_eq!(
552 max_stream_data, RECV_WINDOW,
553 "stream flow control credit isn't issued after stop"
554 );
555 }
556}