Skip to main content

net/adapter/net/
reliability.rs

1//! Reliability modes for Net streams.
2//!
3//! Net supports two reliability modes:
4//! - Fire-and-forget: No acknowledgments, maximum throughput
5//! - Reliable: Per-stream reliability with selective NACKs
6
7use bytes::Bytes;
8use std::collections::VecDeque;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use super::protocol::{NackPayload, PacketFlags};
14
15/// Pre-encryption inputs needed to rebuild a packet for
16/// retransmission.
17///
18/// The reliable retransmit path used to stash the fully-encrypted
19/// packet bytes, but every encrypted packet carries the cipher's
20/// outer counter stamped at build time. Replaying those exact bytes
21/// produces the same wire counter on the wire, which the receiver's
22/// `update_rx_counter` rejects as a replay — making NACK-driven
23/// recovery a no-op the first time it fired. Stashing the rebuild
24/// inputs instead lets the retransmit driver call
25/// `PacketBuilder::build` with a fresh counter on each retransmit,
26/// so the receiver accepts the recovered packet.
27#[derive(Debug, Clone)]
28pub struct RetransmitDescriptor {
29    /// Per-stream sequence number stamped on the packet header.
30    pub seq: u64,
31    /// Stream id for the rebuild call.
32    pub stream_id: u64,
33    /// Pre-encryption event payloads (the same `&[Bytes]` originally
34    /// passed to `PacketBuilder::build`).
35    pub events: Vec<Bytes>,
36    /// Packet flags as stamped on the original send.
37    pub flags: PacketFlags,
38}
39
40/// Trait for reliability mode implementations.
41///
42/// Per crypto-session perf #133, the descriptor is exchanged as
43/// `Arc<RetransmitDescriptor>` across the trait boundary. The
44/// `RetransmitDescriptor` itself carries an inner
45/// `Vec<Bytes>` of pre-encryption event payloads — at `max_pending =
46/// 32` and ~10 events per packet that's ~320 `Bytes` refcounts
47/// dangling off the retransmit window at any given time. Pre-fix
48/// `on_send` moved the descriptor in by value (one Vec spine + one
49/// refcount bump per inner `Bytes`), and `on_nack` /
50/// `get_timed_out` deep-cloned the descriptor per retransmit (one
51/// Vec alloc + N `Bytes` refcount bumps per emission). Wrapping in
52/// `Arc` makes both paths one atomic refcount bump regardless of
53/// the inner Vec's length.
54pub trait ReliabilityMode: Send + Sync {
55    /// Called when a packet is sent. The descriptor carries pre-
56    /// encryption inputs so the retransmit path can rebuild a
57    /// fresh-counter packet rather than replaying stale ciphertext.
58    fn on_send(&mut self, descriptor: Arc<RetransmitDescriptor>);
59
60    /// Called when a packet is received. Returns true if accepted.
61    fn on_receive(&mut self, seq: u64) -> bool;
62
63    /// Check if this mode requires acknowledgments
64    fn needs_ack(&self) -> bool;
65
66    /// Build a NACK payload if there are missing sequences
67    fn build_nack(&self) -> Option<NackPayload>;
68
69    /// Process a received NACK and return descriptors for the
70    /// caller to rebuild + dispatch. The returned `Arc` clones
71    /// share the inner `RetransmitDescriptor` allocation; the
72    /// caller bumps the refcount instead of deep-cloning the
73    /// `Vec<Bytes>` of events.
74    fn on_nack(&mut self, nack: &NackPayload) -> Vec<Arc<RetransmitDescriptor>>;
75
76    /// Get descriptors that need retransmission due to timeout. See
77    /// [`Self::on_nack`] for the `Arc`-sharing contract.
78    fn get_timed_out(&mut self) -> Vec<Arc<RetransmitDescriptor>>;
79
80    /// Check if there are unacknowledged packets
81    fn has_pending(&self) -> bool;
82
83    /// Get the name of this reliability mode
84    fn name(&self) -> &'static str;
85}
86
/// Fire-and-forget reliability mode.
///
/// No acknowledgments, no retransmission, maximum throughput. Every
/// received sequence is accepted and nothing is ever queued for resend.
///
/// Suitable for:
/// - LLM token streams
/// - Embeddings
/// - Intermediate activations
/// - Metrics/telemetry
#[derive(Debug, Default)]
pub struct FireAndForget {
    /// Highest sequence number observed by `on_receive`.
    ///
    /// Informational only: it is recorded but never used to reject,
    /// reorder, or deduplicate packets.
    last_seq: AtomicU64,
}

impl FireAndForget {
    /// Create a new fire-and-forget mode (equivalent to `Default`).
    pub fn new() -> Self {
        Self::default()
    }
}
107
108impl ReliabilityMode for FireAndForget {
109    #[inline]
110    fn on_send(&mut self, _descriptor: Arc<RetransmitDescriptor>) {
111        // Nothing to track
112    }
113
114    #[inline]
115    fn on_receive(&mut self, seq: u64) -> bool {
116        // Update last sequence (informational only)
117        self.last_seq.fetch_max(seq, Ordering::Relaxed);
118        true // Always accept
119    }
120
121    #[inline]
122    fn needs_ack(&self) -> bool {
123        false
124    }
125
126    #[inline]
127    fn build_nack(&self) -> Option<NackPayload> {
128        None
129    }
130
131    #[inline]
132    fn on_nack(&mut self, _nack: &NackPayload) -> Vec<Arc<RetransmitDescriptor>> {
133        Vec::new()
134    }
135
136    #[inline]
137    fn get_timed_out(&mut self) -> Vec<Arc<RetransmitDescriptor>> {
138        Vec::new()
139    }
140
141    #[inline]
142    fn has_pending(&self) -> bool {
143        false
144    }
145
146    #[inline]
147    fn name(&self) -> &'static str {
148        "fire-and-forget"
149    }
150}
151
152/// Unacknowledged packet waiting for ACK/NACK
153#[derive(Debug, Clone)]
154struct UnackedPacket {
155    /// Pre-encryption rebuild inputs. Stashing the descriptor (not
156    /// the encrypted bytes) is what lets the retransmit path
157    /// produce a fresh-counter packet on each NACK / timeout.
158    ///
159    /// Per crypto-session perf #133, the descriptor is held behind
160    /// an `Arc` so that the retransmit emissions (`on_nack` /
161    /// `get_timed_out`) clone the refcount instead of deep-cloning
162    /// the inner `Vec<Bytes>` events list.
163    descriptor: Arc<RetransmitDescriptor>,
164    /// Time when packet was sent
165    sent_at: Instant,
166    /// Number of retransmission attempts
167    retries: u8,
168}
169
170impl UnackedPacket {
171    #[inline]
172    fn seq(&self) -> u64 {
173        self.descriptor.seq
174    }
175}
176
177/// Reliable stream mode with selective NACKs.
178///
179/// Features:
180/// - Bounded retransmit window (32 packets)
181/// - Selective NACKs (receiver-driven)
182/// - Per-stream state
183/// - Configurable RTO
184///
185/// Suitable for:
186/// - Tool call results
187/// - Guardrail decisions
188/// - Session lifecycle events
189/// - Error propagation
190pub struct ReliableStream {
191    /// The next sequence number we haven't yet received. All sequences
192    /// `< next_expected` have been received contiguously. Starts at 0,
193    /// expecting seq 0 as the first packet of the stream.
194    ///
195    /// Use `next_expected()` / `ack_seq()` accessors externally.
196    next_expected: u64,
197    /// SACK bitmap for out-of-order packets. Bit `i` is set iff sequence
198    /// `next_expected + 1 + i` has been received. This represents up to
199    /// 64 future sequences after the contiguous range. As `next_expected`
200    /// advances, the bitmap is right-shifted so bit 0 always represents
201    /// `next_expected + 1`.
202    sack_bitmap: u64,
203    /// Pending unacknowledged packets (bounded)
204    pending: VecDeque<UnackedPacket>,
205    /// Retransmit timeout
206    rto: Duration,
207    /// Maximum pending packets
208    max_pending: usize,
209    /// Maximum retries per packet
210    max_retries: u8,
211    /// Number of unacknowledged packets evicted from `pending` because
212    /// the window was full when `on_send` arrived. The evicted packet
213    /// went on the wire (the caller already issued the syscall) but
214    /// is no longer tracked for retransmit — a NACK for that seq can
215    /// no longer recover it. This counter surfaces the silent loss
216    /// to the metrics layer so operators can size `max_pending` for
217    /// their actual sustained reliable-stream throughput. Pre-fix
218    /// the eviction was unobservable.
219    untracked_evictions: u64,
220}
221
222impl ReliableStream {
223    /// Default retransmit timeout
224    pub const DEFAULT_RTO: Duration = Duration::from_millis(50);
225
226    /// Default max pending packets
227    pub const DEFAULT_MAX_PENDING: usize = 32;
228
229    /// Default max retries
230    pub const DEFAULT_MAX_RETRIES: u8 = 3;
231
232    /// Create a new reliable stream with default settings
233    pub fn new() -> Self {
234        Self {
235            next_expected: 0,
236            sack_bitmap: 0,
237            pending: VecDeque::with_capacity(Self::DEFAULT_MAX_PENDING),
238            rto: Self::DEFAULT_RTO,
239            max_pending: Self::DEFAULT_MAX_PENDING,
240            max_retries: Self::DEFAULT_MAX_RETRIES,
241            untracked_evictions: 0,
242        }
243    }
244
245    /// Create with custom settings
246    pub fn with_settings(rto: Duration, max_pending: usize, max_retries: u8) -> Self {
247        Self {
248            next_expected: 0,
249            sack_bitmap: 0,
250            pending: VecDeque::with_capacity(max_pending),
251            rto,
252            max_pending,
253            max_retries,
254            untracked_evictions: 0,
255        }
256    }
257
258    /// Number of unacknowledged packets that the stream evicted from
259    /// its retransmit window because the window was full at `on_send`
260    /// time. Each eviction means the caller's syscall succeeded
261    /// (bytes left this node) but the packet is no longer tracked
262    /// for retransmit — a NACK can no longer recover it. A non-zero
263    /// value indicates `max_pending` is undersized for the stream's
264    /// sustained throughput. Operators should size up or apply
265    /// upstream backpressure rather than accepting silent loss.
266    #[inline]
267    pub fn untracked_evictions(&self) -> u64 {
268        self.untracked_evictions
269    }
270
271    /// Set the retransmit timeout
272    pub fn set_rto(&mut self, rto: Duration) {
273        self.rto = rto;
274    }
275
276    /// Lowest sequence number we have not yet received. All sequences
277    /// strictly below this value are contiguously received.
278    #[inline]
279    pub fn next_expected(&self) -> u64 {
280        self.next_expected
281    }
282
283    /// Highest contiguously-received sequence number, or `None` if no
284    /// packets have been received yet.
285    #[inline]
286    pub fn last_received_contiguous(&self) -> Option<u64> {
287        if self.next_expected == 0 {
288            None
289        } else {
290            Some(self.next_expected - 1)
291        }
292    }
293
294    /// Get the current ack sequence (highest contiguously-received seq).
295    /// Returns 0 when nothing has been received yet — callers that need
296    /// to distinguish "received seq 0" from "received nothing" should use
297    /// [`Self::last_received_contiguous`] instead.
298    pub fn ack_seq(&self) -> u64 {
299        self.next_expected.saturating_sub(1)
300    }
301
302    /// Process an acknowledgment. `acked` is the highest sequence the
303    /// peer has contiguously received.
304    pub fn on_ack(&mut self, acked: u64) {
305        // Remove all pending packets up to and including acked.
306        while let Some(front) = self.pending.front() {
307            if front.seq() <= acked {
308                self.pending.pop_front();
309            } else {
310                break;
311            }
312        }
313    }
314
315    /// Check if there are gaps in received sequences.
316    ///
317    /// A gap exists whenever at least one future sequence has been
318    /// received out of order — meaning `next_expected` itself is still
319    /// pending (the implicit gap) and any interior missing seqs show
320    /// up as zero bits in the SACK bitmap below the highest received.
321    fn has_gaps(&self) -> bool {
322        self.sack_bitmap != 0
323    }
324
325    /// Get bitmap of missing sequences after `next_expected`.
326    ///
327    /// Bit `i` set means sequence `next_expected + 1 + i` is missing.
328    /// Sequence `next_expected` itself is always implicitly missing
329    /// whenever `has_gaps()` returns true (that's what makes the NACK
330    /// meaningful) — `missing_sequences()` on the resulting NACK emits
331    /// `next_expected` first, then the bits of this bitmap.
332    fn missing_bitmap(&self) -> u64 {
333        // Invert sack_bitmap to get missing sequences; only consider
334        // bits up to the highest received (otherwise we'd claim
335        // sequences we've never heard of are "missing").
336        if self.sack_bitmap == 0 {
337            return 0;
338        }
339        let highest_bit = 63 - self.sack_bitmap.leading_zeros();
340        let mask = if highest_bit >= 63 {
341            u64::MAX
342        } else {
343            (1u64 << (highest_bit + 1)) - 1
344        };
345        (!self.sack_bitmap) & mask
346    }
347}
348
349impl Default for ReliableStream {
350    fn default() -> Self {
351        Self::new()
352    }
353}
354
355impl ReliabilityMode for ReliableStream {
356    fn on_send(&mut self, descriptor: Arc<RetransmitDescriptor>) {
357        // Evict oldest unacked packet if window is full so that the
358        // newest packet is always tracked for retransmission.  Without
359        // this, packets sent when the window is full are silently lost
360        // from the retransmit buffer even though they were sent on the
361        // wire — a gap the receiver can never recover via NACK.
362        //
363        // Bump `untracked_evictions` on every eviction so the silent
364        // loss surfaces via the `untracked_evictions()` accessor (and
365        // any metrics layer hooked into it). Pre-fix the eviction was
366        // unobservable: a `max_pending`-undersized stream looked
367        // healthy from the sender side until NACKs started arriving
368        // for sequences whose retransmit had already been dropped.
369        if self.pending.len() >= self.max_pending {
370            self.pending.pop_front();
371            self.untracked_evictions = self.untracked_evictions.saturating_add(1);
372            tracing::warn!(
373                untracked_evictions = self.untracked_evictions,
374                max_pending = self.max_pending,
375                "ReliableStream: retransmit window full; evicted oldest \
376                 unacked packet — NACK for that seq can no longer \
377                 recover it. Increase max_pending or apply upstream \
378                 backpressure.",
379            );
380        }
381        self.pending.push_back(UnackedPacket {
382            descriptor,
383            sent_at: Instant::now(),
384            retries: 0,
385        });
386    }
387
388    fn on_receive(&mut self, seq: u64) -> bool {
389        // Anything below next_expected has already been received
390        // contiguously; reject as a duplicate.
391        if seq < self.next_expected {
392            return false;
393        }
394        if seq == self.next_expected {
395            // Next expected sequence — advance the contiguous range,
396            // then absorb any already-received future seqs that have
397            // just become contiguous.
398            //
399            // Bitmap invariant (before this call): bit i is set iff
400            // seq (old next_expected + 1 + i) has been received. After
401            // incrementing next_expected by 1 (but BEFORE shifting),
402            // bit 0 of the bitmap now refers to seq new_next_expected
403            // itself — which, if set, means that seq was also received
404            // out-of-order earlier and we can advance past it too.
405            self.next_expected += 1;
406            while self.sack_bitmap & 1 != 0 {
407                self.next_expected += 1;
408                self.sack_bitmap >>= 1;
409            }
410            // Restore the bitmap invariant: after the loop,
411            // bit 0 of the bitmap still refers to seq `next_expected`
412            // (not yet received; otherwise the loop would have
413            // consumed it). The invariant wants bit 0 to refer to
414            // seq `next_expected + 1`, so shift once more.
415            self.sack_bitmap >>= 1;
416            return true;
417        }
418        // seq > next_expected: future sequence.
419        //
420        // The bitmap can represent up to 64 future seqs past the
421        // contiguous range. `offset` here is (seq - next_expected),
422        // which is ≥ 1. Bit 0 of the bitmap represents
423        // `next_expected + 1`, so the bit index is `offset - 1`.
424        //
425        // If the first packet of a stream arrives with seq > 0, this
426        // branch records it without advancing next_expected, so
427        // sequences `[0, seq)` remain flagged as missing in the
428        // SACK bitmap — the receiver will request them via a NACK
429        // instead of silently skipping them (which is what the old
430        // code's `seq == ack_seq + 1` branch did, treating seq 0 as
431        // already-acknowledged when the stream actually started with
432        // a lost packet).
433        let offset = seq - self.next_expected;
434        if offset > 64 {
435            return false;
436        }
437        let bit = offset - 1;
438        let mask = 1u64 << bit;
439        if self.sack_bitmap & mask != 0 {
440            // Duplicate of a previously-recorded future seq.
441            return false;
442        }
443        self.sack_bitmap |= mask;
444        true
445    }
446
447    #[inline]
448    fn needs_ack(&self) -> bool {
449        true
450    }
451
452    fn build_nack(&self) -> Option<NackPayload> {
453        if self.has_gaps() {
454            Some(NackPayload {
455                next_expected: self.next_expected,
456                missing_bitmap: self.missing_bitmap(),
457            })
458        } else {
459            None
460        }
461    }
462
463    fn on_nack(&mut self, nack: &NackPayload) -> Vec<Arc<RetransmitDescriptor>> {
464        let mut retransmits = Vec::new();
465
466        // Find packets to retransmit based on NACK. Return the
467        // pre-encryption descriptors so the caller can rebuild
468        // each packet with a fresh cipher counter — replaying the
469        // stashed encrypted bytes would trip the receiver's replay
470        // window. Per perf #133 the descriptor is `Arc`-shared, so
471        // each emission is one atomic refcount bump rather than a
472        // deep `Vec<Bytes>` clone.
473        for missing_seq in nack.missing_sequences() {
474            for unacked in &mut self.pending {
475                if unacked.seq() == missing_seq && unacked.retries < self.max_retries {
476                    retransmits.push(Arc::clone(&unacked.descriptor));
477                    unacked.retries += 1;
478                    unacked.sent_at = Instant::now();
479                    break;
480                }
481            }
482        }
483
484        retransmits
485    }
486
487    fn get_timed_out(&mut self) -> Vec<Arc<RetransmitDescriptor>> {
488        let now = Instant::now();
489        let mut retransmits = Vec::new();
490
491        // Per perf #133 — `Arc::clone` bumps a refcount instead of
492        // deep-cloning the `Vec<Bytes>` events list per timed-out
493        // packet.
494        for unacked in &mut self.pending {
495            if now.duration_since(unacked.sent_at) > self.rto && unacked.retries < self.max_retries
496            {
497                retransmits.push(Arc::clone(&unacked.descriptor));
498                unacked.retries += 1;
499                unacked.sent_at = now;
500            }
501        }
502
503        retransmits
504    }
505
506    #[inline]
507    fn has_pending(&self) -> bool {
508        !self.pending.is_empty()
509    }
510
511    #[inline]
512    fn name(&self) -> &'static str {
513        "reliable"
514    }
515}
516
517impl std::fmt::Debug for ReliableStream {
518    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
519        f.debug_struct("ReliableStream")
520            .field("next_expected", &self.next_expected)
521            .field("sack_bitmap", &format!("{:064b}", self.sack_bitmap))
522            .field("pending_count", &self.pending.len())
523            .field("rto_ms", &self.rto.as_millis())
524            .finish()
525    }
526}
527
528/// Create a boxed reliability mode from configuration
529pub fn create_reliability_mode(reliable: bool) -> Box<dyn ReliabilityMode> {
530    if reliable {
531        Box::new(ReliableStream::new())
532    } else {
533        Box::new(FireAndForget::new())
534    }
535}
536
537#[cfg(test)]
538mod tests {
539    use super::*;
540
541    /// Test helper: build a `RetransmitDescriptor` from the legacy
542    /// `(seq, packet_bytes)` shape these tests were written against.
543    /// Wraps the bytes as a single-event payload so the in-memory
544    /// shape has something to round-trip through. Returns an
545    /// `Arc<...>` per perf #133 — `on_send` consumes the shared
546    /// allocation.
547    fn descriptor(seq: u64, packet: Bytes) -> Arc<RetransmitDescriptor> {
548        Arc::new(RetransmitDescriptor {
549            seq,
550            stream_id: 0,
551            events: vec![packet],
552            flags: PacketFlags::RELIABLE,
553        })
554    }
555
556    #[test]
557    fn test_fire_and_forget() {
558        let mut mode = FireAndForget::new();
559
560        // Should always accept
561        assert!(mode.on_receive(1));
562        assert!(mode.on_receive(3)); // Gap is fine
563        assert!(mode.on_receive(2)); // Out of order is fine
564
565        // No acks needed
566        assert!(!mode.needs_ack());
567        assert!(mode.build_nack().is_none());
568        assert!(!mode.has_pending());
569
570        // No retransmits
571        mode.on_send(descriptor(1, Bytes::from_static(b"test")));
572        assert!(mode.get_timed_out().is_empty());
573    }
574
575    #[test]
576    fn test_reliable_stream_in_order() {
577        let mut mode = ReliableStream::new();
578
579        // Receive in order starting from seq 0 (the sender always
580        // begins at 0).
581        assert!(mode.on_receive(0));
582        assert_eq!(mode.ack_seq(), 0);
583        assert_eq!(mode.last_received_contiguous(), Some(0));
584
585        assert!(mode.on_receive(1));
586        assert_eq!(mode.ack_seq(), 1);
587
588        assert!(mode.on_receive(2));
589        assert_eq!(mode.ack_seq(), 2);
590
591        assert!(mode.on_receive(3));
592        assert_eq!(mode.ack_seq(), 3);
593
594        // No NACK needed
595        assert!(mode.build_nack().is_none());
596    }
597
598    #[test]
599    fn test_reliable_stream_gap() {
600        let mut mode = ReliableStream::new();
601
602        // Receive with gap (after an initial in-order seq 0 so the
603        // gap is a real mid-stream hole, not a missing prefix).
604        assert!(mode.on_receive(0));
605        assert!(mode.on_receive(1));
606        assert!(mode.on_receive(3)); // Gap at 2
607        assert!(mode.on_receive(5)); // Gap at 4
608
609        assert_eq!(mode.ack_seq(), 1);
610
611        // Should have NACK
612        let nack = mode.build_nack().unwrap();
613        assert_eq!(nack.next_expected, 2);
614
615        // Missing: 2 (the next expected — implicit), 4 (bitmap bit 1).
616        let missing: Vec<_> = nack.missing_sequences().collect();
617        assert!(missing.contains(&2));
618        assert!(missing.contains(&4));
619    }
620
621    #[test]
622    fn test_reliable_stream_fill_gap() {
623        let mut mode = ReliableStream::new();
624
625        // Receive out of order (with seq 0 so the gap is interior, not
626        // a missing prefix).
627        assert!(mode.on_receive(0));
628        assert!(mode.on_receive(1));
629        assert!(mode.on_receive(3));
630        assert!(mode.on_receive(4));
631        assert_eq!(mode.ack_seq(), 1);
632
633        // Fill gap
634        assert!(mode.on_receive(2));
635
636        // Should advance
637        assert_eq!(mode.ack_seq(), 4);
638
639        // No NACK needed
640        assert!(mode.build_nack().is_none());
641    }
642
643    #[test]
644    fn test_reliable_stream_duplicate() {
645        let mut mode = ReliableStream::new();
646
647        assert!(mode.on_receive(0));
648        assert!(mode.on_receive(1));
649        assert!(mode.on_receive(2));
650
651        // Duplicate should be rejected
652        assert!(!mode.on_receive(1));
653        assert!(!mode.on_receive(2));
654
655        assert_eq!(mode.ack_seq(), 2);
656    }
657
658    #[test]
659    fn test_reliable_stream_pending() {
660        let mut mode = ReliableStream::new();
661
662        assert!(!mode.has_pending());
663
664        mode.on_send(descriptor(1, Bytes::from_static(b"packet1")));
665        mode.on_send(descriptor(2, Bytes::from_static(b"packet2")));
666
667        assert!(mode.has_pending());
668
669        // ACK should clear pending
670        mode.on_ack(2);
671        assert!(!mode.has_pending());
672    }
673
674    #[test]
675    fn test_reliable_stream_nack_retransmit() {
676        let mut mode = ReliableStream::new();
677
678        mode.on_send(descriptor(1, Bytes::from_static(b"packet1")));
679        mode.on_send(descriptor(2, Bytes::from_static(b"packet2")));
680        mode.on_send(descriptor(3, Bytes::from_static(b"packet3")));
681
682        // NACK saying "received through seq 1, seq 2 is the next
683        // expected (and therefore missing)".
684        let nack = NackPayload {
685            next_expected: 2,
686            missing_bitmap: 0,
687        };
688
689        let retransmits = mode.on_nack(&nack);
690        assert_eq!(retransmits.len(), 1);
691        // The descriptor's first event is the (test-helper-built)
692        // payload of the original send.
693        assert_eq!(&retransmits[0].events[0][..], b"packet2");
694        assert_eq!(retransmits[0].seq, 2);
695    }
696
697    #[test]
698    fn test_reliable_stream_too_far_ahead() {
699        let mut mode = ReliableStream::new();
700
701        assert!(mode.on_receive(0));
702        assert!(mode.on_receive(1));
703
704        // Sequence 100 is too far ahead (beyond 64-bit window)
705        assert!(!mode.on_receive(100));
706
707        assert_eq!(mode.ack_seq(), 1);
708    }
709
710    #[test]
711    fn test_reliable_stream_nack_bitmap_full_window() {
712        // Regression: when the highest received bit was 63 (full 64-bit window),
713        // 1u64 << 64 overflowed, panicking in debug or producing wrong results
714        // in release.
715        let mut mode = ReliableStream::new();
716
717        // Receive packet 0, 1, then packet 65 (exactly 64 past `1`, at
718        // the edge of the window).
719        assert!(mode.on_receive(0));
720        assert!(mode.on_receive(1));
721        assert!(mode.on_receive(65));
722
723        // build_nack should not panic and should report missing sequences
724        let nack = mode.build_nack();
725        assert!(
726            nack.is_some(),
727            "NACK should be generated for a gap spanning the full window"
728        );
729
730        let missing: Vec<_> = nack.unwrap().missing_sequences().collect();
731        // Sequences 2..=64 are missing
732        assert!(!missing.is_empty());
733    }
734
735    /// Regression: when `pending.len() >= max_pending`, `on_send`
736    /// evicts the oldest unacked packet to make room for the new
737    /// one. The evicted packet went on the wire but is no longer
738    /// tracked for retransmit — a NACK can no longer recover it.
739    /// Pre-fix the eviction was unobservable. The fix exposes a
740    /// `untracked_evictions()` counter so a metrics layer can
741    /// surface the silent loss to operators.
742    #[test]
743    fn reliable_stream_records_untracked_evictions_when_window_full() {
744        const MAX_PENDING: usize = 4;
745        let mut mode = ReliableStream::with_settings(Duration::from_millis(50), MAX_PENDING, 3);
746        assert_eq!(mode.untracked_evictions(), 0);
747
748        // Fill the window — no evictions yet.
749        for seq in 0..(MAX_PENDING as u64) {
750            mode.on_send(descriptor(seq, Bytes::from(format!("pkt-{seq}"))));
751        }
752        assert_eq!(mode.untracked_evictions(), 0);
753
754        // The next 3 sends each force an eviction.
755        for seq in (MAX_PENDING as u64)..(MAX_PENDING as u64 + 3) {
756            mode.on_send(descriptor(seq, Bytes::from(format!("pkt-{seq}"))));
757        }
758        assert_eq!(
759            mode.untracked_evictions(),
760            3,
761            "every on_send beyond max_pending must bump untracked_evictions",
762        );
763
764        // The evicted seqs (0, 1, 2) are no longer recoverable via
765        // NACK — pin that behavior so a future change that quietly
766        // re-orders eviction is caught. `missing_sequences()` yields
767        // `[next_expected, next_expected+1+i for set bits]`, so
768        // `next_expected: 0, missing_bitmap: 0b011` requests
769        // [0, 1, 2] without spilling into the still-pending seq 3.
770        let nack = NackPayload {
771            next_expected: 0,
772            missing_bitmap: 0b011,
773        };
774        let retransmits = mode.on_nack(&nack);
775        assert!(
776            retransmits.is_empty(),
777            "evicted seqs must not produce retransmit descriptors, got {} entries",
778            retransmits.len(),
779        );
780    }
781
782    #[test]
783    fn test_create_reliability_mode() {
784        let mode = create_reliability_mode(false);
785        assert_eq!(mode.name(), "fire-and-forget");
786
787        let mode = create_reliability_mode(true);
788        assert_eq!(mode.name(), "reliable");
789    }
790
791    #[test]
792    fn test_reliable_stream_nack_retransmit_full_cycle() {
793        // Full cycle: send packets, receive out of order with gaps,
794        // build NACK, retransmit missing, fill gaps, verify ack_seq advances.
795        let mut sender = ReliableStream::new();
796        let mut receiver = ReliableStream::new();
797
798        // Sender sends packets 0..10
799        for seq in 0..10u64 {
800            sender.on_send(descriptor(seq, Bytes::from(format!("pkt-{}", seq))));
801        }
802        assert!(sender.has_pending());
803
804        // Receiver gets packets 0, 1, 3, 5, 6, 7, 9 (missing 2, 4, 8)
805        assert!(receiver.on_receive(0));
806        assert!(receiver.on_receive(1));
807        assert!(receiver.on_receive(3)); // gap at 2
808        assert!(receiver.on_receive(5)); // gap at 4
809        assert!(receiver.on_receive(6));
810        assert!(receiver.on_receive(7));
811        assert!(receiver.on_receive(9)); // gap at 8
812
813        assert_eq!(receiver.ack_seq(), 1); // contiguous through 1
814
815        // Receiver builds NACK
816        let nack = receiver.build_nack().expect("should have gaps");
817        assert_eq!(nack.next_expected, 2);
818        let missing: Vec<u64> = nack.missing_sequences().collect();
819        assert!(missing.contains(&2), "should report seq 2 missing");
820        assert!(missing.contains(&4), "should report seq 4 missing");
821        assert!(missing.contains(&8), "should report seq 8 missing");
822
823        // Sender processes NACK → retransmits missing packets
824        let retransmits = sender.on_nack(&nack);
825        assert_eq!(retransmits.len(), 3, "should retransmit 3 packets");
826
827        // Receiver fills gaps
828        assert!(receiver.on_receive(2));
829        // After receiving 2: ack_seq should advance through 3, 5, 6, 7
830        // Wait — 4 is still missing, so ack_seq advances to 3 then stops
831        assert_eq!(
832            receiver.ack_seq(),
833            3,
834            "should advance through contiguous 2,3"
835        );
836
837        assert!(receiver.on_receive(4));
838        // Now 4 fills gap: ack_seq advances through 5, 6, 7
839        assert_eq!(receiver.ack_seq(), 7, "should advance through 4,5,6,7");
840
841        assert!(receiver.on_receive(8));
842        // 8 fills gap: ack_seq advances through 9
843        assert_eq!(receiver.ack_seq(), 9, "should advance through 8,9");
844
845        // No more gaps
846        assert!(
847            receiver.build_nack().is_none(),
848            "no gaps remaining after retransmit"
849        );
850    }
851
852    #[test]
853    fn test_reliable_stream_retransmit_timeout() {
854        let mut mode = ReliableStream::with_settings(
855            Duration::from_millis(50), // 50ms RTO — large enough to avoid CI jitter
856            32,
857            3,
858        );
859
860        mode.on_send(descriptor(0, Bytes::from_static(b"pkt-0")));
861        mode.on_send(descriptor(1, Bytes::from_static(b"pkt-1")));
862
863        // Nothing should time out yet (we just sent)
864        let too_early = mode.get_timed_out();
865        assert!(
866            too_early.is_empty(),
867            "packets should not time out before RTO"
868        );
869
870        // Wait well past RTO
871        std::thread::sleep(Duration::from_millis(80));
872
873        let timed_out = mode.get_timed_out();
874        assert_eq!(timed_out.len(), 2, "both packets should time out");
875        assert_eq!(&timed_out[0].events[0][..], b"pkt-0");
876        assert_eq!(&timed_out[1].events[0][..], b"pkt-1");
877
878        // Immediately after retransmit, sent_at was reset — shouldn't time out
879        // again until another RTO elapses
880        let again = mode.get_timed_out();
881        assert!(
882            again.is_empty(),
883            "just retransmitted, shouldn't timeout yet"
884        );
885    }
886
887    #[test]
888    fn test_reliable_stream_max_retries_exhausted() {
889        let mut mode = ReliableStream::with_settings(
890            Duration::from_millis(50),
891            32,
892            2, // max 2 retries
893        );
894
895        mode.on_send(descriptor(0, Bytes::from_static(b"pkt-0")));
896
897        // Exhaust retries (each iteration waits past RTO then triggers retransmit)
898        for _ in 0..3 {
899            std::thread::sleep(Duration::from_millis(80));
900            let _ = mode.get_timed_out();
901        }
902
903        // After max_retries, the packet should no longer be retransmitted
904        std::thread::sleep(Duration::from_millis(80));
905        let timed_out = mode.get_timed_out();
906        assert!(
907            timed_out.is_empty(),
908            "packet should stop being retransmitted after max_retries"
909        );
910    }
911
912    #[test]
913    fn test_regression_has_gaps_misses_interior_holes() {
914        // Regression: has_gaps() used `trailing_zeros() > 0` which relied
915        // on the subtle invariant that bit 0 of sack_bitmap is always 0
916        // after on_receive returns. The old code was accidentally correct
917        // but fragile — any refactor of on_receive could silently break
918        // gap detection.
919        //
920        // Fix: has_gaps() now delegates to missing_bitmap() != 0, which
921        // is correct by construction regardless of bitmap invariants.
922        let mut mode = ReliableStream::new();
923
924        // Receive 0, 1, 2, 4 — gap at 3
925        assert!(mode.on_receive(0));
926        assert!(mode.on_receive(1));
927        assert!(mode.on_receive(2));
928        assert!(mode.on_receive(4));
929
930        assert_eq!(mode.ack_seq(), 2);
931
932        let nack = mode.build_nack().unwrap();
933        let missing: Vec<u64> = nack.missing_sequences().collect();
934        assert!(missing.contains(&3), "should detect gap at seq 3");
935    }
936
937    #[test]
938    fn test_regression_has_gaps_with_filled_first_slot() {
939        // Verify has_gaps detects interior holes even when sequences
940        // immediately after ack_seq are present.
941        let mut mode = ReliableStream::new();
942
943        // Receive 0, 1, 3, 5, 7 — gaps at 2, 4, 6
944        assert!(mode.on_receive(0));
945        assert!(mode.on_receive(1));
946        assert!(mode.on_receive(3));
947        assert!(mode.on_receive(5));
948        assert!(mode.on_receive(7));
949
950        assert_eq!(mode.ack_seq(), 1);
951
952        let nack = mode.build_nack().expect("should detect gaps");
953        let missing: Vec<u64> = nack.missing_sequences().collect();
954        assert!(missing.contains(&2), "should detect gap at seq 2");
955        assert!(missing.contains(&4), "should detect gap at seq 4");
956        assert!(missing.contains(&6), "should detect gap at seq 6");
957        // 4 entries: next_expected=2 (implicit), plus bits for 4 and 6.
958        assert_eq!(missing.len(), 3);
959    }
960
961    #[test]
962    fn test_regression_on_send_evicts_oldest_when_full() {
963        // Regression: on_send silently dropped packets when the pending
964        // queue was full. The packet was sent on the wire but never
965        // recorded for retransmission, so if lost it could never be
966        // recovered via NACK — silently degrading reliability.
967        //
968        // Fix: on_send now evicts the oldest unacked packet to make room,
969        // so the most recent packets are always tracked.
970        let mut mode = ReliableStream::with_settings(
971            Duration::from_millis(50),
972            4, // max 4 pending
973            3,
974        );
975
976        // Send 6 packets (exceeds max_pending of 4)
977        for seq in 0..6u64 {
978            mode.on_send(descriptor(seq, Bytes::from(format!("pkt-{}", seq))));
979        }
980
981        // Should still have exactly max_pending packets tracked
982        assert_eq!(
983            mode.pending.len(),
984            4,
985            "pending queue should be at max_pending"
986        );
987
988        // The oldest packets (0, 1) should have been evicted;
989        // the newest (2, 3, 4, 5) should be retained.
990        let seqs: Vec<u64> = mode.pending.iter().map(|p| p.seq()).collect();
991        assert_eq!(
992            seqs,
993            vec![2, 3, 4, 5],
994            "should retain the most recent packets"
995        );
996
997        // NACK saying "seq 5 is the next expected (and therefore
998        // missing)" — receiver is asking for the retransmit.
999        let nack = NackPayload {
1000            next_expected: 5,
1001            missing_bitmap: 0,
1002        };
1003        let retransmits = mode.on_nack(&nack);
1004        assert_eq!(retransmits.len(), 1);
1005        assert_eq!(&retransmits[0].events[0][..], b"pkt-5");
1006        assert_eq!(retransmits[0].seq, 5);
1007    }
1008
1009    #[test]
1010    fn test_regression_duplicate_seq_zero_rejected() {
1011        // Regression: on_receive had a special case for seq=0 that checked
1012        // `seq == 0 && self.ack_seq == 0`. After receiving seq 0, ack_seq
1013        // was still 0, so a duplicate seq 0 hit the same early return and
1014        // was accepted again — violating exactly-once delivery for reliable
1015        // streams.
1016        //
1017        // Fix: added `received_first` flag to distinguish "never received
1018        // anything" from "received seq 0".
1019        let mut mode = ReliableStream::new();
1020
1021        // First reception of seq 0 should succeed
1022        assert!(mode.on_receive(0), "first seq 0 should be accepted");
1023        assert_eq!(mode.ack_seq(), 0);
1024
1025        // Duplicate seq 0 should be rejected
1026        assert!(
1027            !mode.on_receive(0),
1028            "duplicate seq 0 must be rejected for exactly-once delivery"
1029        );
1030
1031        // Normal continuation should still work
1032        assert!(mode.on_receive(1));
1033        assert_eq!(mode.ack_seq(), 1);
1034    }
1035
1036    #[test]
1037    fn test_regression_seq_zero_after_higher_seqs_rejected() {
1038        // Regression: seq 0 arriving after ack_seq had advanced (e.g., to 5)
1039        // would pass the `seq == 0 && !received_first` check (false, so it
1040        // fell through) and then hit `seq <= self.ack_seq` → duplicate.
1041        // That path was correct, but an earlier version without received_first
1042        // would have reset ack_seq to 0, moving the window backwards.
1043        // This test ensures the fix holds.
1044        let mut mode = ReliableStream::new();
1045
1046        // Receive 0..5 in order
1047        for seq in 0..=5 {
1048            assert!(mode.on_receive(seq));
1049        }
1050        assert_eq!(mode.ack_seq(), 5);
1051
1052        // Late/replayed seq 0 must be rejected and must NOT move ack_seq backwards
1053        assert!(!mode.on_receive(0), "late seq 0 must be rejected");
1054        assert_eq!(mode.ack_seq(), 5, "ack_seq must not move backwards");
1055    }
1056
1057    #[test]
1058    fn test_regression_first_received_seq_one_nacks_seq_zero() {
1059        // Regression (HIGH, BUGS.md): when the first received packet
1060        // on a reliable stream had seq > 0 (the real-world case where
1061        // seq 0 was lost in transit), the receiver silently advanced
1062        // `ack_seq` to that seq, claiming seq 0 had been acknowledged.
1063        // The sender's retransmit of seq 0 was then rejected as a
1064        // duplicate, and seq 0 was permanently lost to the application
1065        // — a reliability-contract violation.
1066        //
1067        // Fix: the receiver now leaves `next_expected` at 0 whenever
1068        // the first received seq is > 0, so the prefix gap is visible
1069        // to `build_nack()` and the retransmit of seq 0 is accepted
1070        // when it arrives.
1071        let mut mode = ReliableStream::new();
1072
1073        // First received packet has seq 1 (seq 0 was lost in transit).
1074        assert!(mode.on_receive(1));
1075        // next_expected must stay at 0 — we haven't received seq 0.
1076        assert_eq!(mode.next_expected(), 0);
1077        assert_eq!(
1078            mode.last_received_contiguous(),
1079            None,
1080            "no contiguous prefix yet"
1081        );
1082
1083        // A NACK must be generated reporting seq 0 as missing.
1084        let nack = mode.build_nack().expect("prefix gap must produce a NACK");
1085        assert_eq!(nack.next_expected, 0, "next_expected in NACK is 0");
1086        let missing: Vec<u64> = nack.missing_sequences().collect();
1087        assert!(
1088            missing.contains(&0),
1089            "NACK must report seq 0 as missing (was the lost first packet)"
1090        );
1091
1092        // Retransmit of seq 0 must be accepted and advance the stream.
1093        assert!(
1094            mode.on_receive(0),
1095            "retransmit of seq 0 must be accepted after it was NACK'd"
1096        );
1097        // Now we have seq 0 and 1 contiguously; next_expected advances.
1098        assert_eq!(mode.next_expected(), 2);
1099        assert_eq!(mode.ack_seq(), 1);
1100
1101        // No more gaps.
1102        assert!(
1103            mode.build_nack().is_none(),
1104            "no gaps after the retransmit filled the prefix"
1105        );
1106    }
1107
1108    #[test]
1109    fn test_regression_first_received_large_seq_bounded_by_window() {
1110        // When the first received packet has a large seq (e.g. the
1111        // first 10 packets were all lost), the receiver can still
1112        // NACK up to the 64-bit bitmap window's worth of gaps. The
1113        // important property is that seq 0 is reported missing and
1114        // can be accepted on retransmit — not that *every* gap before
1115        // the first received seq fits in the bitmap.
1116        let mut mode = ReliableStream::new();
1117
1118        // First received packet is seq 10 (0..9 all lost).
1119        assert!(mode.on_receive(10));
1120        assert_eq!(mode.next_expected(), 0);
1121
1122        let nack = mode.build_nack().expect("prefix gap must produce a NACK");
1123        let missing: Vec<u64> = nack.missing_sequences().collect();
1124        // seq 0 is always reported as missing when any prefix gap exists.
1125        assert!(missing.contains(&0), "NACK must report seq 0 missing");
1126        // seq 1..9 also missing (within the 64-bit bitmap window).
1127        for expected in 1..=9 {
1128            assert!(
1129                missing.contains(&expected),
1130                "NACK must report seq {expected} missing"
1131            );
1132        }
1133
1134        // Sender retransmits seq 0..9 in order.
1135        for seq in 0..10u64 {
1136            assert!(mode.on_receive(seq), "retransmit of seq {seq} accepted");
1137        }
1138        assert_eq!(mode.next_expected(), 11);
1139    }
1140
1141    #[test]
1142    fn test_regression_first_received_duplicate_rejected() {
1143        // When seq 1 arrives first and is accepted (with seq 0 still
1144        // pending NACK), a subsequent duplicate of seq 1 must be
1145        // rejected — not double-counted in the bitmap.
1146        let mut mode = ReliableStream::new();
1147
1148        assert!(mode.on_receive(1), "first seq 1 accepted");
1149        assert!(
1150            !mode.on_receive(1),
1151            "duplicate of seq 1 must be rejected for exactly-once delivery"
1152        );
1153        // State unchanged.
1154        assert_eq!(mode.next_expected(), 0);
1155    }
1156
1157    /// Regression: the retransmit path now stashes pre-encryption
1158    /// rebuild inputs (`RetransmitDescriptor`), not encrypted bytes.
1159    /// Previously, `on_send` recorded the fully-encrypted packet
1160    /// `Bytes` and `on_nack` / `get_timed_out` returned those exact
1161    /// bytes. Replaying them produced the original wire counter on
1162    /// the wire, which the receiver's `update_rx_counter` rejects
1163    /// as a replay — making NACK-driven recovery dead-on-arrival.
1164    ///
1165    /// We pin the new shape: descriptors carry stream_id, seq,
1166    /// events, and flags; multiple retransmits of the same packet
1167    /// must yield the same descriptor (so the caller's
1168    /// re-`builder.build` produces a fresh-counter packet each
1169    /// time).
1170    #[test]
1171    fn retransmit_descriptors_carry_pre_encryption_inputs() {
1172        let mut mode = ReliableStream::with_settings(Duration::from_millis(20), 32, 5);
1173
1174        // Send three packets with realistic descriptors (stream_id,
1175        // events list, flags).
1176        let events_a = vec![Bytes::from_static(b"event-A-payload")];
1177        let events_b = vec![Bytes::from_static(b"event-B-payload")];
1178        let events_c = vec![Bytes::from_static(b"event-C-payload")];
1179        mode.on_send(Arc::new(RetransmitDescriptor {
1180            seq: 0,
1181            stream_id: 7,
1182            events: events_a.clone(),
1183            flags: PacketFlags::RELIABLE,
1184        }));
1185        mode.on_send(Arc::new(RetransmitDescriptor {
1186            seq: 1,
1187            stream_id: 7,
1188            events: events_b.clone(),
1189            flags: PacketFlags::RELIABLE,
1190        }));
1191        mode.on_send(Arc::new(RetransmitDescriptor {
1192            seq: 2,
1193            stream_id: 7,
1194            events: events_c.clone(),
1195            flags: PacketFlags::RELIABLE,
1196        }));
1197
1198        // NACK seq=1.
1199        let nack = NackPayload {
1200            next_expected: 1,
1201            missing_bitmap: 0,
1202        };
1203        let retransmits = mode.on_nack(&nack);
1204        assert_eq!(retransmits.len(), 1);
1205        let r = &retransmits[0];
1206        assert_eq!(r.seq, 1);
1207        assert_eq!(r.stream_id, 7);
1208        assert_eq!(r.events, events_b);
1209        assert_eq!(r.flags, PacketFlags::RELIABLE);
1210
1211        // The descriptor has the inputs needed for
1212        // `PacketBuilder::build(stream_id, seq, &events, flags)`.
1213        // Each retransmit lets the caller produce a fresh-counter
1214        // packet — distinct from the original even though the
1215        // descriptor itself is identical to what was originally
1216        // pushed. This is what fixes the replay-window rejection.
1217        let nack2 = NackPayload {
1218            next_expected: 1,
1219            missing_bitmap: 0,
1220        };
1221        let retransmits2 = mode.on_nack(&nack2);
1222        assert_eq!(retransmits2.len(), 1);
1223        let r2 = &retransmits2[0];
1224        // The descriptor is the same — the *cipher counter* freshness
1225        // is the responsibility of the rebuild caller, not of the
1226        // reliability layer.
1227        assert_eq!(r2.seq, r.seq);
1228        assert_eq!(r2.events, r.events);
1229        assert_eq!(r2.flags, r.flags);
1230        assert_eq!(r2.stream_id, r.stream_id);
1231    }
1232
1233    /// Pin crypto-session perf #133: `on_nack` and `get_timed_out`
1234    /// must emit `Arc::clone`s of the descriptor already held in
1235    /// the retransmit window, not deep copies. Compare backing
1236    /// pointers via `Arc::as_ptr` — a regression that swaps back to
1237    /// `descriptor.clone()` on the inner `RetransmitDescriptor`
1238    /// would silently re-introduce the per-retransmit
1239    /// `Vec<Bytes>` allocation + N `Bytes` refcount bumps.
1240    #[test]
1241    fn retransmits_share_descriptor_via_arc_refcount_not_deep_clone() {
1242        let mut mode = ReliableStream::with_settings(Duration::from_millis(20), 32, 5);
1243
1244        let original = Arc::new(RetransmitDescriptor {
1245            seq: 0,
1246            stream_id: 7,
1247            events: vec![Bytes::from_static(b"event-A")],
1248            flags: PacketFlags::RELIABLE,
1249        });
1250        let original_ptr = Arc::as_ptr(&original);
1251        mode.on_send(Arc::clone(&original));
1252
1253        // NACK path: emitted Arc points at the same allocation as
1254        // the original we pushed (refcount bump, not a clone).
1255        let nack = NackPayload {
1256            next_expected: 0,
1257            missing_bitmap: 1,
1258        };
1259        let from_nack = mode.on_nack(&nack);
1260        assert_eq!(from_nack.len(), 1, "nack should produce one retransmit");
1261        assert_eq!(
1262            Arc::as_ptr(&from_nack[0]),
1263            original_ptr,
1264            "on_nack must clone the Arc, not deep-clone the descriptor"
1265        );
1266
1267        // Timeout path: re-arm the timer, sleep, drain. Same
1268        // pointer-identity assertion as the NACK path.
1269        std::thread::sleep(Duration::from_millis(35));
1270        let from_timeout = mode.get_timed_out();
1271        assert!(
1272            !from_timeout.is_empty(),
1273            "expected at least one timed-out retransmit"
1274        );
1275        assert_eq!(
1276            Arc::as_ptr(&from_timeout[0]),
1277            original_ptr,
1278            "get_timed_out must clone the Arc, not deep-clone the descriptor"
1279        );
1280    }
1281}