Skip to main content

phantom_protocol/transport/
stream.rs

1//! Phantom Transport - Stream Management
2//!
3//! Multiplexed streams within a session.
4//! Each stream has independent sequence numbers (no Head-of-Line blocking).
5
6use crate::transport::types::{SequenceNumber, StreamId};
7
8use bytes::Bytes;
9use std::collections::VecDeque;
10use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::{Mutex, Notify, Semaphore};
14
/// Number of reliable segments a stream may hold unacknowledged at once.
/// Sizes the backpressure semaphore created in `Stream::new`.
const MAX_PENDING_PACKETS: usize = 1 << 10;

/// Starting value of the per-stream send window: the number of payload bytes
/// the local side may emit before the peer has to grant more credit with a
/// `WINDOW_UPDATE`. 64 KiB matches QUIC's stream initial-window default.
pub const INITIAL_STREAM_WINDOW: u32 = 64 << 10;

/// Upper bound for the credit-based send window (eight initial windows).
/// `WINDOW_UPDATE` frames carry *relative* credit, so the accumulated window is
/// clamped here to stop a peer that floods inflated credits from overflowing
/// the counter. A compliant peer never has more than roughly one
/// [`INITIAL_STREAM_WINDOW`] of credit outstanding, which makes this purely a
/// misbehaving-peer guard (the receiver's own delivery HARD_CAP is the real
/// bound on buffering).
pub const MAX_SEND_WINDOW: u32 = INITIAL_STREAM_WINDOW << 3;
29
/// Lifecycle of a stream, derived from which sides have finished sending.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    /// Neither side has finished; data may flow both ways.
    Open,
    /// The local side has finished sending; the remote side has not.
    HalfClosedLocal,
    /// The remote side has finished sending; the local side has not.
    HalfClosedRemote,
    /// Both sides have finished.
    Closed,
}
42
43/// Pending data waiting to be sent
44#[derive(Debug)]
45struct PendingData {
46    sequence: SequenceNumber,
47    data: Bytes,
48    sent_at: Option<tokio::time::Instant>,
49    #[allow(dead_code)]
50    retries: u32,
51}
52
53/// One segment handed back by [`Stream::poll_send`] for transmission.
54#[derive(Debug, Clone)]
55pub struct OutboundSegment {
56    /// Sequence number of the segment.
57    pub seq: SequenceNumber,
58    /// Payload bytes.
59    pub data: Bytes,
60    /// Whether the segment is on the reliable (ACK-tracked) path.
61    pub reliable: bool,
62    /// True when this is a retransmission (the RTO expired) rather than a first
63    /// transmission — the caller reports it to congestion control as a loss.
64    pub retransmit: bool,
65}
66
/// RFC 6298 retransmission-timeout estimator (per stream). Replaces a fixed
/// retransmit timer with one that tracks measured RTT (SRTT / RTTVAR) and backs
/// off exponentially on consecutive timeouts.
///
/// All `Duration` arithmetic saturates instead of panicking, so a pathological
/// sample can at worst pin the timer at [`RtoEstimator::MAX_RTO`].
#[derive(Debug)]
struct RtoEstimator {
    /// Smoothed RTT; `None` until the first measurement.
    srtt: Option<Duration>,
    /// RTT variation estimate.
    rttvar: Duration,
    /// Number of consecutive timeouts (RTO is doubled `backoff_shift` times).
    backoff_shift: u32,
}

impl RtoEstimator {
    /// RFC 6298 (2.1): RTO before the first measurement.
    const INITIAL_RTO: Duration = Duration::from_secs(1);
    /// Floor — RFC's 1s minimum is too conservative for a low-latency transport.
    const MIN_RTO: Duration = Duration::from_millis(200);
    /// Ceiling, so a stalled path can't push the timer arbitrarily high.
    const MAX_RTO: Duration = Duration::from_secs(60);
    /// Clock-granularity term `G` in RFC 6298 (2.3).
    const GRANULARITY: Duration = Duration::from_millis(1);
    /// Cap on the backoff doubling (2^6 = 64×).
    const MAX_BACKOFF_SHIFT: u32 = 6;

    /// Create an estimator with no RTT history and no backoff.
    fn new() -> Self {
        Self {
            srtt: None,
            rttvar: Duration::ZERO,
            backoff_shift: 0,
        }
    }

    /// Feed a fresh (non-retransmitted, per Karn) RTT measurement `r`.
    ///
    /// Also clears any accumulated timeout backoff.
    fn on_rtt_sample(&mut self, r: Duration) {
        match self.srtt {
            None => {
                // RFC 6298 (2.2): first measurement.
                self.srtt = Some(r);
                self.rttvar = r / 2;
            }
            Some(srtt) => {
                // RFC 6298 (2.3): RTTVAR = (1-1/4)·RTTVAR + 1/4·|SRTT-R|;
                //                 SRTT  = (1-1/8)·SRTT  + 1/8·R.
                // RTTVAR must be updated first: it uses the *old* SRTT.
                let diff = if srtt > r { srtt - r } else { r - srtt };
                // Saturating ops: the plain `*` / `+` operators on `Duration`
                // panic on overflow, which an extreme sample could trigger.
                self.rttvar = self.rttvar.saturating_mul(3).saturating_add(diff) / 4;
                self.srtt = Some(srtt.saturating_mul(7).saturating_add(r) / 8);
            }
        }
        // A fresh measurement clears any accumulated backoff.
        self.backoff_shift = 0;
    }

    /// Current RTO, honoring backoff and the floor / ceiling.
    ///
    /// Always within `[MIN_RTO, MAX_RTO]`; never panics.
    fn rto(&self) -> Duration {
        // RFC 6298 (2.2)/(2.3): RTO = SRTT + max(G, K·RTTVAR), K = 4.
        let base = match self.srtt {
            None => Self::INITIAL_RTO,
            Some(srtt) => {
                let variance_term = self.rttvar.saturating_mul(4).max(Self::GRANULARITY);
                srtt.saturating_add(variance_term)
            }
        };
        // Exponential backoff (RFC 6298 (5.5)); saturate to MAX_RTO on overflow.
        // The shift is re-clamped here so the `<<` can never exceed 31 bits.
        let shift = self.backoff_shift.min(Self::MAX_BACKOFF_SHIFT);
        let scaled = base.checked_mul(1u32 << shift).unwrap_or(Self::MAX_RTO);
        scaled.clamp(Self::MIN_RTO, Self::MAX_RTO)
    }

    /// On a retransmission timeout: double the RTO (RFC 6298 (5.5)), up to
    /// `MAX_BACKOFF_SHIFT` doublings.
    fn on_timeout(&mut self) {
        self.backoff_shift = self
            .backoff_shift
            .saturating_add(1)
            .min(Self::MAX_BACKOFF_SHIFT);
    }
}
139
#[cfg(test)]
mod rto_tests {
    use super::RtoEstimator;
    use std::time::Duration;

    /// Shorthand for a millisecond `Duration`.
    fn ms(millis: u64) -> Duration {
        Duration::from_millis(millis)
    }

    #[test]
    fn follows_rfc6298_srtt_rttvar() {
        let mut estimator = RtoEstimator::new();

        // Without any sample the estimator reports the initial 1s RTO.
        assert_eq!(estimator.rto(), Duration::from_secs(1));

        // First sample R=100ms gives SRTT=100, RTTVAR=50, so
        // RTO = 100 + 4*50 = 300ms.
        estimator.on_rtt_sample(ms(100));
        assert_eq!(estimator.rto(), ms(300));

        // Repeating the same sample shrinks RTTVAR towards zero, so the RTO
        // approaches SRTT and ends up pinned at the 200ms MIN_RTO floor.
        (0..50).for_each(|_| estimator.on_rtt_sample(ms(100)));
        assert_eq!(estimator.rto(), ms(200));
    }

    #[test]
    fn backoff_doubles_and_fresh_sample_resets() {
        let mut estimator = RtoEstimator::new();
        estimator.on_rtt_sample(ms(100));
        assert_eq!(estimator.rto(), ms(300));

        // Each timeout doubles the RTO: 300ms -> 600ms -> 1200ms.
        for expected in [600, 1200] {
            estimator.on_timeout();
            assert_eq!(estimator.rto(), ms(expected));
        }

        // A new measurement wipes the backoff. Being the *second* sample it
        // also shrinks RTTVAR from 50ms to 37.5ms, so RTO = 100 + 4*37.5 =
        // 250ms. Had the shift stayed at 2 the result would be 1000ms.
        estimator.on_rtt_sample(ms(100));
        assert_eq!(estimator.rto(), ms(250));
    }
}
177
178/// Stream - multiplexed data channel within a session
179pub struct Stream {
180    /// Stream identifier
181    id: StreamId,
182    /// Current state
183    state: Mutex<StreamState>,
184    /// Send sequence number
185    send_sequence: AtomicU32,
186    /// Next expected receive sequence
187    recv_sequence: AtomicU32,
188    /// Send buffer (data waiting to be sent)
189    send_buffer: Mutex<VecDeque<PendingData>>,
190    /// Unreliable send buffer (fire and forget)
191    unreliable_buffer: Mutex<VecDeque<(SequenceNumber, Bytes)>>,
192    /// Receive buffer (out-of-order data)
193    recv_buffer: Mutex<VecDeque<(SequenceNumber, Bytes)>>,
194    /// Ordered receive queue (ready for application)
195    recv_ready: Mutex<VecDeque<Bytes>>,
196    /// Notify when data is ready to read
197    recv_notify: Notify,
198    /// Whether stream is finished locally
199    local_finished: AtomicBool,
200    /// Whether stream is finished remotely
201    remote_finished: AtomicBool,
202    /// Priority (higher = more important)
203    priority: AtomicU32,
204    /// Backpressure semaphore
205    send_semaphore: Arc<Semaphore>,
206    /// Bytes the **peer** has granted us to send — decremented as we
207    /// emit payload bytes, replenished by inbound `WINDOW_UPDATE`
208    /// frames (Phase 4.3). When it hits zero, `poll_send` stalls
209    /// until the next `WINDOW_UPDATE`.
210    peer_send_window: AtomicU32,
211    /// Bytes the local side has granted the peer — replenished as
212    /// the application drains `recv_ready`. We periodically emit a
213    /// `WINDOW_UPDATE` carrying the new absolute window.
214    local_recv_window: AtomicU32,
215    /// Total bytes the local side has consumed since the last
216    /// emitted `WINDOW_UPDATE`. Used to decide when to send the
217    /// next update (avoid flooding the wire with tiny updates).
218    bytes_since_last_update: AtomicU32,
219    /// Pending **relative** flow-control credit to advertise in a
220    /// `WINDOW_UPDATE`, staged by the receive **delivery** task (which credits
221    /// the window on *real* app consumption) and flushed by the **send loop** —
222    /// the single writer that also owns rekey, so the encrypted control frame is
223    /// sealed under a consistent epoch. Credits accumulate additively, so
224    /// several grants between two flushes are never lost. `0` = nothing pending.
225    pending_window_update: AtomicU32,
226    /// RFC 6298 retransmission-timeout estimator. A plain (sync) mutex: it is
227    /// updated only from the serial ACK path and read by `poll_send`, and the
228    /// guard is never held across an `.await`.
229    rto: std::sync::Mutex<RtoEstimator>,
230}
231
232impl Stream {
233    /// Create a new stream
234    pub fn new(id: StreamId) -> Self {
235        Self {
236            id,
237            state: Mutex::new(StreamState::Open),
238            send_sequence: AtomicU32::new(0),
239            recv_sequence: AtomicU32::new(0),
240            send_buffer: Mutex::new(VecDeque::new()),
241            unreliable_buffer: Mutex::new(VecDeque::new()),
242            recv_buffer: Mutex::new(VecDeque::new()),
243            recv_ready: Mutex::new(VecDeque::new()),
244            recv_notify: Notify::new(),
245            local_finished: AtomicBool::new(false),
246            remote_finished: AtomicBool::new(false),
247            priority: AtomicU32::new(0),
248            send_semaphore: Arc::new(Semaphore::new(MAX_PENDING_PACKETS)),
249            peer_send_window: AtomicU32::new(INITIAL_STREAM_WINDOW),
250            local_recv_window: AtomicU32::new(INITIAL_STREAM_WINDOW),
251            bytes_since_last_update: AtomicU32::new(0),
252            pending_window_update: AtomicU32::new(0),
253            rto: std::sync::Mutex::new(RtoEstimator::new()),
254        }
255    }
256
257    // ── RFC 6298 retransmission timeout ──
258
259    /// Current retransmission timeout. A poisoned lock is recovered by taking
260    /// the inner value — the RTO is a heuristic, not a correctness invariant.
261    fn current_rto(&self) -> Duration {
262        match self.rto.lock() {
263            Ok(g) => g.rto(),
264            Err(poisoned) => poisoned.into_inner().rto(),
265        }
266    }
267
268    /// Feed a fresh RTT measurement into the RTO estimator.
269    fn record_rtt_sample(&self, rtt: Duration) {
270        let mut g = match self.rto.lock() {
271            Ok(g) => g,
272            Err(poisoned) => poisoned.into_inner(),
273        };
274        g.on_rtt_sample(rtt);
275    }
276
277    /// Tell the RTO estimator a segment timed out (exponential backoff).
278    fn note_rto_timeout(&self) {
279        let mut g = match self.rto.lock() {
280            Ok(g) => g,
281            Err(poisoned) => poisoned.into_inner(),
282        };
283        g.on_timeout();
284    }
285
286    /// Get stream ID
287    pub fn id(&self) -> StreamId {
288        self.id
289    }
290
291    /// Get current state
292    pub async fn state(&self) -> StreamState {
293        *self.state.lock().await
294    }
295
296    /// Get priority
297    pub fn priority(&self) -> u32 {
298        self.priority.load(Ordering::Relaxed)
299    }
300
301    /// Set priority
302    pub fn set_priority(&self, priority: u32) {
303        self.priority.store(priority, Ordering::Relaxed);
304    }
305
306    // ── Flow control (Phase 4.3) ──
307
308    /// Bytes the peer currently allows us to send.
309    pub fn peer_send_window(&self) -> u32 {
310        self.peer_send_window.load(Ordering::Acquire)
311    }
312
313    /// Atomically reserve `n` bytes from the peer's send window.
314    /// Returns `true` if the reservation succeeded (and the window
315    /// was decremented); `false` if the window doesn't have enough
316    /// capacity — caller must wait for a `WINDOW_UPDATE`.
317    pub fn try_consume_send_window(&self, n: u32) -> bool {
318        let mut cur = self.peer_send_window.load(Ordering::Acquire);
319        loop {
320            if cur < n {
321                return false;
322            }
323            match self.peer_send_window.compare_exchange_weak(
324                cur,
325                cur - n,
326                Ordering::AcqRel,
327                Ordering::Acquire,
328            ) {
329                Ok(_) => return true,
330                Err(actual) => cur = actual,
331            }
332        }
333    }
334
335    /// Process an inbound `WINDOW_UPDATE` from the peer. The payload is a
336    /// **relative credit** — the number of bytes the peer's application just
337    /// consumed and is therefore newly willing to receive. We *add* it to the
338    /// send window (saturating at [`MAX_SEND_WINDOW`] so a misbehaving peer's
339    /// inflated credit cannot overflow the counter).
340    ///
341    /// Relative credit (vs. an absolute window) is what makes flow control
342    /// correct for a session of any length: the sender's window is
343    /// `initial + Σ credit_granted − Σ bytes_sent` = `initial + consumed −
344    /// sent`, so the receiver's outstanding (unconsumed) bytes `sent − consumed`
345    /// are bounded by `initial`. An absolute u32 window could not express this
346    /// for sessions exceeding 4 GiB and over-committed the receiver's buffer.
347    pub fn apply_peer_window_update(&self, credit: u32) {
348        let mut cur = self.peer_send_window.load(Ordering::Acquire);
349        loop {
350            let next = cur.saturating_add(credit).min(MAX_SEND_WINDOW);
351            if next == cur {
352                return; // already at the cap; nothing to add
353            }
354            match self.peer_send_window.compare_exchange_weak(
355                cur,
356                next,
357                Ordering::AcqRel,
358                Ordering::Acquire,
359            ) {
360                Ok(_) => return,
361                Err(actual) => cur = actual,
362            }
363        }
364    }
365
366    /// Bytes the local side has granted the peer.
367    pub fn local_recv_window(&self) -> u32 {
368        self.local_recv_window.load(Ordering::Acquire)
369    }
370
371    /// Record that the application has actually consumed `n` bytes from this
372    /// stream (called by the receive *delivery* task on real drainage, not
373    /// on routing). Accumulates the consumed bytes and, once the unreported
374    /// total crosses half the initial window, returns `Some(credit)` — the
375    /// **relative credit** to advertise in a `WINDOW_UPDATE` (the peer *adds*
376    /// it to its send window). The half-window threshold trades update frequency
377    /// against peer stalls.
378    pub fn record_app_consumed(&self, n: u32) -> Option<u32> {
379        let pending = self.bytes_since_last_update.fetch_add(n, Ordering::AcqRel) + n;
380        let threshold = INITIAL_STREAM_WINDOW / 2;
381        if pending >= threshold {
382            // Grant exactly the bytes we accumulated since the last update and
383            // reset the accumulator. Use a CAS-free `fetch_sub` of the granted
384            // amount rather than `store(0)` so a concurrent consume isn't lost.
385            self.bytes_since_last_update
386                .fetch_sub(pending, Ordering::AcqRel);
387            // Keep the (now informational) local_recv_window in step for stats.
388            self.local_recv_window.fetch_add(pending, Ordering::AcqRel);
389            Some(pending)
390        } else {
391            None
392        }
393    }
394
395    /// Stage relative flow-control credit to be flushed by the send loop.
396    /// Called by the receive delivery task after it credits real app
397    /// consumption. Credits **accumulate additively** (saturating at
398    /// `u32::MAX`) rather than overwriting, so several grants landing between
399    /// two send-loop flushes are summed instead of lost — the send loop is the
400    /// single emitter (epoch-safe), and it may run arbitrarily after a grant.
401    pub fn stage_window_update_credit(&self, credit: u32) {
402        let mut cur = self.pending_window_update.load(Ordering::Acquire);
403        loop {
404            let next = cur.saturating_add(credit);
405            if next == cur {
406                return; // nothing to add (zero credit, or already saturated)
407            }
408            match self.pending_window_update.compare_exchange_weak(
409                cur,
410                next,
411                Ordering::AcqRel,
412                Ordering::Acquire,
413            ) {
414                Ok(_) => return,
415                Err(actual) => cur = actual,
416            }
417        }
418    }
419
420    /// Take all staged credit (swaps the slot back to `0`). The send loop calls
421    /// this each drain pass and emits one `WINDOW_UPDATE` carrying the summed
422    /// credit if `Some`.
423    pub fn take_pending_window_update(&self) -> Option<u32> {
424        match self.pending_window_update.swap(0, Ordering::AcqRel) {
425            0 => None,
426            w => Some(w),
427        }
428    }
429
430    /// Queue data for sending with reliability
431    ///
432    /// Returns the sequence number assigned to this chunk.
433    pub async fn send_reliable(&self, data: Bytes) -> SequenceNumber {
434        // Backpressure: wait until there is space in the buffer.
435        // PANIC-SAFETY: `Semaphore::acquire` only errors after `close()`. The
436        // `send_semaphore` is a private field of this struct, constructed in
437        // `Stream::new` and never closed anywhere in the crate — the variant
438        // is structurally unreachable.
439        #[allow(clippy::expect_used)]
440        let permit = self
441            .send_semaphore
442            .acquire()
443            .await
444            .expect("Semaphore closed");
445        permit.forget();
446
447        let seq = self.send_sequence.fetch_add(1, Ordering::SeqCst);
448
449        let pending = PendingData {
450            sequence: seq,
451            data,
452            sent_at: None,
453            retries: 0,
454        };
455
456        self.send_buffer.lock().await.push_back(pending);
457
458        seq
459    }
460
461    /// Reserve the next outbound sequence number from this stream's send space.
462    ///
463    /// Control frames that are emitted directly on a data stream (e.g.
464    /// `WINDOW_UPDATE`, a bare `FIN`) MUST draw their sequence from here rather
465    /// than a private counter: the AEAD nonce is `(epoch, stream_id, sequence,
466    /// path_id)` and the receiver's replay window is keyed on `(stream_id,
467    /// sequence)`, so a control frame sharing a `(stream_id, sequence)` with a
468    /// data packet in the same epoch would reuse a nonce **and** be dropped as a
469    /// replay. Sharing one monotonic space keeps every packet on the stream
470    /// unique. Control frames are unreliable, so the resulting gap in the data
471    /// sequence is harmless (no ACK is expected, nothing waits to reassemble it).
472    pub fn next_send_sequence(&self) -> SequenceNumber {
473        self.send_sequence.fetch_add(1, Ordering::SeqCst)
474    }
475
476    /// Queue data for unreliable sending
477    ///
478    /// Returns the sequence number assigned to this chunk.
479    pub async fn send_unreliable(&self, data: Bytes) -> SequenceNumber {
480        // Unreliable data does not consume buffer permits
481        let seq = self.send_sequence.fetch_add(1, Ordering::SeqCst);
482
483        self.unreliable_buffer.lock().await.push_back((seq, data));
484
485        seq
486    }
487
488    /// Get the next segment to (re)transmit, or `None` if nothing is due.
489    ///
490    /// `cwnd_budget` is how many bytes of *new* data the congestion window
491    /// currently permits. Retransmissions ignore it — loss recovery must always
492    /// proceed — but a first transmission is withheld (`None`) when it would
493    /// exceed the budget, so the next drain resumes once ACKs free the window.
494    /// Pass `u64::MAX` to disable the limit.
495    pub async fn poll_send(&self, cwnd_budget: u64) -> Option<OutboundSegment> {
496        // Unreliable data is fire-and-forget and not congestion-controlled.
497        if let Some((seq, data)) = self.unreliable_buffer.lock().await.pop_front() {
498            return Some(OutboundSegment {
499                seq,
500                data,
501                reliable: false,
502                retransmit: false,
503            });
504        }
505
506        let mut buffer = self.send_buffer.lock().await;
507        let now = tokio::time::Instant::now();
508        // Adaptive RFC 6298 timeout (was a fixed 500ms).
509        let timeout = self.current_rto();
510
511        // Pass 1: a timed-out segment (retransmission) — always allowed.
512        for pending in buffer.iter_mut() {
513            if let Some(sent_at) = pending.sent_at {
514                if now.duration_since(sent_at) >= timeout {
515                    pending.sent_at = Some(now);
516                    pending.retries += 1;
517                    // Back the RTO off exponentially for the next attempt.
518                    self.note_rto_timeout();
519                    return Some(OutboundSegment {
520                        seq: pending.sequence,
521                        data: pending.data.clone(),
522                        reliable: true,
523                        retransmit: true,
524                    });
525                }
526            }
527        }
528
529        // Pass 2: the next unsent segment, if it fits BOTH the congestion window
530        // AND the peer's advertised flow-control window. In-order: if the head
531        // unsent segment doesn't fit, stop (don't skip). Retransmissions (Pass 1)
532        // bypass both budgets — those bytes were already accounted on first send
533        // (Karn), and loss recovery must always proceed.
534        for pending in buffer.iter_mut() {
535            if pending.sent_at.is_none() {
536                let len = pending.data.len() as u64;
537                if len > cwnd_budget {
538                    return None; // congestion window full — wait for ACKs to free it
539                }
540                // Flow-control enforcement: consume the peer's advertised
541                // receive window. If it is exhausted, withhold the segment and
542                // wait for a `WINDOW_UPDATE` — this is what propagates a slow
543                // peer-side consumer back to us as real backpressure (the
544                // receive delivery task only credits the window on actual app
545                // consumption). `try_consume_send_window` is an atomic CAS; on
546                // success the window is debited and we WILL send (no later check
547                // can fail), so the debit never leaks.
548                if !self.try_consume_send_window(len as u32) {
549                    return None; // peer flow-control window closed — wait for WINDOW_UPDATE
550                }
551                pending.sent_at = Some(now);
552                return Some(OutboundSegment {
553                    seq: pending.sequence,
554                    data: pending.data.clone(),
555                    reliable: true,
556                    retransmit: false,
557                });
558            }
559        }
560
561        None
562    }
563
564    /// Mark a sequence number as acknowledged.
565    /// Returns the timestamp when the packet was originally sent and its size, if found.
566    pub async fn ack(&self, sequence: SequenceNumber) -> Option<(tokio::time::Instant, u64)> {
567        let mut buffer = self.send_buffer.lock().await;
568        let mut result = None;
569
570        // Find the packet and get its sent_at time
571        if let Some(pos) = buffer.iter().position(|p| p.sequence == sequence) {
572            let sent_at = buffer[pos].sent_at;
573            let retries = buffer[pos].retries;
574            let size = buffer[pos].data.len() as u64;
575            buffer.remove(pos);
576
577            // Released space, add permit back
578            self.send_semaphore.add_permits(1);
579
580            if let Some(sent_at) = sent_at {
581                result = Some((sent_at, size));
582                // Karn's algorithm: only sample RTT from segments that were not
583                // retransmitted — an ACK for a resent sequence is ambiguous.
584                if retries == 0 {
585                    let rtt = tokio::time::Instant::now().duration_since(sent_at);
586                    self.record_rtt_sample(rtt);
587                }
588            }
589        }
590
591        result
592    }
593
594    /// Reset a still-buffered reliable segment's send timestamp so the next
595    /// [`poll_send`](Self::poll_send) re-offers it immediately (as an unsent
596    /// segment) rather than waiting a full RTO for the retransmit pass. Used
597    /// when a send attempt failed *after* `poll_send` had already stamped
598    /// `sent_at` — the bytes never reached the wire, so the segment must not be
599    /// treated as in-flight. No-op if the segment was already acknowledged and
600    /// removed.
601    pub async fn mark_unsent(&self, sequence: SequenceNumber) {
602        let mut buffer = self.send_buffer.lock().await;
603        if let Some(pending) = buffer.iter_mut().find(|p| p.sequence == sequence) {
604            pending.sent_at = None;
605        }
606    }
607
608    /// Handle received data
609    ///
610    /// Data is buffered until it can be delivered in order.
611    pub async fn on_receive(&self, sequence: SequenceNumber, data: Bytes) {
612        let expected = self.recv_sequence.load(Ordering::SeqCst);
613
614        if sequence == expected {
615            // In-order delivery
616            self.recv_ready.lock().await.push_back(data);
617            self.recv_sequence.fetch_add(1, Ordering::SeqCst);
618
619            // Try to deliver buffered out-of-order data
620            self.deliver_buffered().await;
621
622            // Notify waiters
623            self.recv_notify.notify_waiters();
624        } else if sequence > expected {
625            // Out-of-order, buffer it
626            self.recv_buffer.lock().await.push_back((sequence, data));
627        }
628        // sequence < expected means duplicate, ignore it
629    }
630
631    /// Try to deliver buffered out-of-order data
632    async fn deliver_buffered(&self) {
633        let mut recv_buf = self.recv_buffer.lock().await;
634        let mut ready = self.recv_ready.lock().await;
635
636        loop {
637            let expected = self.recv_sequence.load(Ordering::SeqCst);
638
639            // Find and remove the expected sequence.
640            // PANIC-SAFETY: `pos` was just returned by `iter().position(...)`,
641            // so `recv_buf` has an element at that index — `remove` cannot
642            // return `None`. `recv_buf` is locked for the duration of this
643            // loop, so no other task can drain it.
644            if let Some(pos) = recv_buf.iter().position(|(seq, _)| *seq == expected) {
645                #[allow(clippy::unwrap_used, clippy::disallowed_methods)]
646                let (_, data) = recv_buf.remove(pos).unwrap();
647                ready.push_back(data);
648                self.recv_sequence.fetch_add(1, Ordering::SeqCst);
649            } else {
650                break;
651            }
652        }
653    }
654
655    /// Read data from the stream (async, waits if no data available)
656    pub async fn recv(&self) -> Option<Bytes> {
657        loop {
658            {
659                let mut ready = self.recv_ready.lock().await;
660                if let Some(data) = ready.pop_front() {
661                    return Some(data);
662                }
663
664                // Check if stream is closed
665                if self.remote_finished.load(Ordering::SeqCst) {
666                    return None;
667                }
668            }
669
670            // Wait for new data
671            self.recv_notify.notified().await;
672        }
673    }
674
675    /// Try to read data without waiting
676    pub async fn try_recv(&self) -> Option<Bytes> {
677        self.recv_ready.lock().await.pop_front()
678    }
679
680    /// Mark local side as finished (no more data to send)
681    pub async fn finish(&self) {
682        self.local_finished.store(true, Ordering::SeqCst);
683        self.update_state().await;
684    }
685
686    /// Mark remote side as finished
687    pub async fn on_remote_finish(&self) {
688        self.remote_finished.store(true, Ordering::SeqCst);
689        self.recv_notify.notify_waiters();
690        self.update_state().await;
691    }
692
693    /// Update stream state based on finish flags
694    async fn update_state(&self) {
695        let local = self.local_finished.load(Ordering::SeqCst);
696        let remote = self.remote_finished.load(Ordering::SeqCst);
697
698        let new_state = match (local, remote) {
699            (true, true) => StreamState::Closed,
700            (true, false) => StreamState::HalfClosedLocal,
701            (false, true) => StreamState::HalfClosedRemote,
702            (false, false) => StreamState::Open,
703        };
704
705        *self.state.lock().await = new_state;
706    }
707
708    /// Get number of pending send chunks
709    pub async fn pending_send_count(&self) -> usize {
710        self.send_buffer.lock().await.len()
711    }
712
713    /// Get number of pending receive chunks
714    pub async fn pending_recv_count(&self) -> usize {
715        self.recv_ready.lock().await.len()
716    }
717
718    /// Check if stream is closed
719    pub fn is_closed(&self) -> bool {
720        self.local_finished.load(Ordering::SeqCst) && self.remote_finished.load(Ordering::SeqCst)
721    }
722}
723
724impl std::fmt::Debug for Stream {
725    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
726        f.debug_struct("Stream")
727            .field("id", &self.id)
728            .field("send_seq", &self.send_sequence.load(Ordering::Relaxed))
729            .field("recv_seq", &self.recv_sequence.load(Ordering::Relaxed))
730            .field("priority", &self.priority.load(Ordering::Relaxed))
731            .finish()
732    }
733}
734
#[cfg(test)]
mod tests {
    use super::*;

    /// The largest budget `poll_send` can be given; used by tests that are
    /// not exercising the budget check.
    const NO_BUDGET_CAP: u64 = u64::MAX;

    /// Shorthand for building a millisecond duration.
    fn ms(millis: u64) -> std::time::Duration {
        std::time::Duration::from_millis(millis)
    }

    /// Queued reliable segments are offered once each, in sequence order.
    #[tokio::test]
    async fn test_stream_send_recv() {
        let stream = Stream::new(1);

        // Queue two payloads.
        for payload in ["hello", "world"] {
            stream.send_reliable(Bytes::from(payload)).await;
        }

        // Both are pending.
        assert_eq!(stream.pending_send_count().await, 2);

        // The first two polls hand out the two segments as fresh sends.
        let first = stream.poll_send(NO_BUDGET_CAP).await.unwrap();
        assert_eq!(first.seq, 0);
        assert_eq!(first.data, Bytes::from("hello"));
        assert!(first.reliable);
        assert!(!first.retransmit);

        let second = stream.poll_send(NO_BUDGET_CAP).await.unwrap();
        assert_eq!(second.seq, 1);
        assert_eq!(second.data, Bytes::from("world"));
        assert!(second.reliable);
        assert!(!second.retransmit);

        // A third poll yields nothing: both segments were already handed out
        // and neither has timed out.
        assert!(stream.poll_send(NO_BUDGET_CAP).await.is_none());
    }

    /// An unacknowledged segment is re-offered once the retransmit timer
    /// expires, and disappears from the queue once acknowledged.
    #[tokio::test]
    async fn test_stream_retransmission() {
        // Mock the clock so the timeout can be crossed deterministically.
        tokio::time::pause();
        let stream = Stream::new(1);

        stream.send_reliable(Bytes::from("hello")).await;

        // Initial transmission: not flagged as a retransmission.
        let initial = stream.poll_send(NO_BUDGET_CAP).await.unwrap();
        assert_eq!(initial.seq, 0);
        assert!(initial.reliable);
        assert!(!initial.retransmit);

        // Polling again straight away offers nothing.
        assert!(stream.poll_send(NO_BUDGET_CAP).await.is_none());

        // 400ms in: still below the initial 1s RTO (RFC 6298 (2.1): with no
        // RTT samples the timer stays at its 1-second default).
        tokio::time::advance(ms(400)).await;
        assert!(stream.poll_send(NO_BUDGET_CAP).await.is_none());

        // Another 700ms takes the total to ~1.1s, past the initial RTO.
        tokio::time::advance(ms(700)).await;

        // The segment comes back, this time flagged as a retransmission.
        let resent = stream.poll_send(NO_BUDGET_CAP).await.unwrap();
        assert_eq!(resent.seq, 0);
        assert_eq!(resent.data, Bytes::from("hello"));
        assert!(resent.reliable);
        assert!(resent.retransmit);

        // Acknowledge it.
        assert!(stream.ack(0).await.is_some());

        // Nothing is left to offer.
        assert!(stream.poll_send(NO_BUDGET_CAP).await.is_none());
    }

    /// `mark_unsent` makes a segment eligible again without any timer expiry.
    #[tokio::test]
    async fn mark_unsent_re_offers_without_waiting_rto() {
        // The clock is frozen and never advanced, so the RTO cannot elapse:
        // any re-offer seen here must come from `mark_unsent`.
        tokio::time::pause();
        let stream = Stream::new(1);
        stream.send_reliable(Bytes::from("hello")).await;

        // The first poll stamps `sent_at`; a second poll right after gets
        // nothing because the segment counts as in-flight.
        let offered = stream.poll_send(NO_BUDGET_CAP).await.unwrap();
        assert_eq!(offered.seq, 0);
        assert!(!offered.retransmit);
        assert!(stream.poll_send(NO_BUDGET_CAP).await.is_none());

        // Model a transmission that failed *after* `poll_send` stamped the
        // segment: clearing `sent_at` takes it out of the in-flight set.
        stream.mark_unsent(0).await;

        // It is offered again at once — no RTO wait — and as a fresh send
        // (Pass 2) rather than a retransmission.
        let reoffered = stream.poll_send(NO_BUDGET_CAP).await.unwrap();
        assert_eq!(reoffered.seq, 0);
        assert_eq!(reoffered.data, Bytes::from("hello"));
        assert!(reoffered.reliable);
        assert!(!reoffered.retransmit);

        // Calling `mark_unsent` for a segment that was already acked (and so
        // removed) does nothing.
        assert!(stream.ack(0).await.is_some());
        stream.mark_unsent(0).await; // no panic, no effect
        assert!(stream.poll_send(NO_BUDGET_CAP).await.is_none());
    }

    /// `poll_send` withholds a segment that does not fit the given budget.
    #[tokio::test]
    async fn poll_send_respects_the_cwnd_budget() {
        let stream = Stream::new(1);
        stream.send_reliable(Bytes::from("0123456789")).await; // 10 bytes
        stream.send_reliable(Bytes::from("abcde")).await; // 5 bytes

        // A 10-byte budget is exactly enough for the 10-byte head segment.
        let head = stream.poll_send(10).await.unwrap();
        assert_eq!(head.data.len(), 10);
        assert!(!head.retransmit);

        // The next segment is 5 bytes, so a 4-byte budget holds it back.
        assert!(stream.poll_send(4).await.is_none());

        // With 5 bytes of budget it goes out.
        let tail = stream.poll_send(5).await.unwrap();
        assert_eq!(tail.data, Bytes::from("abcde"));
    }

    /// Chunks arriving in sequence order are delivered in that order.
    #[tokio::test]
    async fn test_stream_in_order_receive() {
        let stream = Stream::new(1);

        // Deliver seq 0 then seq 1.
        for (seq, chunk) in [(0, "first"), (1, "second")] {
            stream.on_receive(seq, Bytes::from(chunk)).await;
        }

        // Read them back, then find the queue drained.
        for expected in ["first", "second"] {
            assert_eq!(stream.try_recv().await, Some(Bytes::from(expected)));
        }
        assert_eq!(stream.try_recv().await, None);
    }

    /// Chunks arriving out of order are handed to the reader in order.
    #[tokio::test]
    async fn test_stream_out_of_order_receive() {
        let stream = Stream::new(1);

        // Deliver seq 1 before seq 0.
        for (seq, chunk) in [(1, "second"), (0, "first")] {
            stream.on_receive(seq, Bytes::from(chunk)).await;
        }

        // The reader still sees them in sequence order.
        for expected in ["first", "second"] {
            assert_eq!(stream.try_recv().await, Some(Bytes::from(expected)));
        }
    }

    /// Finishing locally and then remotely walks Open → HalfClosedLocal → Closed.
    #[tokio::test]
    async fn test_stream_state() {
        let stream = Stream::new(1);

        let at_start = stream.state().await;
        assert_eq!(at_start, StreamState::Open);

        stream.finish().await;
        let after_local_finish = stream.state().await;
        assert_eq!(after_local_finish, StreamState::HalfClosedLocal);

        stream.on_remote_finish().await;
        let after_remote_finish = stream.state().await;
        assert_eq!(after_remote_finish, StreamState::Closed);
        assert!(stream.is_closed());
    }

    /// A full send buffer blocks further sends until an ack frees a slot.
    #[tokio::test]
    async fn test_stream_backpressure() {
        let stream = Stream::new(1);

        // Occupy every slot.
        for _ in 0..MAX_PENDING_PACKETS {
            stream.send_reliable(Bytes::from("data")).await;
        }

        assert_eq!(stream.pending_send_count().await, MAX_PENDING_PACKETS);

        // One more send must not complete within the timeout.
        let stalled = tokio::time::timeout(
            ms(100),
            stream.send_reliable(Bytes::from("blocked")),
        )
        .await;
        assert!(stalled.is_err(), "Send should have blocked");

        // Free a single slot.
        stream.ack(0).await;

        // The next send now goes through and refills the buffer.
        let resumed = tokio::time::timeout(
            ms(100),
            stream.send_reliable(Bytes::from("resumed")),
        )
        .await;
        assert!(resumed.is_ok(), "Send should have succeeded after ack");
        assert_eq!(stream.pending_send_count().await, MAX_PENDING_PACKETS);
    }

    // ── Flow control (Phase 4.3) ──

    /// A new stream starts with the full initial send window.
    #[test]
    fn peer_send_window_starts_at_initial() {
        let stream = Stream::new(1);
        assert_eq!(stream.peer_send_window(), INITIAL_STREAM_WINDOW);
    }

    /// Consuming send window shrinks it, and fails once it is exhausted.
    #[test]
    fn try_consume_send_window_decrements_atomically() {
        let stream = Stream::new(1);
        let first_bite: u32 = 1000;
        let remainder = INITIAL_STREAM_WINDOW - first_bite;

        assert!(stream.try_consume_send_window(first_bite));
        assert_eq!(stream.peer_send_window(), remainder);

        assert!(stream.try_consume_send_window(remainder));
        assert_eq!(stream.peer_send_window(), 0);

        // Nothing more can be consumed until the window is refilled.
        assert!(!stream.try_consume_send_window(1));
    }

    /// A window update adds to the current window and saturates at the cap.
    #[test]
    fn apply_peer_window_update_adds_relative_credit() {
        let stream = Stream::new(1);

        // Leave only 100 bytes of window.
        assert!(stream.try_consume_send_window(INITIAL_STREAM_WINDOW - 100));
        assert_eq!(stream.peer_send_window(), 100);

        // WINDOW_UPDATE carries relative credit, so each one ADDS to the window.
        stream.apply_peer_window_update(1000);
        assert_eq!(stream.peer_send_window(), 1100);
        stream.apply_peer_window_update(50);
        assert_eq!(stream.peer_send_window(), 1150);

        // An oversized grant is clamped to the hard cap (misbehaving-peer guard).
        stream.apply_peer_window_update(u32::MAX);
        assert_eq!(stream.peer_send_window(), MAX_SEND_WINDOW);
    }

    /// Consumption is accumulated and reported as credit once it crosses
    /// half of the initial window.
    #[test]
    fn record_app_consumed_grants_relative_credit_after_threshold() {
        let stream = Stream::new(1);
        let threshold = INITIAL_STREAM_WINDOW / 2;

        // Drains well below the threshold produce no credit.
        for small_drain in [100, 200] {
            assert!(stream.record_app_consumed(small_drain).is_none());
        }

        // Crossing the half-window threshold emits a credit equal to everything
        // consumed so far (300 + threshold), NOT an absolute window value.
        let credit = stream.record_app_consumed(threshold);
        assert_eq!(
            credit,
            Some(300 + threshold),
            "WINDOW_UPDATE carries the relative credit (bytes consumed since last update)"
        );

        // The accumulator restarts after emitting, so a small drain stays silent.
        assert!(stream.record_app_consumed(10).is_none());
    }

    /// Credit granted by a receiver restores exactly that much sender window.
    #[test]
    fn relative_credit_round_trip_bounds_outstanding_to_one_window() {
        // Model: the receiver grants credit equal to what it consumed, and the
        // sender's window is initial + Σcredit − Σsent, which keeps outstanding
        // data (sent − consumed) at or below the initial window.
        let sender = Stream::new(1);
        let receiver = Stream::new(1);
        let threshold = INITIAL_STREAM_WINDOW / 2;

        // The sender uses up precisely the initial window.
        assert!(sender.try_consume_send_window(INITIAL_STREAM_WINDOW));
        assert_eq!(sender.peer_send_window(), 0, "initial window exhausted");

        // The receiver consumes one threshold's worth and grants it back.
        let granted = receiver
            .record_app_consumed(threshold)
            .expect("threshold crossed");
        sender.apply_peer_window_update(granted);
        assert_eq!(
            sender.peer_send_window(),
            threshold,
            "sender may now send exactly the bytes the receiver consumed"
        );
    }

    /// Staged window-update credit sums up until it is taken, saturating at
    /// `u32::MAX`, and a zero grant stages nothing.
    #[test]
    fn staged_window_update_credit_accumulates_until_taken() {
        let stream = Stream::new(1);
        assert_eq!(stream.take_pending_window_update(), None);

        // Two grants staged before one flush must be SUMMED, not overwritten.
        // The send loop (the only emitter) can run arbitrarily late after a
        // credit is staged, so keeping only the last grant would leak credit
        // permanently and shrink the peer's window over time.
        for grant in [1000, 2500] {
            stream.stage_window_update_credit(grant);
        }
        assert_eq!(stream.take_pending_window_update(), Some(3500));

        // Taking the credit empties the slot.
        assert_eq!(stream.take_pending_window_update(), None);

        // The running total saturates rather than wrapping past u32::MAX.
        stream.stage_window_update_credit(u32::MAX);
        stream.stage_window_update_credit(10);
        assert_eq!(stream.take_pending_window_update(), Some(u32::MAX));

        // A zero grant stages nothing (no spurious WINDOW_UPDATE).
        stream.stage_window_update_credit(0);
        assert_eq!(stream.take_pending_window_update(), None);
    }
}