net/adapter/net/reliability.rs
1//! Reliability modes for Net streams.
2//!
3//! Net supports two reliability modes:
4//! - Fire-and-forget: No acknowledgments, maximum throughput
5//! - Reliable: Per-stream reliability with selective NACKs
6
7use bytes::Bytes;
8use std::collections::VecDeque;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use super::protocol::{NackPayload, PacketFlags};
14
/// Pre-encryption inputs needed to rebuild a packet for
/// retransmission.
///
/// The reliable retransmit path used to stash the fully-encrypted
/// packet bytes, but every encrypted packet carries the cipher's
/// outer counter stamped at build time. Replaying those exact bytes
/// puts the same counter on the wire again, which the receiver's
/// `update_rx_counter` rejects as a replay — making NACK-driven
/// recovery a no-op the first time it fired. Stashing the rebuild
/// inputs instead lets the retransmit driver call
/// `PacketBuilder::build` with a fresh counter on each retransmit,
/// so the receiver accepts the recovered packet.
///
/// Within this module the descriptor is passed around as
/// `Arc<RetransmitDescriptor>`, and `seq` is the key that ACK and
/// NACK processing match pending packets against.
#[derive(Debug, Clone)]
pub struct RetransmitDescriptor {
    /// Per-stream sequence number stamped on the packet header.
    pub seq: u64,
    /// Stream id for the rebuild call.
    pub stream_id: u64,
    /// Pre-encryption event payloads (the same `&[Bytes]` originally
    /// passed to `PacketBuilder::build`).
    pub events: Vec<Bytes>,
    /// Packet flags as stamped on the original send.
    pub flags: PacketFlags,
}
39
/// Trait for reliability mode implementations.
///
/// Two implementations live in this file: [`FireAndForget`] (tracks
/// nothing, accepts everything) and [`ReliableStream`] (bounded
/// retransmit window with selective NACKs).
///
/// Per crypto-session perf #133, the descriptor is exchanged as
/// `Arc<RetransmitDescriptor>` across the trait boundary. The
/// `RetransmitDescriptor` itself carries an inner
/// `Vec<Bytes>` of pre-encryption event payloads — at `max_pending =
/// 32` and ~10 events per packet that's ~320 `Bytes` refcounts
/// dangling off the retransmit window at any given time. Pre-fix
/// `on_send` moved the descriptor in by value (one Vec spine + one
/// refcount bump per inner `Bytes`), and `on_nack` /
/// `get_timed_out` deep-cloned the descriptor per retransmit (one
/// Vec alloc + N `Bytes` refcount bumps per emission). Wrapping in
/// `Arc` makes both paths one atomic refcount bump regardless of
/// the inner Vec's length.
pub trait ReliabilityMode: Send + Sync {
    /// Called when a packet is sent. The descriptor carries pre-
    /// encryption inputs so the retransmit path can rebuild a
    /// fresh-counter packet rather than replaying stale ciphertext.
    fn on_send(&mut self, descriptor: Arc<RetransmitDescriptor>);

    /// Called when a packet with sequence number `seq` is received.
    /// Returns `true` if the packet is accepted and `false` if the
    /// mode rejects it (`ReliableStream` rejects duplicates and
    /// sequences too far ahead of its window; `FireAndForget` never
    /// rejects).
    fn on_receive(&mut self, seq: u64) -> bool;

    /// Check if this mode requires acknowledgments.
    fn needs_ack(&self) -> bool;

    /// Build a NACK payload if there are missing sequences; `None`
    /// when the mode has nothing to request.
    fn build_nack(&self) -> Option<NackPayload>;

    /// Process a received NACK and return descriptors for the
    /// caller to rebuild + dispatch. The returned `Arc` clones
    /// share the inner `RetransmitDescriptor` allocation; the
    /// caller bumps the refcount instead of deep-cloning the
    /// `Vec<Bytes>` of events.
    fn on_nack(&mut self, nack: &NackPayload) -> Vec<Arc<RetransmitDescriptor>>;

    /// Get descriptors that need retransmission due to timeout. See
    /// [`Self::on_nack`] for the `Arc`-sharing contract.
    fn get_timed_out(&mut self) -> Vec<Arc<RetransmitDescriptor>>;

    /// Check if there are unacknowledged packets still being tracked.
    fn has_pending(&self) -> bool;

    /// Get the name of this reliability mode (a static label such as
    /// `"fire-and-forget"` or `"reliable"`).
    fn name(&self) -> &'static str;
}
86
/// Fire-and-forget reliability mode.
///
/// No acknowledgments, no retransmission, maximum throughput.
/// Suitable for:
/// - LLM token streams
/// - Embeddings
/// - Intermediate activations
/// - Metrics/telemetry
///
/// `Default` yields a mode whose `last_seq` starts at 0.
#[derive(Debug, Default)]
pub struct FireAndForget {
    /// Highest sequence number received so far. Informational only:
    /// `on_receive` raises it with `fetch_max` but never consults it
    /// when deciding whether to accept a packet.
    last_seq: AtomicU64,
}
100
101impl FireAndForget {
102 /// Create a new fire-and-forget mode
103 pub fn new() -> Self {
104 Self::default()
105 }
106}
107
108impl ReliabilityMode for FireAndForget {
109 #[inline]
110 fn on_send(&mut self, _descriptor: Arc<RetransmitDescriptor>) {
111 // Nothing to track
112 }
113
114 #[inline]
115 fn on_receive(&mut self, seq: u64) -> bool {
116 // Update last sequence (informational only)
117 self.last_seq.fetch_max(seq, Ordering::Relaxed);
118 true // Always accept
119 }
120
121 #[inline]
122 fn needs_ack(&self) -> bool {
123 false
124 }
125
126 #[inline]
127 fn build_nack(&self) -> Option<NackPayload> {
128 None
129 }
130
131 #[inline]
132 fn on_nack(&mut self, _nack: &NackPayload) -> Vec<Arc<RetransmitDescriptor>> {
133 Vec::new()
134 }
135
136 #[inline]
137 fn get_timed_out(&mut self) -> Vec<Arc<RetransmitDescriptor>> {
138 Vec::new()
139 }
140
141 #[inline]
142 fn has_pending(&self) -> bool {
143 false
144 }
145
146 #[inline]
147 fn name(&self) -> &'static str {
148 "fire-and-forget"
149 }
150}
151
/// Unacknowledged packet waiting for ACK/NACK.
///
/// One entry lives in `ReliableStream::pending` for every tracked
/// send until it is acknowledged via `on_ack` or evicted because the
/// window is full.
#[derive(Debug, Clone)]
struct UnackedPacket {
    /// Pre-encryption rebuild inputs. Stashing the descriptor (not
    /// the encrypted bytes) is what lets the retransmit path
    /// produce a fresh-counter packet on each NACK / timeout.
    ///
    /// Per crypto-session perf #133, the descriptor is held behind
    /// an `Arc` so that the retransmit emissions (`on_nack` /
    /// `get_timed_out`) clone the refcount instead of deep-cloning
    /// the inner `Vec<Bytes>` events list.
    descriptor: Arc<RetransmitDescriptor>,
    /// Time when the packet was last handed out for (re)transmission:
    /// set at `on_send` and reset on every NACK- or timeout-driven
    /// retransmit.
    sent_at: Instant,
    /// Number of retransmission attempts so far; once it reaches the
    /// stream's `max_retries` the packet is no longer re-emitted.
    retries: u8,
}
169
170impl UnackedPacket {
171 #[inline]
172 fn seq(&self) -> u64 {
173 self.descriptor.seq
174 }
175}
176
/// Reliable stream mode with selective NACKs.
///
/// Features:
/// - Bounded retransmit window (32 packets by default, see
///   `DEFAULT_MAX_PENDING`; configurable via `with_settings`)
/// - Selective NACKs (receiver-driven)
/// - Per-stream state
/// - Configurable RTO
///
/// Suitable for:
/// - Tool call results
/// - Guardrail decisions
/// - Session lifecycle events
/// - Error propagation
///
/// A single value holds both directions of state: the receive side
/// (`next_expected`, `sack_bitmap`) and the send side (`pending` and
/// its limits).
pub struct ReliableStream {
    /// The next sequence number we haven't yet received. All sequences
    /// `< next_expected` have been received contiguously. Starts at 0,
    /// expecting seq 0 as the first packet of the stream.
    ///
    /// Use `next_expected()` / `ack_seq()` accessors externally.
    next_expected: u64,
    /// SACK bitmap for out-of-order packets. Bit `i` is set iff sequence
    /// `next_expected + 1 + i` has been received. This represents up to
    /// 64 future sequences after the contiguous range. As `next_expected`
    /// advances, the bitmap is right-shifted so bit 0 always represents
    /// `next_expected + 1`.
    sack_bitmap: u64,
    /// Pending unacknowledged packets in send order (bounded by
    /// `max_pending`; the oldest entry sits at the front).
    pending: VecDeque<UnackedPacket>,
    /// Retransmit timeout: a pending packet older than this is
    /// returned by `get_timed_out`.
    rto: Duration,
    /// Maximum pending packets
    max_pending: usize,
    /// Maximum retries per packet
    max_retries: u8,
    /// Number of unacknowledged packets evicted from `pending` because
    /// the window was full when `on_send` arrived. The evicted packet
    /// went on the wire (the caller already issued the syscall) but
    /// is no longer tracked for retransmit — a NACK for that seq can
    /// no longer recover it. This counter surfaces the silent loss
    /// to the metrics layer so operators can size `max_pending` for
    /// their actual sustained reliable-stream throughput. Pre-fix
    /// the eviction was unobservable.
    untracked_evictions: u64,
}
221
222impl ReliableStream {
223 /// Default retransmit timeout
224 pub const DEFAULT_RTO: Duration = Duration::from_millis(50);
225
226 /// Default max pending packets
227 pub const DEFAULT_MAX_PENDING: usize = 32;
228
229 /// Default max retries
230 pub const DEFAULT_MAX_RETRIES: u8 = 3;
231
232 /// Create a new reliable stream with default settings
233 pub fn new() -> Self {
234 Self {
235 next_expected: 0,
236 sack_bitmap: 0,
237 pending: VecDeque::with_capacity(Self::DEFAULT_MAX_PENDING),
238 rto: Self::DEFAULT_RTO,
239 max_pending: Self::DEFAULT_MAX_PENDING,
240 max_retries: Self::DEFAULT_MAX_RETRIES,
241 untracked_evictions: 0,
242 }
243 }
244
245 /// Create with custom settings
246 pub fn with_settings(rto: Duration, max_pending: usize, max_retries: u8) -> Self {
247 Self {
248 next_expected: 0,
249 sack_bitmap: 0,
250 pending: VecDeque::with_capacity(max_pending),
251 rto,
252 max_pending,
253 max_retries,
254 untracked_evictions: 0,
255 }
256 }
257
258 /// Number of unacknowledged packets that the stream evicted from
259 /// its retransmit window because the window was full at `on_send`
260 /// time. Each eviction means the caller's syscall succeeded
261 /// (bytes left this node) but the packet is no longer tracked
262 /// for retransmit — a NACK can no longer recover it. A non-zero
263 /// value indicates `max_pending` is undersized for the stream's
264 /// sustained throughput. Operators should size up or apply
265 /// upstream backpressure rather than accepting silent loss.
266 #[inline]
267 pub fn untracked_evictions(&self) -> u64 {
268 self.untracked_evictions
269 }
270
271 /// Set the retransmit timeout
272 pub fn set_rto(&mut self, rto: Duration) {
273 self.rto = rto;
274 }
275
276 /// Lowest sequence number we have not yet received. All sequences
277 /// strictly below this value are contiguously received.
278 #[inline]
279 pub fn next_expected(&self) -> u64 {
280 self.next_expected
281 }
282
283 /// Highest contiguously-received sequence number, or `None` if no
284 /// packets have been received yet.
285 #[inline]
286 pub fn last_received_contiguous(&self) -> Option<u64> {
287 if self.next_expected == 0 {
288 None
289 } else {
290 Some(self.next_expected - 1)
291 }
292 }
293
294 /// Get the current ack sequence (highest contiguously-received seq).
295 /// Returns 0 when nothing has been received yet — callers that need
296 /// to distinguish "received seq 0" from "received nothing" should use
297 /// [`Self::last_received_contiguous`] instead.
298 pub fn ack_seq(&self) -> u64 {
299 self.next_expected.saturating_sub(1)
300 }
301
302 /// Process an acknowledgment. `acked` is the highest sequence the
303 /// peer has contiguously received.
304 pub fn on_ack(&mut self, acked: u64) {
305 // Remove all pending packets up to and including acked.
306 while let Some(front) = self.pending.front() {
307 if front.seq() <= acked {
308 self.pending.pop_front();
309 } else {
310 break;
311 }
312 }
313 }
314
315 /// Check if there are gaps in received sequences.
316 ///
317 /// A gap exists whenever at least one future sequence has been
318 /// received out of order — meaning `next_expected` itself is still
319 /// pending (the implicit gap) and any interior missing seqs show
320 /// up as zero bits in the SACK bitmap below the highest received.
321 fn has_gaps(&self) -> bool {
322 self.sack_bitmap != 0
323 }
324
325 /// Get bitmap of missing sequences after `next_expected`.
326 ///
327 /// Bit `i` set means sequence `next_expected + 1 + i` is missing.
328 /// Sequence `next_expected` itself is always implicitly missing
329 /// whenever `has_gaps()` returns true (that's what makes the NACK
330 /// meaningful) — `missing_sequences()` on the resulting NACK emits
331 /// `next_expected` first, then the bits of this bitmap.
332 fn missing_bitmap(&self) -> u64 {
333 // Invert sack_bitmap to get missing sequences; only consider
334 // bits up to the highest received (otherwise we'd claim
335 // sequences we've never heard of are "missing").
336 if self.sack_bitmap == 0 {
337 return 0;
338 }
339 let highest_bit = 63 - self.sack_bitmap.leading_zeros();
340 let mask = if highest_bit >= 63 {
341 u64::MAX
342 } else {
343 (1u64 << (highest_bit + 1)) - 1
344 };
345 (!self.sack_bitmap) & mask
346 }
347}
348
349impl Default for ReliableStream {
350 fn default() -> Self {
351 Self::new()
352 }
353}
354
355impl ReliabilityMode for ReliableStream {
356 fn on_send(&mut self, descriptor: Arc<RetransmitDescriptor>) {
357 // Evict oldest unacked packet if window is full so that the
358 // newest packet is always tracked for retransmission. Without
359 // this, packets sent when the window is full are silently lost
360 // from the retransmit buffer even though they were sent on the
361 // wire — a gap the receiver can never recover via NACK.
362 //
363 // Bump `untracked_evictions` on every eviction so the silent
364 // loss surfaces via the `untracked_evictions()` accessor (and
365 // any metrics layer hooked into it). Pre-fix the eviction was
366 // unobservable: a `max_pending`-undersized stream looked
367 // healthy from the sender side until NACKs started arriving
368 // for sequences whose retransmit had already been dropped.
369 if self.pending.len() >= self.max_pending {
370 self.pending.pop_front();
371 self.untracked_evictions = self.untracked_evictions.saturating_add(1);
372 tracing::warn!(
373 untracked_evictions = self.untracked_evictions,
374 max_pending = self.max_pending,
375 "ReliableStream: retransmit window full; evicted oldest \
376 unacked packet — NACK for that seq can no longer \
377 recover it. Increase max_pending or apply upstream \
378 backpressure.",
379 );
380 }
381 self.pending.push_back(UnackedPacket {
382 descriptor,
383 sent_at: Instant::now(),
384 retries: 0,
385 });
386 }
387
388 fn on_receive(&mut self, seq: u64) -> bool {
389 // Anything below next_expected has already been received
390 // contiguously; reject as a duplicate.
391 if seq < self.next_expected {
392 return false;
393 }
394 if seq == self.next_expected {
395 // Next expected sequence — advance the contiguous range,
396 // then absorb any already-received future seqs that have
397 // just become contiguous.
398 //
399 // Bitmap invariant (before this call): bit i is set iff
400 // seq (old next_expected + 1 + i) has been received. After
401 // incrementing next_expected by 1 (but BEFORE shifting),
402 // bit 0 of the bitmap now refers to seq new_next_expected
403 // itself — which, if set, means that seq was also received
404 // out-of-order earlier and we can advance past it too.
405 self.next_expected += 1;
406 while self.sack_bitmap & 1 != 0 {
407 self.next_expected += 1;
408 self.sack_bitmap >>= 1;
409 }
410 // Restore the bitmap invariant: after the loop,
411 // bit 0 of the bitmap still refers to seq `next_expected`
412 // (not yet received; otherwise the loop would have
413 // consumed it). The invariant wants bit 0 to refer to
414 // seq `next_expected + 1`, so shift once more.
415 self.sack_bitmap >>= 1;
416 return true;
417 }
418 // seq > next_expected: future sequence.
419 //
420 // The bitmap can represent up to 64 future seqs past the
421 // contiguous range. `offset` here is (seq - next_expected),
422 // which is ≥ 1. Bit 0 of the bitmap represents
423 // `next_expected + 1`, so the bit index is `offset - 1`.
424 //
425 // If the first packet of a stream arrives with seq > 0, this
426 // branch records it without advancing next_expected, so
427 // sequences `[0, seq)` remain flagged as missing in the
428 // SACK bitmap — the receiver will request them via a NACK
429 // instead of silently skipping them (which is what the old
430 // code's `seq == ack_seq + 1` branch did, treating seq 0 as
431 // already-acknowledged when the stream actually started with
432 // a lost packet).
433 let offset = seq - self.next_expected;
434 if offset > 64 {
435 return false;
436 }
437 let bit = offset - 1;
438 let mask = 1u64 << bit;
439 if self.sack_bitmap & mask != 0 {
440 // Duplicate of a previously-recorded future seq.
441 return false;
442 }
443 self.sack_bitmap |= mask;
444 true
445 }
446
447 #[inline]
448 fn needs_ack(&self) -> bool {
449 true
450 }
451
452 fn build_nack(&self) -> Option<NackPayload> {
453 if self.has_gaps() {
454 Some(NackPayload {
455 next_expected: self.next_expected,
456 missing_bitmap: self.missing_bitmap(),
457 })
458 } else {
459 None
460 }
461 }
462
463 fn on_nack(&mut self, nack: &NackPayload) -> Vec<Arc<RetransmitDescriptor>> {
464 let mut retransmits = Vec::new();
465
466 // Find packets to retransmit based on NACK. Return the
467 // pre-encryption descriptors so the caller can rebuild
468 // each packet with a fresh cipher counter — replaying the
469 // stashed encrypted bytes would trip the receiver's replay
470 // window. Per perf #133 the descriptor is `Arc`-shared, so
471 // each emission is one atomic refcount bump rather than a
472 // deep `Vec<Bytes>` clone.
473 for missing_seq in nack.missing_sequences() {
474 for unacked in &mut self.pending {
475 if unacked.seq() == missing_seq && unacked.retries < self.max_retries {
476 retransmits.push(Arc::clone(&unacked.descriptor));
477 unacked.retries += 1;
478 unacked.sent_at = Instant::now();
479 break;
480 }
481 }
482 }
483
484 retransmits
485 }
486
487 fn get_timed_out(&mut self) -> Vec<Arc<RetransmitDescriptor>> {
488 let now = Instant::now();
489 let mut retransmits = Vec::new();
490
491 // Per perf #133 — `Arc::clone` bumps a refcount instead of
492 // deep-cloning the `Vec<Bytes>` events list per timed-out
493 // packet.
494 for unacked in &mut self.pending {
495 if now.duration_since(unacked.sent_at) > self.rto && unacked.retries < self.max_retries
496 {
497 retransmits.push(Arc::clone(&unacked.descriptor));
498 unacked.retries += 1;
499 unacked.sent_at = now;
500 }
501 }
502
503 retransmits
504 }
505
506 #[inline]
507 fn has_pending(&self) -> bool {
508 !self.pending.is_empty()
509 }
510
511 #[inline]
512 fn name(&self) -> &'static str {
513 "reliable"
514 }
515}
516
517impl std::fmt::Debug for ReliableStream {
518 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
519 f.debug_struct("ReliableStream")
520 .field("next_expected", &self.next_expected)
521 .field("sack_bitmap", &format!("{:064b}", self.sack_bitmap))
522 .field("pending_count", &self.pending.len())
523 .field("rto_ms", &self.rto.as_millis())
524 .finish()
525 }
526}
527
528/// Create a boxed reliability mode from configuration
529pub fn create_reliability_mode(reliable: bool) -> Box<dyn ReliabilityMode> {
530 if reliable {
531 Box::new(ReliableStream::new())
532 } else {
533 Box::new(FireAndForget::new())
534 }
535}
536
537#[cfg(test)]
538mod tests {
539 use super::*;
540
541 /// Test helper: build a `RetransmitDescriptor` from the legacy
542 /// `(seq, packet_bytes)` shape these tests were written against.
543 /// Wraps the bytes as a single-event payload so the in-memory
544 /// shape has something to round-trip through. Returns an
545 /// `Arc<...>` per perf #133 — `on_send` consumes the shared
546 /// allocation.
547 fn descriptor(seq: u64, packet: Bytes) -> Arc<RetransmitDescriptor> {
548 Arc::new(RetransmitDescriptor {
549 seq,
550 stream_id: 0,
551 events: vec![packet],
552 flags: PacketFlags::RELIABLE,
553 })
554 }
555
556 #[test]
557 fn test_fire_and_forget() {
558 let mut mode = FireAndForget::new();
559
560 // Should always accept
561 assert!(mode.on_receive(1));
562 assert!(mode.on_receive(3)); // Gap is fine
563 assert!(mode.on_receive(2)); // Out of order is fine
564
565 // No acks needed
566 assert!(!mode.needs_ack());
567 assert!(mode.build_nack().is_none());
568 assert!(!mode.has_pending());
569
570 // No retransmits
571 mode.on_send(descriptor(1, Bytes::from_static(b"test")));
572 assert!(mode.get_timed_out().is_empty());
573 }
574
575 #[test]
576 fn test_reliable_stream_in_order() {
577 let mut mode = ReliableStream::new();
578
579 // Receive in order starting from seq 0 (the sender always
580 // begins at 0).
581 assert!(mode.on_receive(0));
582 assert_eq!(mode.ack_seq(), 0);
583 assert_eq!(mode.last_received_contiguous(), Some(0));
584
585 assert!(mode.on_receive(1));
586 assert_eq!(mode.ack_seq(), 1);
587
588 assert!(mode.on_receive(2));
589 assert_eq!(mode.ack_seq(), 2);
590
591 assert!(mode.on_receive(3));
592 assert_eq!(mode.ack_seq(), 3);
593
594 // No NACK needed
595 assert!(mode.build_nack().is_none());
596 }
597
598 #[test]
599 fn test_reliable_stream_gap() {
600 let mut mode = ReliableStream::new();
601
602 // Receive with gap (after an initial in-order seq 0 so the
603 // gap is a real mid-stream hole, not a missing prefix).
604 assert!(mode.on_receive(0));
605 assert!(mode.on_receive(1));
606 assert!(mode.on_receive(3)); // Gap at 2
607 assert!(mode.on_receive(5)); // Gap at 4
608
609 assert_eq!(mode.ack_seq(), 1);
610
611 // Should have NACK
612 let nack = mode.build_nack().unwrap();
613 assert_eq!(nack.next_expected, 2);
614
615 // Missing: 2 (the next expected — implicit), 4 (bitmap bit 1).
616 let missing: Vec<_> = nack.missing_sequences().collect();
617 assert!(missing.contains(&2));
618 assert!(missing.contains(&4));
619 }
620
621 #[test]
622 fn test_reliable_stream_fill_gap() {
623 let mut mode = ReliableStream::new();
624
625 // Receive out of order (with seq 0 so the gap is interior, not
626 // a missing prefix).
627 assert!(mode.on_receive(0));
628 assert!(mode.on_receive(1));
629 assert!(mode.on_receive(3));
630 assert!(mode.on_receive(4));
631 assert_eq!(mode.ack_seq(), 1);
632
633 // Fill gap
634 assert!(mode.on_receive(2));
635
636 // Should advance
637 assert_eq!(mode.ack_seq(), 4);
638
639 // No NACK needed
640 assert!(mode.build_nack().is_none());
641 }
642
643 #[test]
644 fn test_reliable_stream_duplicate() {
645 let mut mode = ReliableStream::new();
646
647 assert!(mode.on_receive(0));
648 assert!(mode.on_receive(1));
649 assert!(mode.on_receive(2));
650
651 // Duplicate should be rejected
652 assert!(!mode.on_receive(1));
653 assert!(!mode.on_receive(2));
654
655 assert_eq!(mode.ack_seq(), 2);
656 }
657
658 #[test]
659 fn test_reliable_stream_pending() {
660 let mut mode = ReliableStream::new();
661
662 assert!(!mode.has_pending());
663
664 mode.on_send(descriptor(1, Bytes::from_static(b"packet1")));
665 mode.on_send(descriptor(2, Bytes::from_static(b"packet2")));
666
667 assert!(mode.has_pending());
668
669 // ACK should clear pending
670 mode.on_ack(2);
671 assert!(!mode.has_pending());
672 }
673
674 #[test]
675 fn test_reliable_stream_nack_retransmit() {
676 let mut mode = ReliableStream::new();
677
678 mode.on_send(descriptor(1, Bytes::from_static(b"packet1")));
679 mode.on_send(descriptor(2, Bytes::from_static(b"packet2")));
680 mode.on_send(descriptor(3, Bytes::from_static(b"packet3")));
681
682 // NACK saying "received through seq 1, seq 2 is the next
683 // expected (and therefore missing)".
684 let nack = NackPayload {
685 next_expected: 2,
686 missing_bitmap: 0,
687 };
688
689 let retransmits = mode.on_nack(&nack);
690 assert_eq!(retransmits.len(), 1);
691 // The descriptor's first event is the (test-helper-built)
692 // payload of the original send.
693 assert_eq!(&retransmits[0].events[0][..], b"packet2");
694 assert_eq!(retransmits[0].seq, 2);
695 }
696
697 #[test]
698 fn test_reliable_stream_too_far_ahead() {
699 let mut mode = ReliableStream::new();
700
701 assert!(mode.on_receive(0));
702 assert!(mode.on_receive(1));
703
704 // Sequence 100 is too far ahead (beyond 64-bit window)
705 assert!(!mode.on_receive(100));
706
707 assert_eq!(mode.ack_seq(), 1);
708 }
709
710 #[test]
711 fn test_reliable_stream_nack_bitmap_full_window() {
712 // Regression: when the highest received bit was 63 (full 64-bit window),
713 // 1u64 << 64 overflowed, panicking in debug or producing wrong results
714 // in release.
715 let mut mode = ReliableStream::new();
716
717 // Receive packet 0, 1, then packet 65 (exactly 64 past `1`, at
718 // the edge of the window).
719 assert!(mode.on_receive(0));
720 assert!(mode.on_receive(1));
721 assert!(mode.on_receive(65));
722
723 // build_nack should not panic and should report missing sequences
724 let nack = mode.build_nack();
725 assert!(
726 nack.is_some(),
727 "NACK should be generated for a gap spanning the full window"
728 );
729
730 let missing: Vec<_> = nack.unwrap().missing_sequences().collect();
731 // Sequences 2..=64 are missing
732 assert!(!missing.is_empty());
733 }
734
735 /// Regression: when `pending.len() >= max_pending`, `on_send`
736 /// evicts the oldest unacked packet to make room for the new
737 /// one. The evicted packet went on the wire but is no longer
738 /// tracked for retransmit — a NACK can no longer recover it.
739 /// Pre-fix the eviction was unobservable. The fix exposes a
740 /// `untracked_evictions()` counter so a metrics layer can
741 /// surface the silent loss to operators.
742 #[test]
743 fn reliable_stream_records_untracked_evictions_when_window_full() {
744 const MAX_PENDING: usize = 4;
745 let mut mode = ReliableStream::with_settings(Duration::from_millis(50), MAX_PENDING, 3);
746 assert_eq!(mode.untracked_evictions(), 0);
747
748 // Fill the window — no evictions yet.
749 for seq in 0..(MAX_PENDING as u64) {
750 mode.on_send(descriptor(seq, Bytes::from(format!("pkt-{seq}"))));
751 }
752 assert_eq!(mode.untracked_evictions(), 0);
753
754 // The next 3 sends each force an eviction.
755 for seq in (MAX_PENDING as u64)..(MAX_PENDING as u64 + 3) {
756 mode.on_send(descriptor(seq, Bytes::from(format!("pkt-{seq}"))));
757 }
758 assert_eq!(
759 mode.untracked_evictions(),
760 3,
761 "every on_send beyond max_pending must bump untracked_evictions",
762 );
763
764 // The evicted seqs (0, 1, 2) are no longer recoverable via
765 // NACK — pin that behavior so a future change that quietly
766 // re-orders eviction is caught. `missing_sequences()` yields
767 // `[next_expected, next_expected+1+i for set bits]`, so
768 // `next_expected: 0, missing_bitmap: 0b011` requests
769 // [0, 1, 2] without spilling into the still-pending seq 3.
770 let nack = NackPayload {
771 next_expected: 0,
772 missing_bitmap: 0b011,
773 };
774 let retransmits = mode.on_nack(&nack);
775 assert!(
776 retransmits.is_empty(),
777 "evicted seqs must not produce retransmit descriptors, got {} entries",
778 retransmits.len(),
779 );
780 }
781
782 #[test]
783 fn test_create_reliability_mode() {
784 let mode = create_reliability_mode(false);
785 assert_eq!(mode.name(), "fire-and-forget");
786
787 let mode = create_reliability_mode(true);
788 assert_eq!(mode.name(), "reliable");
789 }
790
791 #[test]
792 fn test_reliable_stream_nack_retransmit_full_cycle() {
793 // Full cycle: send packets, receive out of order with gaps,
794 // build NACK, retransmit missing, fill gaps, verify ack_seq advances.
795 let mut sender = ReliableStream::new();
796 let mut receiver = ReliableStream::new();
797
798 // Sender sends packets 0..10
799 for seq in 0..10u64 {
800 sender.on_send(descriptor(seq, Bytes::from(format!("pkt-{}", seq))));
801 }
802 assert!(sender.has_pending());
803
804 // Receiver gets packets 0, 1, 3, 5, 6, 7, 9 (missing 2, 4, 8)
805 assert!(receiver.on_receive(0));
806 assert!(receiver.on_receive(1));
807 assert!(receiver.on_receive(3)); // gap at 2
808 assert!(receiver.on_receive(5)); // gap at 4
809 assert!(receiver.on_receive(6));
810 assert!(receiver.on_receive(7));
811 assert!(receiver.on_receive(9)); // gap at 8
812
813 assert_eq!(receiver.ack_seq(), 1); // contiguous through 1
814
815 // Receiver builds NACK
816 let nack = receiver.build_nack().expect("should have gaps");
817 assert_eq!(nack.next_expected, 2);
818 let missing: Vec<u64> = nack.missing_sequences().collect();
819 assert!(missing.contains(&2), "should report seq 2 missing");
820 assert!(missing.contains(&4), "should report seq 4 missing");
821 assert!(missing.contains(&8), "should report seq 8 missing");
822
823 // Sender processes NACK → retransmits missing packets
824 let retransmits = sender.on_nack(&nack);
825 assert_eq!(retransmits.len(), 3, "should retransmit 3 packets");
826
827 // Receiver fills gaps
828 assert!(receiver.on_receive(2));
829 // After receiving 2: ack_seq should advance through 3, 5, 6, 7
830 // Wait — 4 is still missing, so ack_seq advances to 3 then stops
831 assert_eq!(
832 receiver.ack_seq(),
833 3,
834 "should advance through contiguous 2,3"
835 );
836
837 assert!(receiver.on_receive(4));
838 // Now 4 fills gap: ack_seq advances through 5, 6, 7
839 assert_eq!(receiver.ack_seq(), 7, "should advance through 4,5,6,7");
840
841 assert!(receiver.on_receive(8));
842 // 8 fills gap: ack_seq advances through 9
843 assert_eq!(receiver.ack_seq(), 9, "should advance through 8,9");
844
845 // No more gaps
846 assert!(
847 receiver.build_nack().is_none(),
848 "no gaps remaining after retransmit"
849 );
850 }
851
852 #[test]
853 fn test_reliable_stream_retransmit_timeout() {
854 let mut mode = ReliableStream::with_settings(
855 Duration::from_millis(50), // 50ms RTO — large enough to avoid CI jitter
856 32,
857 3,
858 );
859
860 mode.on_send(descriptor(0, Bytes::from_static(b"pkt-0")));
861 mode.on_send(descriptor(1, Bytes::from_static(b"pkt-1")));
862
863 // Nothing should time out yet (we just sent)
864 let too_early = mode.get_timed_out();
865 assert!(
866 too_early.is_empty(),
867 "packets should not time out before RTO"
868 );
869
870 // Wait well past RTO
871 std::thread::sleep(Duration::from_millis(80));
872
873 let timed_out = mode.get_timed_out();
874 assert_eq!(timed_out.len(), 2, "both packets should time out");
875 assert_eq!(&timed_out[0].events[0][..], b"pkt-0");
876 assert_eq!(&timed_out[1].events[0][..], b"pkt-1");
877
878 // Immediately after retransmit, sent_at was reset — shouldn't time out
879 // again until another RTO elapses
880 let again = mode.get_timed_out();
881 assert!(
882 again.is_empty(),
883 "just retransmitted, shouldn't timeout yet"
884 );
885 }
886
887 #[test]
888 fn test_reliable_stream_max_retries_exhausted() {
889 let mut mode = ReliableStream::with_settings(
890 Duration::from_millis(50),
891 32,
892 2, // max 2 retries
893 );
894
895 mode.on_send(descriptor(0, Bytes::from_static(b"pkt-0")));
896
897 // Exhaust retries (each iteration waits past RTO then triggers retransmit)
898 for _ in 0..3 {
899 std::thread::sleep(Duration::from_millis(80));
900 let _ = mode.get_timed_out();
901 }
902
903 // After max_retries, the packet should no longer be retransmitted
904 std::thread::sleep(Duration::from_millis(80));
905 let timed_out = mode.get_timed_out();
906 assert!(
907 timed_out.is_empty(),
908 "packet should stop being retransmitted after max_retries"
909 );
910 }
911
912 #[test]
913 fn test_regression_has_gaps_misses_interior_holes() {
914 // Regression: has_gaps() used `trailing_zeros() > 0` which relied
915 // on the subtle invariant that bit 0 of sack_bitmap is always 0
916 // after on_receive returns. The old code was accidentally correct
917 // but fragile — any refactor of on_receive could silently break
918 // gap detection.
919 //
920 // Fix: has_gaps() now delegates to missing_bitmap() != 0, which
921 // is correct by construction regardless of bitmap invariants.
922 let mut mode = ReliableStream::new();
923
924 // Receive 0, 1, 2, 4 — gap at 3
925 assert!(mode.on_receive(0));
926 assert!(mode.on_receive(1));
927 assert!(mode.on_receive(2));
928 assert!(mode.on_receive(4));
929
930 assert_eq!(mode.ack_seq(), 2);
931
932 let nack = mode.build_nack().unwrap();
933 let missing: Vec<u64> = nack.missing_sequences().collect();
934 assert!(missing.contains(&3), "should detect gap at seq 3");
935 }
936
937 #[test]
938 fn test_regression_has_gaps_with_filled_first_slot() {
939 // Verify has_gaps detects interior holes even when sequences
940 // immediately after ack_seq are present.
941 let mut mode = ReliableStream::new();
942
943 // Receive 0, 1, 3, 5, 7 — gaps at 2, 4, 6
944 assert!(mode.on_receive(0));
945 assert!(mode.on_receive(1));
946 assert!(mode.on_receive(3));
947 assert!(mode.on_receive(5));
948 assert!(mode.on_receive(7));
949
950 assert_eq!(mode.ack_seq(), 1);
951
952 let nack = mode.build_nack().expect("should detect gaps");
953 let missing: Vec<u64> = nack.missing_sequences().collect();
954 assert!(missing.contains(&2), "should detect gap at seq 2");
955 assert!(missing.contains(&4), "should detect gap at seq 4");
956 assert!(missing.contains(&6), "should detect gap at seq 6");
957 // 4 entries: next_expected=2 (implicit), plus bits for 4 and 6.
958 assert_eq!(missing.len(), 3);
959 }
960
961 #[test]
962 fn test_regression_on_send_evicts_oldest_when_full() {
963 // Regression: on_send silently dropped packets when the pending
964 // queue was full. The packet was sent on the wire but never
965 // recorded for retransmission, so if lost it could never be
966 // recovered via NACK — silently degrading reliability.
967 //
968 // Fix: on_send now evicts the oldest unacked packet to make room,
969 // so the most recent packets are always tracked.
970 let mut mode = ReliableStream::with_settings(
971 Duration::from_millis(50),
972 4, // max 4 pending
973 3,
974 );
975
976 // Send 6 packets (exceeds max_pending of 4)
977 for seq in 0..6u64 {
978 mode.on_send(descriptor(seq, Bytes::from(format!("pkt-{}", seq))));
979 }
980
981 // Should still have exactly max_pending packets tracked
982 assert_eq!(
983 mode.pending.len(),
984 4,
985 "pending queue should be at max_pending"
986 );
987
988 // The oldest packets (0, 1) should have been evicted;
989 // the newest (2, 3, 4, 5) should be retained.
990 let seqs: Vec<u64> = mode.pending.iter().map(|p| p.seq()).collect();
991 assert_eq!(
992 seqs,
993 vec![2, 3, 4, 5],
994 "should retain the most recent packets"
995 );
996
997 // NACK saying "seq 5 is the next expected (and therefore
998 // missing)" — receiver is asking for the retransmit.
999 let nack = NackPayload {
1000 next_expected: 5,
1001 missing_bitmap: 0,
1002 };
1003 let retransmits = mode.on_nack(&nack);
1004 assert_eq!(retransmits.len(), 1);
1005 assert_eq!(&retransmits[0].events[0][..], b"pkt-5");
1006 assert_eq!(retransmits[0].seq, 5);
1007 }
1008
1009 #[test]
1010 fn test_regression_duplicate_seq_zero_rejected() {
1011 // Regression: on_receive had a special case for seq=0 that checked
1012 // `seq == 0 && self.ack_seq == 0`. After receiving seq 0, ack_seq
1013 // was still 0, so a duplicate seq 0 hit the same early return and
1014 // was accepted again — violating exactly-once delivery for reliable
1015 // streams.
1016 //
1017 // Fix: added `received_first` flag to distinguish "never received
1018 // anything" from "received seq 0".
1019 let mut mode = ReliableStream::new();
1020
1021 // First reception of seq 0 should succeed
1022 assert!(mode.on_receive(0), "first seq 0 should be accepted");
1023 assert_eq!(mode.ack_seq(), 0);
1024
1025 // Duplicate seq 0 should be rejected
1026 assert!(
1027 !mode.on_receive(0),
1028 "duplicate seq 0 must be rejected for exactly-once delivery"
1029 );
1030
1031 // Normal continuation should still work
1032 assert!(mode.on_receive(1));
1033 assert_eq!(mode.ack_seq(), 1);
1034 }
1035
1036 #[test]
1037 fn test_regression_seq_zero_after_higher_seqs_rejected() {
1038 // Regression: seq 0 arriving after ack_seq had advanced (e.g., to 5)
1039 // would pass the `seq == 0 && !received_first` check (false, so it
1040 // fell through) and then hit `seq <= self.ack_seq` → duplicate.
1041 // That path was correct, but an earlier version without received_first
1042 // would have reset ack_seq to 0, moving the window backwards.
1043 // This test ensures the fix holds.
1044 let mut mode = ReliableStream::new();
1045
1046 // Receive 0..5 in order
1047 for seq in 0..=5 {
1048 assert!(mode.on_receive(seq));
1049 }
1050 assert_eq!(mode.ack_seq(), 5);
1051
1052 // Late/replayed seq 0 must be rejected and must NOT move ack_seq backwards
1053 assert!(!mode.on_receive(0), "late seq 0 must be rejected");
1054 assert_eq!(mode.ack_seq(), 5, "ack_seq must not move backwards");
1055 }
1056
1057 #[test]
1058 fn test_regression_first_received_seq_one_nacks_seq_zero() {
1059 // Regression (HIGH, BUGS.md): when the first received packet
1060 // on a reliable stream had seq > 0 (the real-world case where
1061 // seq 0 was lost in transit), the receiver silently advanced
1062 // `ack_seq` to that seq, claiming seq 0 had been acknowledged.
1063 // The sender's retransmit of seq 0 was then rejected as a
1064 // duplicate, and seq 0 was permanently lost to the application
1065 // — a reliability-contract violation.
1066 //
1067 // Fix: the receiver now leaves `next_expected` at 0 whenever
1068 // the first received seq is > 0, so the prefix gap is visible
1069 // to `build_nack()` and the retransmit of seq 0 is accepted
1070 // when it arrives.
1071 let mut mode = ReliableStream::new();
1072
1073 // First received packet has seq 1 (seq 0 was lost in transit).
1074 assert!(mode.on_receive(1));
1075 // next_expected must stay at 0 — we haven't received seq 0.
1076 assert_eq!(mode.next_expected(), 0);
1077 assert_eq!(
1078 mode.last_received_contiguous(),
1079 None,
1080 "no contiguous prefix yet"
1081 );
1082
1083 // A NACK must be generated reporting seq 0 as missing.
1084 let nack = mode.build_nack().expect("prefix gap must produce a NACK");
1085 assert_eq!(nack.next_expected, 0, "next_expected in NACK is 0");
1086 let missing: Vec<u64> = nack.missing_sequences().collect();
1087 assert!(
1088 missing.contains(&0),
1089 "NACK must report seq 0 as missing (was the lost first packet)"
1090 );
1091
1092 // Retransmit of seq 0 must be accepted and advance the stream.
1093 assert!(
1094 mode.on_receive(0),
1095 "retransmit of seq 0 must be accepted after it was NACK'd"
1096 );
1097 // Now we have seq 0 and 1 contiguously; next_expected advances.
1098 assert_eq!(mode.next_expected(), 2);
1099 assert_eq!(mode.ack_seq(), 1);
1100
1101 // No more gaps.
1102 assert!(
1103 mode.build_nack().is_none(),
1104 "no gaps after the retransmit filled the prefix"
1105 );
1106 }
1107
1108 #[test]
1109 fn test_regression_first_received_large_seq_bounded_by_window() {
1110 // When the first received packet has a large seq (e.g. the
1111 // first 10 packets were all lost), the receiver can still
1112 // NACK up to the 64-bit bitmap window's worth of gaps. The
1113 // important property is that seq 0 is reported missing and
1114 // can be accepted on retransmit — not that *every* gap before
1115 // the first received seq fits in the bitmap.
1116 let mut mode = ReliableStream::new();
1117
1118 // First received packet is seq 10 (0..9 all lost).
1119 assert!(mode.on_receive(10));
1120 assert_eq!(mode.next_expected(), 0);
1121
1122 let nack = mode.build_nack().expect("prefix gap must produce a NACK");
1123 let missing: Vec<u64> = nack.missing_sequences().collect();
1124 // seq 0 is always reported as missing when any prefix gap exists.
1125 assert!(missing.contains(&0), "NACK must report seq 0 missing");
1126 // seq 1..9 also missing (within the 64-bit bitmap window).
1127 for expected in 1..=9 {
1128 assert!(
1129 missing.contains(&expected),
1130 "NACK must report seq {expected} missing"
1131 );
1132 }
1133
1134 // Sender retransmits seq 0..9 in order.
1135 for seq in 0..10u64 {
1136 assert!(mode.on_receive(seq), "retransmit of seq {seq} accepted");
1137 }
1138 assert_eq!(mode.next_expected(), 11);
1139 }
1140
1141 #[test]
1142 fn test_regression_first_received_duplicate_rejected() {
1143 // When seq 1 arrives first and is accepted (with seq 0 still
1144 // pending NACK), a subsequent duplicate of seq 1 must be
1145 // rejected — not double-counted in the bitmap.
1146 let mut mode = ReliableStream::new();
1147
1148 assert!(mode.on_receive(1), "first seq 1 accepted");
1149 assert!(
1150 !mode.on_receive(1),
1151 "duplicate of seq 1 must be rejected for exactly-once delivery"
1152 );
1153 // State unchanged.
1154 assert_eq!(mode.next_expected(), 0);
1155 }
1156
1157 /// Regression: the retransmit path now stashes pre-encryption
1158 /// rebuild inputs (`RetransmitDescriptor`), not encrypted bytes.
1159 /// Previously, `on_send` recorded the fully-encrypted packet
1160 /// `Bytes` and `on_nack` / `get_timed_out` returned those exact
1161 /// bytes. Replaying them produced the original wire counter on
1162 /// the wire, which the receiver's `update_rx_counter` rejects
1163 /// as a replay — making NACK-driven recovery dead-on-arrival.
1164 ///
1165 /// We pin the new shape: descriptors carry stream_id, seq,
1166 /// events, and flags; multiple retransmits of the same packet
1167 /// must yield the same descriptor (so the caller's
1168 /// re-`builder.build` produces a fresh-counter packet each
1169 /// time).
1170 #[test]
1171 fn retransmit_descriptors_carry_pre_encryption_inputs() {
1172 let mut mode = ReliableStream::with_settings(Duration::from_millis(20), 32, 5);
1173
1174 // Send three packets with realistic descriptors (stream_id,
1175 // events list, flags).
1176 let events_a = vec![Bytes::from_static(b"event-A-payload")];
1177 let events_b = vec![Bytes::from_static(b"event-B-payload")];
1178 let events_c = vec![Bytes::from_static(b"event-C-payload")];
1179 mode.on_send(Arc::new(RetransmitDescriptor {
1180 seq: 0,
1181 stream_id: 7,
1182 events: events_a.clone(),
1183 flags: PacketFlags::RELIABLE,
1184 }));
1185 mode.on_send(Arc::new(RetransmitDescriptor {
1186 seq: 1,
1187 stream_id: 7,
1188 events: events_b.clone(),
1189 flags: PacketFlags::RELIABLE,
1190 }));
1191 mode.on_send(Arc::new(RetransmitDescriptor {
1192 seq: 2,
1193 stream_id: 7,
1194 events: events_c.clone(),
1195 flags: PacketFlags::RELIABLE,
1196 }));
1197
1198 // NACK seq=1.
1199 let nack = NackPayload {
1200 next_expected: 1,
1201 missing_bitmap: 0,
1202 };
1203 let retransmits = mode.on_nack(&nack);
1204 assert_eq!(retransmits.len(), 1);
1205 let r = &retransmits[0];
1206 assert_eq!(r.seq, 1);
1207 assert_eq!(r.stream_id, 7);
1208 assert_eq!(r.events, events_b);
1209 assert_eq!(r.flags, PacketFlags::RELIABLE);
1210
1211 // The descriptor has the inputs needed for
1212 // `PacketBuilder::build(stream_id, seq, &events, flags)`.
1213 // Each retransmit lets the caller produce a fresh-counter
1214 // packet — distinct from the original even though the
1215 // descriptor itself is identical to what was originally
1216 // pushed. This is what fixes the replay-window rejection.
1217 let nack2 = NackPayload {
1218 next_expected: 1,
1219 missing_bitmap: 0,
1220 };
1221 let retransmits2 = mode.on_nack(&nack2);
1222 assert_eq!(retransmits2.len(), 1);
1223 let r2 = &retransmits2[0];
1224 // The descriptor is the same — the *cipher counter* freshness
1225 // is the responsibility of the rebuild caller, not of the
1226 // reliability layer.
1227 assert_eq!(r2.seq, r.seq);
1228 assert_eq!(r2.events, r.events);
1229 assert_eq!(r2.flags, r.flags);
1230 assert_eq!(r2.stream_id, r.stream_id);
1231 }
1232
1233 /// Pin crypto-session perf #133: `on_nack` and `get_timed_out`
1234 /// must emit `Arc::clone`s of the descriptor already held in
1235 /// the retransmit window, not deep copies. Compare backing
1236 /// pointers via `Arc::as_ptr` — a regression that swaps back to
1237 /// `descriptor.clone()` on the inner `RetransmitDescriptor`
1238 /// would silently re-introduce the per-retransmit
1239 /// `Vec<Bytes>` allocation + N `Bytes` refcount bumps.
1240 #[test]
1241 fn retransmits_share_descriptor_via_arc_refcount_not_deep_clone() {
1242 let mut mode = ReliableStream::with_settings(Duration::from_millis(20), 32, 5);
1243
1244 let original = Arc::new(RetransmitDescriptor {
1245 seq: 0,
1246 stream_id: 7,
1247 events: vec![Bytes::from_static(b"event-A")],
1248 flags: PacketFlags::RELIABLE,
1249 });
1250 let original_ptr = Arc::as_ptr(&original);
1251 mode.on_send(Arc::clone(&original));
1252
1253 // NACK path: emitted Arc points at the same allocation as
1254 // the original we pushed (refcount bump, not a clone).
1255 let nack = NackPayload {
1256 next_expected: 0,
1257 missing_bitmap: 1,
1258 };
1259 let from_nack = mode.on_nack(&nack);
1260 assert_eq!(from_nack.len(), 1, "nack should produce one retransmit");
1261 assert_eq!(
1262 Arc::as_ptr(&from_nack[0]),
1263 original_ptr,
1264 "on_nack must clone the Arc, not deep-clone the descriptor"
1265 );
1266
1267 // Timeout path: re-arm the timer, sleep, drain. Same
1268 // pointer-identity assertion as the NACK path.
1269 std::thread::sleep(Duration::from_millis(35));
1270 let from_timeout = mode.get_timed_out();
1271 assert!(
1272 !from_timeout.is_empty(),
1273 "expected at least one timed-out retransmit"
1274 );
1275 assert_eq!(
1276 Arc::as_ptr(&from_timeout[0]),
1277 original_ptr,
1278 "get_timed_out must clone the Arc, not deep-clone the descriptor"
1279 );
1280 }
1281}