phantom_protocol/transport/stream.rs
1//! Phantom Transport - Stream Management
2//!
3//! Multiplexed streams within a session.
4//! Each stream has independent sequence numbers (no Head-of-Line blocking).
5
6use crate::transport::types::{SequenceNumber, StreamId};
7
8use bytes::Bytes;
9use std::collections::VecDeque;
10use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::{Mutex, Notify, Semaphore};
14
/// Maximum number of reliable segments that may sit in a stream's send buffer
/// awaiting acknowledgement. It sizes the per-stream send semaphore:
/// `Stream::send_reliable` takes one permit per queued segment and
/// `Stream::ack` hands one back, so once this many segments are outstanding
/// further reliable sends wait.
const MAX_PENDING_PACKETS: usize = 1024;

/// Initial per-stream send window — caps how many bytes the local
/// side will put on the wire before receiving a `WINDOW_UPDATE` from
/// the peer. 64 KiB matches QUIC's stream initial-window default.
pub const INITIAL_STREAM_WINDOW: u32 = 64 * 1024;

/// Hard ceiling on the credit-based send window. `WINDOW_UPDATE` frames add
/// *relative* credit; this caps the accumulated window so a peer that floods
/// inflated credits cannot overflow the counter. A compliant peer never grants
/// more than ~one [`INITIAL_STREAM_WINDOW`] of outstanding credit, so the cap is
/// only a misbehaving-peer guard (the receiver's own delivery HARD_CAP is the
/// real bound on buffering).
pub const MAX_SEND_WINDOW: u32 = 8 * INITIAL_STREAM_WINDOW;
29
/// Lifecycle state of a [`Stream`], derived from its local/remote "finished"
/// flags (see `Stream::update_state`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    /// Neither side has finished: the stream is open in both directions.
    Open,
    /// The local side has finished sending (`Stream::finish` was called); the
    /// remote side has not.
    HalfClosedLocal,
    /// The remote side has finished sending (`Stream::on_remote_finish` was
    /// called); the local side has not.
    HalfClosedRemote,
    /// Both sides have finished: the stream is fully closed.
    Closed,
}
42
43/// Pending data waiting to be sent
44#[derive(Debug)]
45struct PendingData {
46 sequence: SequenceNumber,
47 data: Bytes,
48 sent_at: Option<tokio::time::Instant>,
49 #[allow(dead_code)]
50 retries: u32,
51}
52
/// One segment handed back by [`Stream::poll_send`] for transmission.
#[derive(Debug, Clone)]
pub struct OutboundSegment {
    /// Sequence number of the segment.
    pub seq: SequenceNumber,
    /// Payload bytes.
    pub data: Bytes,
    /// Whether the segment is on the reliable (ACK-tracked) path: `true` for
    /// segments taken from the reliable send buffer, `false` for segments taken
    /// from the unreliable (fire-and-forget) buffer.
    pub reliable: bool,
    /// True when this is a retransmission (the RTO expired) rather than a first
    /// transmission — the caller reports it to congestion control as a loss.
    pub retransmit: bool,
}
66
/// Per-stream retransmission-timeout (RTO) estimator following RFC 6298.
///
/// Keeps a smoothed RTT (SRTT) and an RTT-variation estimate (RTTVAR) fed by
/// measured samples, and doubles the timeout for every consecutive timeout
/// until a fresh sample arrives.
#[derive(Debug)]
struct RtoEstimator {
    /// Smoothed RTT; stays `None` until the first sample is recorded.
    srtt: Option<Duration>,
    /// Estimated RTT variation.
    rttvar: Duration,
    /// How many times the RTO is currently doubled (consecutive timeouts).
    backoff_shift: u32,
}

impl RtoEstimator {
    /// RTO used before any RTT has been measured (RFC 6298 (2.1)).
    const INITIAL_RTO: Duration = Duration::from_secs(1);
    /// Lower bound on the RTO. The RFC recommends 1 s; this transport uses a
    /// smaller floor to keep latency low.
    const MIN_RTO: Duration = Duration::from_millis(200);
    /// Upper bound on the RTO, so repeated backoff cannot grow without limit.
    const MAX_RTO: Duration = Duration::from_secs(60);
    /// Clock-granularity term `G` from RFC 6298 (2.3).
    const GRANULARITY: Duration = Duration::from_millis(1);
    /// Largest allowed backoff shift (2^6 = 64× the base RTO).
    const MAX_BACKOFF_SHIFT: u32 = 6;

    /// Estimator with no samples and no backoff.
    fn new() -> Self {
        RtoEstimator {
            backoff_shift: 0,
            rttvar: Duration::ZERO,
            srtt: None,
        }
    }

    /// Record an RTT measurement `r`. Per Karn's algorithm the caller should
    /// only pass samples from segments that were not retransmitted.
    fn on_rtt_sample(&mut self, r: Duration) {
        if let Some(smoothed) = self.srtt {
            // RFC 6298 (2.3):
            //   RTTVAR = 3/4·RTTVAR + 1/4·|SRTT − R|
            //   SRTT   = 7/8·SRTT   + 1/8·R
            // RTTVAR is updated first, using the SRTT from before this sample.
            let deviation = if r >= smoothed {
                r - smoothed
            } else {
                smoothed - r
            };
            self.rttvar = (self.rttvar * 3 + deviation) / 4;
            self.srtt = Some((smoothed * 7 + r) / 8);
        } else {
            // RFC 6298 (2.2): the very first measurement seeds both estimates.
            self.rttvar = r / 2;
            self.srtt = Some(r);
        }
        // Any fresh measurement cancels accumulated exponential backoff.
        self.backoff_shift = 0;
    }

    /// The timeout to use right now: the RFC 6298 base value, doubled once per
    /// recorded consecutive timeout, then bounded to `[MIN_RTO, MAX_RTO]`.
    fn rto(&self) -> Duration {
        // Base value: RTO = SRTT + max(G, 4·RTTVAR), or the initial RTO when
        // nothing has been measured yet.
        let unscaled = self.srtt.map_or(Self::INITIAL_RTO, |smoothed| {
            smoothed + (self.rttvar * 4).max(Self::GRANULARITY)
        });
        let multiplier = 1u32 << self.backoff_shift;
        match unscaled.checked_mul(multiplier) {
            Some(scaled) => scaled.max(Self::MIN_RTO).min(Self::MAX_RTO),
            // Multiplication overflowed: saturate at the ceiling.
            None => Self::MAX_RTO,
        }
    }

    /// Note a retransmission timeout: one more doubling of the RTO
    /// (RFC 6298 (5.5)), capped at `MAX_BACKOFF_SHIFT`.
    fn on_timeout(&mut self) {
        self.backoff_shift = std::cmp::min(self.backoff_shift + 1, Self::MAX_BACKOFF_SHIFT);
    }
}
139
#[cfg(test)]
mod rto_tests {
    use super::RtoEstimator;
    use std::time::Duration;

    /// Millisecond shorthand used throughout these tests.
    fn ms(millis: u64) -> Duration {
        Duration::from_millis(millis)
    }

    #[test]
    fn follows_rfc6298_srtt_rttvar() {
        let mut estimator = RtoEstimator::new();

        // Before any sample the RTO is the 1 s initial value.
        assert_eq!(estimator.rto(), Duration::from_secs(1));

        // First sample R = 100 ms: SRTT = 100, RTTVAR = 50, so
        // RTO = 100 + 4·50 = 300 ms.
        estimator.on_rtt_sample(ms(100));
        assert_eq!(estimator.rto(), ms(300));

        // Repeating the same sample decays RTTVAR towards zero, so the RTO
        // approaches SRTT and ends up pinned at the 200 ms floor.
        (0..50).for_each(|_| estimator.on_rtt_sample(ms(100)));
        assert_eq!(estimator.rto(), ms(200));
    }

    #[test]
    fn backoff_doubles_and_fresh_sample_resets() {
        let mut estimator = RtoEstimator::new();
        estimator.on_rtt_sample(ms(100));
        assert_eq!(estimator.rto(), ms(300));

        // Each timeout doubles the RTO: 300 → 600 → 1200 ms.
        for expected in [600, 1200] {
            estimator.on_timeout();
            assert_eq!(estimator.rto(), ms(expected));
        }

        // A second sample shrinks RTTVAR from 50 ms to 37.5 ms, giving
        // RTO = 100 + 4·37.5 = 250 ms. Had the backoff (shift = 2) survived,
        // the result would have been 1000 ms instead.
        estimator.on_rtt_sample(ms(100));
        assert_eq!(estimator.rto(), ms(250));
    }
}
177
/// Stream - multiplexed data channel within a session.
///
/// All state sits behind atomics or mutexes, so every method takes `&self`.
pub struct Stream {
    /// Stream identifier
    id: StreamId,
    /// Current state, recomputed from the two "finished" flags
    state: Mutex<StreamState>,
    /// Next send sequence number. One counter is shared by reliable data,
    /// unreliable data and directly-emitted control frames (see
    /// `next_send_sequence`).
    send_sequence: AtomicU32,
    /// Next expected receive sequence
    recv_sequence: AtomicU32,
    /// Send buffer (reliable segments, kept until acknowledged)
    send_buffer: Mutex<VecDeque<PendingData>>,
    /// Unreliable send buffer (fire and forget)
    unreliable_buffer: Mutex<VecDeque<(SequenceNumber, Bytes)>>,
    /// Receive buffer (out-of-order data waiting for the missing sequence)
    recv_buffer: Mutex<VecDeque<(SequenceNumber, Bytes)>>,
    /// Ordered receive queue (ready for application)
    recv_ready: Mutex<VecDeque<Bytes>>,
    /// Notified when in-order data becomes ready to read and when the remote
    /// side finishes
    recv_notify: Notify,
    /// Whether stream is finished locally
    local_finished: AtomicBool,
    /// Whether stream is finished remotely
    remote_finished: AtomicBool,
    /// Priority (higher = more important)
    priority: AtomicU32,
    /// Backpressure semaphore: `send_reliable` takes one permit per queued
    /// segment and `ack` returns one when a segment is removed.
    send_semaphore: Arc<Semaphore>,
    /// Bytes the **peer** has granted us to send — decremented as we
    /// emit payload bytes, replenished by inbound `WINDOW_UPDATE`
    /// frames (Phase 4.3). When it is too small for the next unsent
    /// segment, `poll_send` withholds that segment until the next
    /// `WINDOW_UPDATE`.
    peer_send_window: AtomicU32,
    /// Bytes the local side has granted the peer. Starts at
    /// [`INITIAL_STREAM_WINDOW`] and is bumped by `record_app_consumed` each
    /// time it grants credit. Informational only: the credit actually
    /// advertised in a `WINDOW_UPDATE` is *relative* (see
    /// `pending_window_update`), not this absolute value.
    local_recv_window: AtomicU32,
    /// Total bytes the local side has consumed since the last
    /// granted credit. Used to decide when to grant the next one
    /// (avoid flooding the wire with tiny updates).
    bytes_since_last_update: AtomicU32,
    /// Pending **relative** flow-control credit to advertise in a
    /// `WINDOW_UPDATE`, staged by the receive **delivery** task (which credits
    /// the window on *real* app consumption) and flushed by the **send loop** —
    /// the single writer that also owns rekey, so the encrypted control frame is
    /// sealed under a consistent epoch. Credits accumulate additively, so
    /// several grants between two flushes are never lost. `0` = nothing pending.
    pending_window_update: AtomicU32,
    /// RFC 6298 retransmission-timeout estimator. A plain (sync) mutex: it is
    /// fed RTT samples from `ack` and is both read and backed off by
    /// `poll_send`; all access goes through short synchronous helpers, so the
    /// guard is never held across an `.await`.
    rto: std::sync::Mutex<RtoEstimator>,
}
231
232impl Stream {
233 /// Create a new stream
234 pub fn new(id: StreamId) -> Self {
235 Self {
236 id,
237 state: Mutex::new(StreamState::Open),
238 send_sequence: AtomicU32::new(0),
239 recv_sequence: AtomicU32::new(0),
240 send_buffer: Mutex::new(VecDeque::new()),
241 unreliable_buffer: Mutex::new(VecDeque::new()),
242 recv_buffer: Mutex::new(VecDeque::new()),
243 recv_ready: Mutex::new(VecDeque::new()),
244 recv_notify: Notify::new(),
245 local_finished: AtomicBool::new(false),
246 remote_finished: AtomicBool::new(false),
247 priority: AtomicU32::new(0),
248 send_semaphore: Arc::new(Semaphore::new(MAX_PENDING_PACKETS)),
249 peer_send_window: AtomicU32::new(INITIAL_STREAM_WINDOW),
250 local_recv_window: AtomicU32::new(INITIAL_STREAM_WINDOW),
251 bytes_since_last_update: AtomicU32::new(0),
252 pending_window_update: AtomicU32::new(0),
253 rto: std::sync::Mutex::new(RtoEstimator::new()),
254 }
255 }
256
257 // ── RFC 6298 retransmission timeout ──
258
259 /// Current retransmission timeout. A poisoned lock is recovered by taking
260 /// the inner value — the RTO is a heuristic, not a correctness invariant.
261 fn current_rto(&self) -> Duration {
262 match self.rto.lock() {
263 Ok(g) => g.rto(),
264 Err(poisoned) => poisoned.into_inner().rto(),
265 }
266 }
267
268 /// Feed a fresh RTT measurement into the RTO estimator.
269 fn record_rtt_sample(&self, rtt: Duration) {
270 let mut g = match self.rto.lock() {
271 Ok(g) => g,
272 Err(poisoned) => poisoned.into_inner(),
273 };
274 g.on_rtt_sample(rtt);
275 }
276
277 /// Tell the RTO estimator a segment timed out (exponential backoff).
278 fn note_rto_timeout(&self) {
279 let mut g = match self.rto.lock() {
280 Ok(g) => g,
281 Err(poisoned) => poisoned.into_inner(),
282 };
283 g.on_timeout();
284 }
285
286 /// Get stream ID
287 pub fn id(&self) -> StreamId {
288 self.id
289 }
290
291 /// Get current state
292 pub async fn state(&self) -> StreamState {
293 *self.state.lock().await
294 }
295
296 /// Get priority
297 pub fn priority(&self) -> u32 {
298 self.priority.load(Ordering::Relaxed)
299 }
300
301 /// Set priority
302 pub fn set_priority(&self, priority: u32) {
303 self.priority.store(priority, Ordering::Relaxed);
304 }
305
306 // ── Flow control (Phase 4.3) ──
307
308 /// Bytes the peer currently allows us to send.
309 pub fn peer_send_window(&self) -> u32 {
310 self.peer_send_window.load(Ordering::Acquire)
311 }
312
313 /// Atomically reserve `n` bytes from the peer's send window.
314 /// Returns `true` if the reservation succeeded (and the window
315 /// was decremented); `false` if the window doesn't have enough
316 /// capacity — caller must wait for a `WINDOW_UPDATE`.
317 pub fn try_consume_send_window(&self, n: u32) -> bool {
318 let mut cur = self.peer_send_window.load(Ordering::Acquire);
319 loop {
320 if cur < n {
321 return false;
322 }
323 match self.peer_send_window.compare_exchange_weak(
324 cur,
325 cur - n,
326 Ordering::AcqRel,
327 Ordering::Acquire,
328 ) {
329 Ok(_) => return true,
330 Err(actual) => cur = actual,
331 }
332 }
333 }
334
335 /// Process an inbound `WINDOW_UPDATE` from the peer. The payload is a
336 /// **relative credit** — the number of bytes the peer's application just
337 /// consumed and is therefore newly willing to receive. We *add* it to the
338 /// send window (saturating at [`MAX_SEND_WINDOW`] so a misbehaving peer's
339 /// inflated credit cannot overflow the counter).
340 ///
341 /// Relative credit (vs. an absolute window) is what makes flow control
342 /// correct for a session of any length: the sender's window is
343 /// `initial + Σ credit_granted − Σ bytes_sent` = `initial + consumed −
344 /// sent`, so the receiver's outstanding (unconsumed) bytes `sent − consumed`
345 /// are bounded by `initial`. An absolute u32 window could not express this
346 /// for sessions exceeding 4 GiB and over-committed the receiver's buffer.
347 pub fn apply_peer_window_update(&self, credit: u32) {
348 let mut cur = self.peer_send_window.load(Ordering::Acquire);
349 loop {
350 let next = cur.saturating_add(credit).min(MAX_SEND_WINDOW);
351 if next == cur {
352 return; // already at the cap; nothing to add
353 }
354 match self.peer_send_window.compare_exchange_weak(
355 cur,
356 next,
357 Ordering::AcqRel,
358 Ordering::Acquire,
359 ) {
360 Ok(_) => return,
361 Err(actual) => cur = actual,
362 }
363 }
364 }
365
366 /// Bytes the local side has granted the peer.
367 pub fn local_recv_window(&self) -> u32 {
368 self.local_recv_window.load(Ordering::Acquire)
369 }
370
371 /// Record that the application has actually consumed `n` bytes from this
372 /// stream (called by the receive *delivery* task on real drainage, not
373 /// on routing). Accumulates the consumed bytes and, once the unreported
374 /// total crosses half the initial window, returns `Some(credit)` — the
375 /// **relative credit** to advertise in a `WINDOW_UPDATE` (the peer *adds*
376 /// it to its send window). The half-window threshold trades update frequency
377 /// against peer stalls.
378 pub fn record_app_consumed(&self, n: u32) -> Option<u32> {
379 let pending = self.bytes_since_last_update.fetch_add(n, Ordering::AcqRel) + n;
380 let threshold = INITIAL_STREAM_WINDOW / 2;
381 if pending >= threshold {
382 // Grant exactly the bytes we accumulated since the last update and
383 // reset the accumulator. Use a CAS-free `fetch_sub` of the granted
384 // amount rather than `store(0)` so a concurrent consume isn't lost.
385 self.bytes_since_last_update
386 .fetch_sub(pending, Ordering::AcqRel);
387 // Keep the (now informational) local_recv_window in step for stats.
388 self.local_recv_window.fetch_add(pending, Ordering::AcqRel);
389 Some(pending)
390 } else {
391 None
392 }
393 }
394
395 /// Stage relative flow-control credit to be flushed by the send loop.
396 /// Called by the receive delivery task after it credits real app
397 /// consumption. Credits **accumulate additively** (saturating at
398 /// `u32::MAX`) rather than overwriting, so several grants landing between
399 /// two send-loop flushes are summed instead of lost — the send loop is the
400 /// single emitter (epoch-safe), and it may run arbitrarily after a grant.
401 pub fn stage_window_update_credit(&self, credit: u32) {
402 let mut cur = self.pending_window_update.load(Ordering::Acquire);
403 loop {
404 let next = cur.saturating_add(credit);
405 if next == cur {
406 return; // nothing to add (zero credit, or already saturated)
407 }
408 match self.pending_window_update.compare_exchange_weak(
409 cur,
410 next,
411 Ordering::AcqRel,
412 Ordering::Acquire,
413 ) {
414 Ok(_) => return,
415 Err(actual) => cur = actual,
416 }
417 }
418 }
419
420 /// Take all staged credit (swaps the slot back to `0`). The send loop calls
421 /// this each drain pass and emits one `WINDOW_UPDATE` carrying the summed
422 /// credit if `Some`.
423 pub fn take_pending_window_update(&self) -> Option<u32> {
424 match self.pending_window_update.swap(0, Ordering::AcqRel) {
425 0 => None,
426 w => Some(w),
427 }
428 }
429
430 /// Queue data for sending with reliability
431 ///
432 /// Returns the sequence number assigned to this chunk.
433 pub async fn send_reliable(&self, data: Bytes) -> SequenceNumber {
434 // Backpressure: wait until there is space in the buffer.
435 // PANIC-SAFETY: `Semaphore::acquire` only errors after `close()`. The
436 // `send_semaphore` is a private field of this struct, constructed in
437 // `Stream::new` and never closed anywhere in the crate — the variant
438 // is structurally unreachable.
439 #[allow(clippy::expect_used)]
440 let permit = self
441 .send_semaphore
442 .acquire()
443 .await
444 .expect("Semaphore closed");
445 permit.forget();
446
447 let seq = self.send_sequence.fetch_add(1, Ordering::SeqCst);
448
449 let pending = PendingData {
450 sequence: seq,
451 data,
452 sent_at: None,
453 retries: 0,
454 };
455
456 self.send_buffer.lock().await.push_back(pending);
457
458 seq
459 }
460
461 /// Reserve the next outbound sequence number from this stream's send space.
462 ///
463 /// Control frames that are emitted directly on a data stream (e.g.
464 /// `WINDOW_UPDATE`, a bare `FIN`) MUST draw their sequence from here rather
465 /// than a private counter: the AEAD nonce is `(epoch, stream_id, sequence,
466 /// path_id)` and the receiver's replay window is keyed on `(stream_id,
467 /// sequence)`, so a control frame sharing a `(stream_id, sequence)` with a
468 /// data packet in the same epoch would reuse a nonce **and** be dropped as a
469 /// replay. Sharing one monotonic space keeps every packet on the stream
470 /// unique. Control frames are unreliable, so the resulting gap in the data
471 /// sequence is harmless (no ACK is expected, nothing waits to reassemble it).
472 pub fn next_send_sequence(&self) -> SequenceNumber {
473 self.send_sequence.fetch_add(1, Ordering::SeqCst)
474 }
475
476 /// Queue data for unreliable sending
477 ///
478 /// Returns the sequence number assigned to this chunk.
479 pub async fn send_unreliable(&self, data: Bytes) -> SequenceNumber {
480 // Unreliable data does not consume buffer permits
481 let seq = self.send_sequence.fetch_add(1, Ordering::SeqCst);
482
483 self.unreliable_buffer.lock().await.push_back((seq, data));
484
485 seq
486 }
487
488 /// Get the next segment to (re)transmit, or `None` if nothing is due.
489 ///
490 /// `cwnd_budget` is how many bytes of *new* data the congestion window
491 /// currently permits. Retransmissions ignore it — loss recovery must always
492 /// proceed — but a first transmission is withheld (`None`) when it would
493 /// exceed the budget, so the next drain resumes once ACKs free the window.
494 /// Pass `u64::MAX` to disable the limit.
495 pub async fn poll_send(&self, cwnd_budget: u64) -> Option<OutboundSegment> {
496 // Unreliable data is fire-and-forget and not congestion-controlled.
497 if let Some((seq, data)) = self.unreliable_buffer.lock().await.pop_front() {
498 return Some(OutboundSegment {
499 seq,
500 data,
501 reliable: false,
502 retransmit: false,
503 });
504 }
505
506 let mut buffer = self.send_buffer.lock().await;
507 let now = tokio::time::Instant::now();
508 // Adaptive RFC 6298 timeout (was a fixed 500ms).
509 let timeout = self.current_rto();
510
511 // Pass 1: a timed-out segment (retransmission) — always allowed.
512 for pending in buffer.iter_mut() {
513 if let Some(sent_at) = pending.sent_at {
514 if now.duration_since(sent_at) >= timeout {
515 pending.sent_at = Some(now);
516 pending.retries += 1;
517 // Back the RTO off exponentially for the next attempt.
518 self.note_rto_timeout();
519 return Some(OutboundSegment {
520 seq: pending.sequence,
521 data: pending.data.clone(),
522 reliable: true,
523 retransmit: true,
524 });
525 }
526 }
527 }
528
529 // Pass 2: the next unsent segment, if it fits BOTH the congestion window
530 // AND the peer's advertised flow-control window. In-order: if the head
531 // unsent segment doesn't fit, stop (don't skip). Retransmissions (Pass 1)
532 // bypass both budgets — those bytes were already accounted on first send
533 // (Karn), and loss recovery must always proceed.
534 for pending in buffer.iter_mut() {
535 if pending.sent_at.is_none() {
536 let len = pending.data.len() as u64;
537 if len > cwnd_budget {
538 return None; // congestion window full — wait for ACKs to free it
539 }
540 // Flow-control enforcement: consume the peer's advertised
541 // receive window. If it is exhausted, withhold the segment and
542 // wait for a `WINDOW_UPDATE` — this is what propagates a slow
543 // peer-side consumer back to us as real backpressure (the
544 // receive delivery task only credits the window on actual app
545 // consumption). `try_consume_send_window` is an atomic CAS; on
546 // success the window is debited and we WILL send (no later check
547 // can fail), so the debit never leaks.
548 if !self.try_consume_send_window(len as u32) {
549 return None; // peer flow-control window closed — wait for WINDOW_UPDATE
550 }
551 pending.sent_at = Some(now);
552 return Some(OutboundSegment {
553 seq: pending.sequence,
554 data: pending.data.clone(),
555 reliable: true,
556 retransmit: false,
557 });
558 }
559 }
560
561 None
562 }
563
564 /// Mark a sequence number as acknowledged.
565 /// Returns the timestamp when the packet was originally sent and its size, if found.
566 pub async fn ack(&self, sequence: SequenceNumber) -> Option<(tokio::time::Instant, u64)> {
567 let mut buffer = self.send_buffer.lock().await;
568 let mut result = None;
569
570 // Find the packet and get its sent_at time
571 if let Some(pos) = buffer.iter().position(|p| p.sequence == sequence) {
572 let sent_at = buffer[pos].sent_at;
573 let retries = buffer[pos].retries;
574 let size = buffer[pos].data.len() as u64;
575 buffer.remove(pos);
576
577 // Released space, add permit back
578 self.send_semaphore.add_permits(1);
579
580 if let Some(sent_at) = sent_at {
581 result = Some((sent_at, size));
582 // Karn's algorithm: only sample RTT from segments that were not
583 // retransmitted — an ACK for a resent sequence is ambiguous.
584 if retries == 0 {
585 let rtt = tokio::time::Instant::now().duration_since(sent_at);
586 self.record_rtt_sample(rtt);
587 }
588 }
589 }
590
591 result
592 }
593
594 /// Reset a still-buffered reliable segment's send timestamp so the next
595 /// [`poll_send`](Self::poll_send) re-offers it immediately (as an unsent
596 /// segment) rather than waiting a full RTO for the retransmit pass. Used
597 /// when a send attempt failed *after* `poll_send` had already stamped
598 /// `sent_at` — the bytes never reached the wire, so the segment must not be
599 /// treated as in-flight. No-op if the segment was already acknowledged and
600 /// removed.
601 pub async fn mark_unsent(&self, sequence: SequenceNumber) {
602 let mut buffer = self.send_buffer.lock().await;
603 if let Some(pending) = buffer.iter_mut().find(|p| p.sequence == sequence) {
604 pending.sent_at = None;
605 }
606 }
607
608 /// Handle received data
609 ///
610 /// Data is buffered until it can be delivered in order.
611 pub async fn on_receive(&self, sequence: SequenceNumber, data: Bytes) {
612 let expected = self.recv_sequence.load(Ordering::SeqCst);
613
614 if sequence == expected {
615 // In-order delivery
616 self.recv_ready.lock().await.push_back(data);
617 self.recv_sequence.fetch_add(1, Ordering::SeqCst);
618
619 // Try to deliver buffered out-of-order data
620 self.deliver_buffered().await;
621
622 // Notify waiters
623 self.recv_notify.notify_waiters();
624 } else if sequence > expected {
625 // Out-of-order, buffer it
626 self.recv_buffer.lock().await.push_back((sequence, data));
627 }
628 // sequence < expected means duplicate, ignore it
629 }
630
631 /// Try to deliver buffered out-of-order data
632 async fn deliver_buffered(&self) {
633 let mut recv_buf = self.recv_buffer.lock().await;
634 let mut ready = self.recv_ready.lock().await;
635
636 loop {
637 let expected = self.recv_sequence.load(Ordering::SeqCst);
638
639 // Find and remove the expected sequence.
640 // PANIC-SAFETY: `pos` was just returned by `iter().position(...)`,
641 // so `recv_buf` has an element at that index — `remove` cannot
642 // return `None`. `recv_buf` is locked for the duration of this
643 // loop, so no other task can drain it.
644 if let Some(pos) = recv_buf.iter().position(|(seq, _)| *seq == expected) {
645 #[allow(clippy::unwrap_used, clippy::disallowed_methods)]
646 let (_, data) = recv_buf.remove(pos).unwrap();
647 ready.push_back(data);
648 self.recv_sequence.fetch_add(1, Ordering::SeqCst);
649 } else {
650 break;
651 }
652 }
653 }
654
655 /// Read data from the stream (async, waits if no data available)
656 pub async fn recv(&self) -> Option<Bytes> {
657 loop {
658 {
659 let mut ready = self.recv_ready.lock().await;
660 if let Some(data) = ready.pop_front() {
661 return Some(data);
662 }
663
664 // Check if stream is closed
665 if self.remote_finished.load(Ordering::SeqCst) {
666 return None;
667 }
668 }
669
670 // Wait for new data
671 self.recv_notify.notified().await;
672 }
673 }
674
675 /// Try to read data without waiting
676 pub async fn try_recv(&self) -> Option<Bytes> {
677 self.recv_ready.lock().await.pop_front()
678 }
679
680 /// Mark local side as finished (no more data to send)
681 pub async fn finish(&self) {
682 self.local_finished.store(true, Ordering::SeqCst);
683 self.update_state().await;
684 }
685
686 /// Mark remote side as finished
687 pub async fn on_remote_finish(&self) {
688 self.remote_finished.store(true, Ordering::SeqCst);
689 self.recv_notify.notify_waiters();
690 self.update_state().await;
691 }
692
693 /// Update stream state based on finish flags
694 async fn update_state(&self) {
695 let local = self.local_finished.load(Ordering::SeqCst);
696 let remote = self.remote_finished.load(Ordering::SeqCst);
697
698 let new_state = match (local, remote) {
699 (true, true) => StreamState::Closed,
700 (true, false) => StreamState::HalfClosedLocal,
701 (false, true) => StreamState::HalfClosedRemote,
702 (false, false) => StreamState::Open,
703 };
704
705 *self.state.lock().await = new_state;
706 }
707
708 /// Get number of pending send chunks
709 pub async fn pending_send_count(&self) -> usize {
710 self.send_buffer.lock().await.len()
711 }
712
713 /// Get number of pending receive chunks
714 pub async fn pending_recv_count(&self) -> usize {
715 self.recv_ready.lock().await.len()
716 }
717
718 /// Check if stream is closed
719 pub fn is_closed(&self) -> bool {
720 self.local_finished.load(Ordering::SeqCst) && self.remote_finished.load(Ordering::SeqCst)
721 }
722}
723
724impl std::fmt::Debug for Stream {
725 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
726 f.debug_struct("Stream")
727 .field("id", &self.id)
728 .field("send_seq", &self.send_sequence.load(Ordering::Relaxed))
729 .field("recv_seq", &self.recv_sequence.load(Ordering::Relaxed))
730 .field("priority", &self.priority.load(Ordering::Relaxed))
731 .finish()
732 }
733}
734
735#[cfg(test)]
736mod tests {
737 use super::*;
738
739 #[tokio::test]
740 async fn test_stream_send_recv() {
741 let stream = Stream::new(1);
742
743 // Send data
744 stream.send_reliable(Bytes::from("hello")).await;
745 stream.send_reliable(Bytes::from("world")).await;
746
747 // Check pending
748 assert_eq!(stream.pending_send_count().await, 2);
749
        // Poll three times: the first two polls hand out the two queued segments in
        // order; the third returns None because both are in flight and neither has timed out.
751 let seg = stream.poll_send(u64::MAX).await.unwrap();
752 assert_eq!(seg.seq, 0);
753 assert_eq!(seg.data, Bytes::from("hello"));
754 assert!(seg.reliable);
755 assert!(!seg.retransmit);
756
757 let seg2 = stream.poll_send(u64::MAX).await.unwrap();
758 assert_eq!(seg2.seq, 1);
759 assert_eq!(seg2.data, Bytes::from("world"));
760 assert!(seg2.reliable);
761 assert!(!seg2.retransmit);
762
763 assert!(stream.poll_send(u64::MAX).await.is_none());
764 }
765
766 #[tokio::test]
767 async fn test_stream_retransmission() {
768 // We use tokio::time::pause to mock time and test timeout
769 tokio::time::pause();
770 let stream = Stream::new(1);
771
772 stream.send_reliable(Bytes::from("hello")).await;
773
774 // First send — not a retransmission.
775 let seg = stream.poll_send(u64::MAX).await.unwrap();
776 assert_eq!(seg.seq, 0);
777 assert!(seg.reliable);
778 assert!(!seg.retransmit);
779
780 // Immediate poll should be None
781 assert!(stream.poll_send(u64::MAX).await.is_none());
782
783 // Advance 400ms — still under the initial 1s RTO (RFC 6298 (2.1):
784 // no RTT samples yet, so the timer sits at the 1-second default).
785 tokio::time::advance(std::time::Duration::from_millis(400)).await;
786 assert!(stream.poll_send(u64::MAX).await.is_none());
787
788 // Advance past the 1s initial RTO (total ~1.1s).
789 tokio::time::advance(std::time::Duration::from_millis(700)).await;
790
791 // Now it should retransmit — flagged as a retransmission.
792 let seg2 = stream.poll_send(u64::MAX).await.unwrap();
793 assert_eq!(seg2.seq, 0);
794 assert_eq!(seg2.data, Bytes::from("hello"));
795 assert!(seg2.reliable);
796 assert!(seg2.retransmit);
797
798 // Ack it
799 let acked = stream.ack(0).await;
800 assert!(acked.is_some());
801
802 // Poll again - queue is empty
803 assert!(stream.poll_send(u64::MAX).await.is_none());
804 }
805
806 #[tokio::test]
807 async fn mark_unsent_re_offers_without_waiting_rto() {
808 // Time is paused, so nothing ever crosses the RTO — any re-offer here is
809 // due to `mark_unsent`, not the retransmit timer.
810 tokio::time::pause();
811 let stream = Stream::new(1);
812 stream.send_reliable(Bytes::from("hello")).await;
813
814 // First poll stamps `sent_at`; an immediate re-poll yields nothing
815 // (treated as in-flight, not yet timed out).
816 let seg = stream.poll_send(u64::MAX).await.unwrap();
817 assert_eq!(seg.seq, 0);
818 assert!(!seg.retransmit);
819 assert!(stream.poll_send(u64::MAX).await.is_none());
820
821 // Simulate a send that failed *after* `poll_send` stamped the segment:
822 // clear `sent_at` so it is no longer considered in-flight.
823 stream.mark_unsent(0).await;
824
825 // It is re-offered immediately — without advancing past the RTO — and as
826 // a fresh send (Pass 2), not a retransmission.
827 let seg2 = stream.poll_send(u64::MAX).await.unwrap();
828 assert_eq!(seg2.seq, 0);
829 assert_eq!(seg2.data, Bytes::from("hello"));
830 assert!(seg2.reliable);
831 assert!(!seg2.retransmit);
832
833 // `mark_unsent` on an already-acked (removed) segment is a no-op.
834 assert!(stream.ack(0).await.is_some());
835 stream.mark_unsent(0).await; // no panic, no effect
836 assert!(stream.poll_send(u64::MAX).await.is_none());
837 }
838
839 #[tokio::test]
840 async fn poll_send_respects_the_cwnd_budget() {
841 let stream = Stream::new(1);
842 stream.send_reliable(Bytes::from("0123456789")).await; // 10 bytes
843 stream.send_reliable(Bytes::from("abcde")).await; // 5 bytes
844
845 // Budget of 10 admits the 10-byte head segment.
846 let seg = stream.poll_send(10).await.unwrap();
847 assert_eq!(seg.data.len(), 10);
848 assert!(!seg.retransmit);
849
850 // Budget of 4 is too small for the next (5-byte) segment → withheld.
851 assert!(stream.poll_send(4).await.is_none());
852
853 // A budget of 5 now admits it.
854 let seg2 = stream.poll_send(5).await.unwrap();
855 assert_eq!(seg2.data, Bytes::from("abcde"));
856 }
857
858 #[tokio::test]
859 async fn test_stream_in_order_receive() {
860 let stream = Stream::new(1);
861
862 // Receive in order
863 stream.on_receive(0, Bytes::from("first")).await;
864 stream.on_receive(1, Bytes::from("second")).await;
865
866 assert_eq!(stream.try_recv().await, Some(Bytes::from("first")));
867 assert_eq!(stream.try_recv().await, Some(Bytes::from("second")));
868 assert_eq!(stream.try_recv().await, None);
869 }
870
871 #[tokio::test]
872 async fn test_stream_out_of_order_receive() {
873 let stream = Stream::new(1);
874
875 // Receive out of order
876 stream.on_receive(1, Bytes::from("second")).await;
877 stream.on_receive(0, Bytes::from("first")).await;
878
879 // Should be reordered
880 assert_eq!(stream.try_recv().await, Some(Bytes::from("first")));
881 assert_eq!(stream.try_recv().await, Some(Bytes::from("second")));
882 }
883
884 #[tokio::test]
885 async fn test_stream_state() {
886 let stream = Stream::new(1);
887
888 assert_eq!(stream.state().await, StreamState::Open);
889
890 stream.finish().await;
891 assert_eq!(stream.state().await, StreamState::HalfClosedLocal);
892
893 stream.on_remote_finish().await;
894 assert_eq!(stream.state().await, StreamState::Closed);
895 assert!(stream.is_closed());
896 }
897
898 #[tokio::test]
899 async fn test_stream_backpressure() {
900 let stream = Stream::new(1);
901
902 // Fill the buffer
903 for _ in 0..MAX_PENDING_PACKETS {
904 stream.send_reliable(Bytes::from("data")).await;
905 }
906
907 assert_eq!(stream.pending_send_count().await, MAX_PENDING_PACKETS);
908
909 // Try to send one more with timeout
910 let send_future = stream.send_reliable(Bytes::from("blocked"));
911 let result = tokio::time::timeout(std::time::Duration::from_millis(100), send_future).await;
912 assert!(result.is_err(), "Send should have blocked");
913
914 // Ack one
915 stream.ack(0).await;
916
917 // Now it should succeed
918 let send_future = stream.send_reliable(Bytes::from("resumed"));
919 let result = tokio::time::timeout(std::time::Duration::from_millis(100), send_future).await;
920 assert!(result.is_ok(), "Send should have succeeded after ack");
921 assert_eq!(stream.pending_send_count().await, MAX_PENDING_PACKETS);
922 }
923
924 // ── Flow control (Phase 4.3) ──
925
#[test]
fn peer_send_window_starts_at_initial() {
    // A brand-new stream reports the initial window as its send window.
    let fresh = Stream::new(1);
    let window = fresh.peer_send_window();
    assert_eq!(window, INITIAL_STREAM_WINDOW);
}
931
#[test]
fn try_consume_send_window_decrements_atomically() {
    // Split the initial window into a first draw and whatever is left.
    let first_draw: u32 = 1000;
    let remainder = INITIAL_STREAM_WINDOW - first_draw;

    let sender = Stream::new(1);

    // The first draw succeeds and shrinks the window by exactly that amount.
    assert!(sender.try_consume_send_window(first_draw));
    assert_eq!(sender.peer_send_window(), remainder);

    // Drawing the rest succeeds too and leaves the window empty.
    assert!(sender.try_consume_send_window(remainder));
    assert_eq!(sender.peer_send_window(), 0);

    // With nothing left, even a single byte is refused until a refill.
    let refused = !sender.try_consume_send_window(1);
    assert!(refused);
}
942
#[test]
fn apply_peer_window_update_adds_relative_credit() {
    let sender = Stream::new(1);

    // Spend everything except the last 100 bytes of the initial window.
    let spent = INITIAL_STREAM_WINDOW - 100;
    assert!(sender.try_consume_send_window(spent));
    assert_eq!(sender.peer_send_window(), 100);

    // Each WINDOW_UPDATE carries a relative credit, so the window grows by
    // the granted amount instead of being overwritten with it.
    for (credit, expected_window) in [(1000u32, 1100u32), (50, 1150)] {
        sender.apply_peer_window_update(credit);
        assert_eq!(sender.peer_send_window(), expected_window);
    }

    // An oversized grant is clamped to the hard cap (misbehaving-peer guard).
    sender.apply_peer_window_update(u32::MAX);
    assert_eq!(sender.peer_send_window(), MAX_SEND_WINDOW);
}
960
#[test]
fn record_app_consumed_grants_relative_credit_after_threshold() {
    let half_window = INITIAL_STREAM_WINDOW / 2;
    let receiver = Stream::new(1);

    // Two small drains (300 bytes in total) produce no credit.
    for drained in [100u32, 200] {
        assert_eq!(receiver.record_app_consumed(drained), None);
    }

    // The next drain carries the running total across the half-window
    // threshold. The credit that comes back is the accumulated consumption
    // (300 + half a window), NOT an absolute window size.
    let granted = receiver.record_app_consumed(half_window);
    assert_eq!(
        granted,
        Some(300 + half_window),
        "WINDOW_UPDATE carries the relative credit (bytes consumed since last update)"
    );

    // Emitting the credit resets the counter, so a small follow-up drain
    // does not emit again.
    assert_eq!(receiver.record_app_consumed(10), None);
}
982
#[test]
fn relative_credit_round_trip_bounds_outstanding_to_one_window() {
    // Model under test: the receiver grants credit equal to what it has
    // consumed, and the sender's window is initial + Σcredit − Σsent. That
    // keeps outstanding bytes (sent − consumed) at or below the initial
    // window.
    let half_window = INITIAL_STREAM_WINDOW / 2;
    let tx = Stream::new(1);
    let rx = Stream::new(1);

    // The sender uses up exactly the initial window.
    let filled = tx.try_consume_send_window(INITIAL_STREAM_WINDOW);
    assert!(filled);
    assert_eq!(tx.peer_send_window(), 0, "initial window exhausted");

    // The receiver consumes half a window, which crosses the threshold and
    // yields a credit of that size.
    let granted = rx
        .record_app_consumed(half_window)
        .expect("threshold crossed");

    // Applying the credit reopens the sender's window by the same amount.
    tx.apply_peer_window_update(granted);
    let reopened = tx.peer_send_window();
    assert_eq!(
        reopened,
        half_window,
        "sender may now send exactly the bytes the receiver consumed"
    );
}
1006
#[test]
fn staged_window_update_credit_accumulates_until_taken() {
    let stream = Stream::new(1);

    // Nothing has been staged yet, so there is nothing to take.
    assert!(stream.take_pending_window_update().is_none());

    // Grants staged between two flushes must be SUMMED, not overwritten.
    // The send loop is the sole emitter and may run arbitrarily late after
    // a credit is staged; if only the last grant survived, the rest would
    // leak permanently and the peer's window would shrink over time.
    for grant in [1000u32, 2500] {
        stream.stage_window_update_credit(grant);
    }
    assert_eq!(stream.take_pending_window_update(), Some(3500));

    // Taking the credit empties the slot.
    assert!(stream.take_pending_window_update().is_none());

    // Accumulating past u32::MAX saturates rather than wrapping around.
    for grant in [u32::MAX, 10] {
        stream.stage_window_update_credit(grant);
    }
    assert_eq!(stream.take_pending_window_update(), Some(u32::MAX));

    // Staging zero credit leaves the slot empty (no spurious WINDOW_UPDATE).
    stream.stage_window_update_credit(0);
    assert!(stream.take_pending_window_update().is_none());
}
1032}