Skip to main content

net/adapter/net/
router.rs

1//! Net Router for single-hop and multi-hop packet routing.
2//!
3//! The router handles:
4//! - Stream multiplexing across thousands of streams
5//! - Fair scheduling to prevent stream starvation
6//! - Low-latency packet forwarding
7//! - Per-stream statistics
8
9use arc_swap::ArcSwap;
10use bytes::{Bytes, BytesMut};
11use crossbeam_queue::ArrayQueue;
12use dashmap::DashMap;
13use std::net::SocketAddr;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::net::UdpSocket;
18use tokio::sync::Notify;
19
20use super::protocol::HEADER_SIZE;
21use super::route::{RoutingHeader, RoutingTable, ROUTING_HEADER_SIZE};
22
23// --- Phase 0 instrument (NRPC_SEND_LOOP_BATCHING_PLAN) -------------------
24//
25// Measures the send loop's *drain run-length*: how many packets
26// `scheduler.dequeue()` returns back-to-back before the loop falls through
27// to `wait()`. This is the batchability signal — a sendmmsg drain only pays
28// when runs are long. Off unless `arm_send_drain_histo()` is called BEFORE
29// the send loop starts (the loop latches the flag once at startup), so an
30// unarmed/production loop pays only a stack-local increment.
31//
32// Measurement scaffolding — not part of the supported public API. The
33// arm/snapshot entry points are re-exported only so the in-repo integration
34// tests (`send_drain_depth`, `scheduled_stream_integrity`), which compile as
35// external crates, can drive them; every such entry point is `#[doc(hidden)]`
36// so it never appears in the crate's documented surface and downstream code
37// is not invited to depend on it.
static SEND_DRAIN_HISTO: [AtomicU64; 9] = [
    AtomicU64::new(0),
    AtomicU64::new(0),
    AtomicU64::new(0),
    AtomicU64::new(0),
    AtomicU64::new(0),
    AtomicU64::new(0),
    AtomicU64::new(0),
    AtomicU64::new(0),
    AtomicU64::new(0),
];
static SEND_DRAIN_ARMED: AtomicBool = AtomicBool::new(false);
static SEND_DRAIN_MAX: AtomicU64 = AtomicU64::new(0);

/// Turn on the send-loop drain-run histogram.
///
/// The send loop samples this flag once, when it starts, so this has to be
/// called before the loop is launched.
#[doc(hidden)]
pub fn arm_send_drain_histo() {
    SEND_DRAIN_ARMED.store(true, Ordering::Relaxed)
}

/// Copy out the drain-run histogram.
///
/// Bucket `i < 8` holds the number of runs with length in `[2^i, 2^(i+1))`;
/// bucket `8` collects every run of length 256 or more.
#[doc(hidden)]
pub fn send_drain_histo_snapshot() -> [u64; 9] {
    std::array::from_fn(|bucket| SEND_DRAIN_HISTO[bucket].load(Ordering::Relaxed))
}

/// Exact length of the longest drain run recorded so far.
#[doc(hidden)]
pub fn send_drain_max() -> u64 {
    SEND_DRAIN_MAX.load(Ordering::Relaxed)
}

/// Record one drain run of `run_len` packets. A zero-length run is ignored.
#[inline]
fn record_drain_run(run_len: u64) {
    if run_len > 0 {
        // Bucket index is floor(log2(run_len)); lengths >= 256 share bucket 8.
        let bucket = run_len.ilog2().min(8) as usize;
        SEND_DRAIN_HISTO[bucket].fetch_add(1, Ordering::Relaxed);
        SEND_DRAIN_MAX.fetch_max(run_len, Ordering::Relaxed);
    }
}
86
// Batched-drain counters. A "flush" is one destination group handed to the
// socket during a drain (one `sendmmsg` call on Linux), so packets divided by
// flushes is the syscall-collapse factor actually achieved.
static SEND_BATCH_FLUSHES: AtomicU64 = AtomicU64::new(0);
static SEND_BATCH_PACKETS: AtomicU64 = AtomicU64::new(0);

/// Returns `(flushes, packets)` accumulated by the batched-drain send path.
///
/// `packets / flushes` is the realized syscall-collapse factor; it
/// approaches `MAX_BATCH_SIZE` when every flush is full.
#[doc(hidden)]
pub fn send_batch_stats() -> (u64, u64) {
    let flushes = SEND_BATCH_FLUSHES.load(Ordering::Relaxed);
    let packets = SEND_BATCH_PACKETS.load(Ordering::Relaxed);
    (flushes, packets)
}

/// Account for one flush that carried `packets` packets.
#[inline]
fn record_batch_flush(packets: u64) {
    for (counter, delta) in [(&SEND_BATCH_FLUSHES, 1), (&SEND_BATCH_PACKETS, packets)] {
        counter.fetch_add(delta, Ordering::Relaxed);
    }
}
107
108/// Append `data` to the destination group for `dest`, creating the group if
109/// this is the first packet for that peer in the current drain. Linear scan
110/// over `groups` — fine, since the distinct-peer count per drain is small
111/// (and bounded by `reset_dest_groups`).
112#[inline]
113fn group_by_dest(groups: &mut Vec<(SocketAddr, Vec<Bytes>)>, dest: SocketAddr, data: Bytes) {
114    match groups.iter_mut().find(|(d, _)| *d == dest) {
115        Some((_, v)) => v.push(data),
116        None => groups.push((dest, vec![data])),
117    }
118}
119
/// Reset the per-destination group buffers between drains.
///
/// Slots, and the capacity their inner `Vec`s have grown to, are reused so the
/// send hot path does not reallocate on every drain. Once the set holds more
/// than `cap` slots it is dropped wholesale. Without that, peer churn would
/// let it grow without bound: a node that sends to many distinct peers over
/// its lifetime would keep one stale slot per peer forever, inflating memory
/// and the linear `group_by_dest` scan.
///
/// Bound: after a reset at most `cap` slots remain. If a single drain touches
/// at most `cap` distinct destinations (the caller passes its drain limit,
/// `MAX_DRAIN`, as `cap`), that drain can append up to `cap` fresh slots on
/// top of the survivors. The set therefore peaks at `2 × cap`, and the next
/// reset clears it.
///
/// Generic over the element type so the reset logic does not depend on the
/// packet representation; existing `Vec<Bytes>` callers infer `T` unchanged.
#[inline]
fn reset_dest_groups<T>(groups: &mut Vec<(SocketAddr, Vec<T>)>, cap: usize) {
    if groups.len() > cap {
        // Too many (mostly stale) slots: release all of them.
        groups.clear();
    } else {
        // Keep slots and their capacity; forget only last drain's packets.
        groups.iter_mut().for_each(|(_, pending)| pending.clear());
    }
}
138
/// Test-only loss injection shared by the single and batched send paths.
///
/// Returns `true` when the current packet should be discarded. When
/// `drop_every_n` is `0`, injection is off: nothing is dropped and
/// `drop_counter` is not touched. Otherwise every call advances the counter,
/// and each `n`th call (counting from 1) reports a drop.
#[inline]
fn drop_injected(drop_every_n: &AtomicU64, drop_counter: &AtomicU64) -> bool {
    match drop_every_n.load(Ordering::Relaxed) {
        0 => false,
        period => {
            let seq = drop_counter.fetch_add(1, Ordering::Relaxed).wrapping_add(1);
            seq % period == 0
        }
    }
}
150// ------------------------------------------------------------------------
151
/// Router configuration.
#[derive(Debug, Clone)]
pub struct RouterConfig {
    /// Identifier of this node.
    pub local_id: u64,
    /// Address the router's UDP socket binds to.
    pub bind_addr: SocketAddr,
    /// Maximum number of packets buffered per stream queue.
    pub max_queue_depth: usize,
    /// Fair-scheduling quantum: packets each stream may send per round.
    pub fair_quantum: usize,
    /// Idle stream timeout, in nanoseconds.
    pub idle_timeout_ns: u64,
    /// Whether the priority-queue bypass is enabled.
    pub priority_bypass: bool,
}

impl Default for RouterConfig {
    /// Node id 0, bound to `0.0.0.0` on port 0, 1024-deep stream queues, a
    /// quantum of 16, a 30 s idle timeout, and priority bypass on.
    fn default() -> Self {
        const THIRTY_SECONDS_NS: u64 = 30 * 1_000_000_000;
        Self {
            local_id: 0,
            bind_addr: SocketAddr::from((std::net::Ipv4Addr::UNSPECIFIED, 0)),
            max_queue_depth: 1024,
            fair_quantum: 16,
            idle_timeout_ns: THIRTY_SECONDS_NS,
            priority_bypass: true,
        }
    }
}

impl RouterConfig {
    /// Build a config for `local_id` bound to `bind_addr`, leaving every
    /// other setting at its [`Default`] value.
    pub fn new(local_id: u64, bind_addr: SocketAddr) -> Self {
        Self {
            local_id,
            bind_addr,
            ..Self::default()
        }
    }
}
192
193/// Queued packet for fair scheduling
194pub struct QueuedPacket {
195    /// Packet data
196    pub data: Bytes,
197    /// Destination address
198    pub dest: SocketAddr,
199    /// Stream identifier
200    pub stream_id: u64,
201    /// Whether this is a priority packet
202    pub priority: bool,
203    /// Time the packet was queued
204    pub queued_at: Instant,
205}
206
207/// Per-stream queue for fair scheduling
208struct StreamQueue {
209    queue: ArrayQueue<QueuedPacket>,
210    packets_sent_this_round: AtomicU64,
211    /// Fairness weight (quantum multiplier). `1` is equal-share; higher
212    /// values let this stream ship `weight × session_quantum` packets
213    /// per round before round-reset. Always ≥ 1.
214    weight: AtomicU64,
215}
216
217impl StreamQueue {
218    fn new(capacity: usize) -> Self {
219        Self::new_with_weight(capacity, 1)
220    }
221
222    fn new_with_weight(capacity: usize, weight: u8) -> Self {
223        Self {
224            queue: ArrayQueue::new(capacity),
225            packets_sent_this_round: AtomicU64::new(0),
226            weight: AtomicU64::new(weight.max(1) as u64),
227        }
228    }
229
230    fn push(&self, packet: QueuedPacket) -> Result<(), QueuedPacket> {
231        self.queue.push(packet)
232    }
233
234    fn pop(&self) -> Option<QueuedPacket> {
235        self.queue.pop()
236    }
237
238    #[allow(dead_code)]
239    fn len(&self) -> usize {
240        self.queue.len()
241    }
242
243    fn is_empty(&self) -> bool {
244        self.queue.is_empty()
245    }
246
247    fn reset_round(&self) {
248        self.packets_sent_this_round.store(0, Ordering::Relaxed);
249    }
250
251    fn increment_sent(&self) -> u64 {
252        self.packets_sent_this_round.fetch_add(1, Ordering::Relaxed)
253    }
254
255    fn sent_this_round(&self) -> u64 {
256        self.packets_sent_this_round.load(Ordering::Relaxed)
257    }
258
259    fn weight(&self) -> u64 {
260        self.weight.load(Ordering::Relaxed).max(1)
261    }
262
263    #[allow(dead_code)]
264    fn set_weight(&self, weight: u8) {
265        self.weight.store(weight.max(1) as u64, Ordering::Relaxed);
266    }
267}
268
269/// Fair scheduler for stream fairness
270pub struct FairScheduler {
271    /// Per-stream queues
272    streams: DashMap<u64, Arc<StreamQueue>>,
273    /// PERF_AUDIT §3.1 — incrementally-maintained snapshot of the
274    /// set of stream-ids in `streams`. Pre-fix `dequeue` allocated
275    /// a fresh `Vec<u64>` per call (often twice on quantum-reset
276    /// paths) by walking the DashMap — O(streams) work + a heap
277    /// alloc on the per-packet send path that §2.1's batching
278    /// wants to route MORE traffic through. The snapshot is
279    /// rebuilt only when the stream-set actually changes (the
280    /// vacant arm of `enqueue` / `set_stream_weight`, or after
281    /// `cleanup_empty` evicts at least one entry), so steady-
282    /// state dequeue pays a single `ArcSwap::load_full` (a
283    /// relaxed-load + Arc clone). Long-lived streams mean
284    /// rebuild rate is orders of magnitude below dequeue rate.
285    ///
286    /// The snapshot may briefly lag the DashMap (a concurrent
287    /// enqueue can add a stream between snapshot-build and
288    /// dequeue's iteration). That's the same race as the pre-fix
289    /// code — a dequeue that started before the new stream landed
290    /// won't service it this call; the next call's load picks up
291    /// the updated snapshot.
292    active_streams: ArcSwap<Vec<u64>>,
293    /// Version counter for the active-stream snapshot protocol.
294    /// Bumped at every stream-set mutation site *after* the DashMap
295    /// change and *before* [`Self::rebuild_active_streams`] runs.
296    /// The rebuild loop re-checks this after its `store`: if the
297    /// version moved while it was collecting, another mutation
298    /// landed concurrently and this rebuild's snapshot may be stale
299    /// — retry. Without this, two racing rebuilds can store out of
300    /// order (cleanup's pre-insert collection overwriting enqueue's
301    /// post-insert one), leaving a stream with queued packets
302    /// permanently absent from the snapshot: occupied-arm enqueues
303    /// never rebuild and cleanup never evicts a non-empty queue, so
304    /// nothing would ever repair it (lost wakeup).
305    active_streams_version: AtomicU64,
306    /// Priority queue (bypasses fair scheduling)
307    priority_queue: ArrayQueue<QueuedPacket>,
308    /// Fair quantum
309    quantum: usize,
310    /// Max queue depth per stream
311    max_depth: usize,
312    /// Notification for new packets
313    notify: Notify,
314    /// Total packets queued
315    total_queued: AtomicU64,
316    /// Total packets dropped
317    total_dropped: AtomicU64,
318    /// Rotation index for round-robin fairness across dequeue calls.
319    /// Only advances when a dequeue actually pops a packet, so
320    /// rotation correlates with service events rather than poll
321    /// frequency.
322    round_robin_idx: AtomicU64,
323    /// Counter incremented on every `dequeue` call (regardless of
324    /// whether a packet was popped) for the modulo-1024 cleanup
325    /// trigger. Decoupled from `round_robin_idx` so the rotation fix
326    /// doesn't accidentally suppress cleanup.
327    dequeue_call_count: AtomicU64,
328    /// **Current** in-queue packet count (enqueued minus dequeued),
329    /// across the priority lane and all per-stream queues. Unlike
330    /// `total_queued` (a cumulative enqueue counter that never
331    /// decrements), this reflects live backlog — the send loop reads it
332    /// to decide whether a batched drain is worthwhile.
333    current_depth: AtomicU64,
334}
335
336impl FairScheduler {
337    /// Create a new fair scheduler
338    pub fn new(quantum: usize, max_depth: usize) -> Self {
339        Self {
340            streams: DashMap::new(),
341            active_streams: ArcSwap::from_pointee(Vec::new()),
342            active_streams_version: AtomicU64::new(0),
343            priority_queue: ArrayQueue::new(max_depth),
344            quantum,
345            max_depth,
346            notify: Notify::new(),
347            total_queued: AtomicU64::new(0),
348            total_dropped: AtomicU64::new(0),
349            round_robin_idx: AtomicU64::new(0),
350            dequeue_call_count: AtomicU64::new(0),
351            current_depth: AtomicU64::new(0),
352        }
353    }
354
355    /// PERF_AUDIT §3.1 — mark the stream set as changed and rebuild
356    /// the snapshot. Must be called *after* the DashMap mutation so
357    /// any concurrent rebuild that observes the new version also
358    /// observes the mutation. Called from every stream-set mutation
359    /// site: the vacant arm of [`Self::enqueue`] and
360    /// [`Self::set_stream_weight`], and [`Self::cleanup_empty`]
361    /// when at least one entry was removed.
362    fn note_stream_set_changed(&self) {
363        self.active_streams_version.fetch_add(1, Ordering::SeqCst);
364        self.rebuild_active_streams();
365    }
366
367    /// PERF_AUDIT §3.1 — rebuild the active-stream snapshot from
368    /// the current `streams` DashMap and atomically swap it in.
369    ///
370    /// Lost-wakeup protection: the version counter is re-checked
371    /// after the `store`. If it moved while this rebuild was
372    /// collecting, a concurrent mutation landed and our snapshot
373    /// may predate it — and ArcSwap stores from racing rebuilds
374    /// can land in either order, so a stale collection could
375    /// otherwise overwrite a fresher one and strand a non-empty
376    /// stream outside the snapshot forever. Retrying until the
377    /// version is stable across collect+store guarantees the last
378    /// snapshot standing reflects every mutation (each mutation
379    /// bumps the version *before* its own rebuild, so whichever
380    /// rebuild finishes last either saw the final version or
381    /// retries). SeqCst keeps the version loads and the ArcSwap
382    /// store in one total order; rebuilds only run on stream-set
383    /// changes, so the cost is off the per-packet path.
384    fn rebuild_active_streams(&self) {
385        loop {
386            let v = self.active_streams_version.load(Ordering::SeqCst);
387            let keys: Vec<u64> = self.streams.iter().map(|e| *e.key()).collect();
388            self.active_streams.store(Arc::new(keys));
389            if self.active_streams_version.load(Ordering::SeqCst) == v {
390                break;
391            }
392            // Version moved mid-rebuild: another mutation raced us.
393            // Our snapshot may be stale — collect again.
394        }
395    }
396
397    /// Set the fair-scheduling weight for a stream. `weight` is a quantum
398    /// multiplier: 1 = equal share (default), higher values give the
399    /// stream proportionally more packets per round. Creates the stream
400    /// queue if it doesn't exist yet, so callers can set the weight
401    /// before any traffic flows.
402    pub fn set_stream_weight(&self, stream_id: u64, weight: u8) {
403        let weight = weight.max(1);
404        // PERF_AUDIT §3.1 — detect new vs existing so the
405        // active-stream snapshot only rebuilds when the set
406        // actually changes.
407        let was_new = match self.streams.entry(stream_id) {
408            dashmap::mapref::entry::Entry::Occupied(occ) => {
409                occ.get().set_weight(weight);
410                false
411            }
412            dashmap::mapref::entry::Entry::Vacant(vac) => {
413                vac.insert(Arc::new(StreamQueue::new_with_weight(
414                    self.max_depth,
415                    weight,
416                )));
417                true
418            }
419        };
420        if was_new {
421            self.note_stream_set_changed();
422        }
423    }
424
425    /// Enqueue a packet
426    pub fn enqueue(&self, packet: QueuedPacket) -> bool {
427        if packet.priority {
428            // Priority packets bypass fair scheduling
429            if self.priority_queue.push(packet).is_ok() {
430                self.total_queued.fetch_add(1, Ordering::Relaxed);
431                self.current_depth.fetch_add(1, Ordering::Relaxed);
432                self.notify.notify_one();
433                return true;
434            }
435        } else {
436            // PERF_AUDIT §3.1 — get-or-create with new/existing
437            // discrimination so the active-stream snapshot is
438            // rebuilt only when the set actually changes.
439            let (queue, is_new) = match self.streams.entry(packet.stream_id) {
440                dashmap::mapref::entry::Entry::Occupied(occ) => (occ.get().clone(), false),
441                dashmap::mapref::entry::Entry::Vacant(vac) => {
442                    let arc = Arc::new(StreamQueue::new(self.max_depth));
443                    vac.insert(arc.clone());
444                    (arc, true)
445                }
446            };
447            if is_new {
448                self.note_stream_set_changed();
449            }
450
451            if queue.push(packet).is_ok() {
452                self.total_queued.fetch_add(1, Ordering::Relaxed);
453                self.current_depth.fetch_add(1, Ordering::Relaxed);
454                self.notify.notify_one();
455                return true;
456            }
457        }
458
459        self.total_dropped.fetch_add(1, Ordering::Relaxed);
460        false
461    }
462
463    /// Dequeue next packet (fair round-robin)
464    pub fn dequeue(&self) -> Option<QueuedPacket> {
465        // Periodically clean up empty stream queues to prevent unbounded growth
466        // of the DashMap. Check every 1024 dequeue calls (cheap modular check).
467        // Tracked on a separate counter from `round_robin_idx` so
468        // that decoupling rotation from poll frequency doesn't
469        // suppress the cleanup trigger.
470        let call_count = self
471            .dequeue_call_count
472            .fetch_add(1, Ordering::Relaxed)
473            .wrapping_add(1);
474        if call_count.is_multiple_of(1024) {
475            self.cleanup_empty();
476        }
477
478        // Priority queue first
479        if let Some(packet) = self.priority_queue.pop() {
480            self.current_depth.fetch_sub(1, Ordering::Relaxed);
481            return Some(packet);
482        }
483
484        // PERF_AUDIT §3.1 — read the incrementally-maintained
485        // active-stream snapshot instead of walking the DashMap
486        // per call. `load_full` is one relaxed atomic load + Arc
487        // clone (no Vec alloc, no DashMap iteration).
488        let keys = self.active_streams.load_full();
489        if keys.is_empty() {
490            return None;
491        }
492        // Advance the round-robin cursor only when we actually pop a
493        // packet. Previously this used `fetch_add(1)` unconditionally,
494        // so every empty poll advanced the rotation as if we'd
495        // serviced a stream. The result was that under bursty mixed
496        // traffic, polls that found no packet still nudged the cursor
497        // past the stream that *would* have had a packet on the next
498        // tick, biasing service away from streams that became
499        // non-empty
500        // mid-pass.
501        //
502        // Now: read the cursor for the starting offset, then
503        // commit a `fetch_add(1)` only inside the successful pop
504        // arm. Behavioral effect is the same when packets are
505        // available; the difference shows up under contention
506        // and on dequeues that find nothing.
507        let start = self.round_robin_idx.load(Ordering::Relaxed) as usize % keys.len();
508
509        // Round-robin across streams, starting from the rotated index.
510        // Each stream's quantum is `base_quantum × stream.weight`, so
511        // a weight-4 stream gets 4× the packets per round of a weight-1
512        // stream before round-reset. Default weight is 1 = unchanged.
513        for i in 0..keys.len() {
514            let key = keys[(start + i) % keys.len()];
515            if let Some(queue) = self.streams.get(&key) {
516                let stream_quantum = (self.quantum as u64).saturating_mul(queue.weight());
517                if queue.sent_this_round() < stream_quantum && !queue.is_empty() {
518                    if let Some(packet) = queue.pop() {
519                        queue.increment_sent();
520                        self.current_depth.fetch_sub(1, Ordering::Relaxed);
521                        // Advance the rotation cursor only on
522                        // successful pop.
523                        self.round_robin_idx.fetch_add(1, Ordering::Relaxed);
524                        return Some(packet);
525                    }
526                }
527            }
528        }
529
530        // If all streams exhausted their quantum, reset and try again.
531        // Re-load the active-stream snapshot so streams added since
532        // the first load are also considered — `load_full` is
533        // cheap (no Vec alloc), and may pick up an updated snapshot
534        // if a concurrent enqueue rebuilt it mid-pass.
535        let keys = self.active_streams.load_full();
536        if keys.is_empty() {
537            return None;
538        }
539        let mut has_packets = false;
540        for key in keys.iter() {
541            if let Some(queue) = self.streams.get(key) {
542                queue.reset_round();
543                if !queue.is_empty() {
544                    has_packets = true;
545                }
546            }
547        }
548
549        if has_packets {
550            let start = self.round_robin_idx.load(Ordering::Relaxed) as usize % keys.len();
551            for i in 0..keys.len() {
552                let key = keys[(start + i) % keys.len()];
553                if let Some(queue) = self.streams.get(&key) {
554                    if let Some(packet) = queue.pop() {
555                        queue.increment_sent();
556                        self.current_depth.fetch_sub(1, Ordering::Relaxed);
557                        self.round_robin_idx.fetch_add(1, Ordering::Relaxed);
558                        return Some(packet);
559                    }
560                }
561            }
562        }
563
564        None
565    }
566
567    /// Wait for new packets
568    pub async fn wait(&self) {
569        self.notify.notified().await;
570    }
571
572    /// Current live backlog: packets enqueued but not yet dequeued,
573    /// summed across the priority lane and all per-stream queues.
574    /// Distinct from [`Self::total_queued`], which only counts upward.
575    pub fn current_depth(&self) -> u64 {
576        self.current_depth.load(Ordering::Relaxed)
577    }
578
579    /// Get total queued count
580    pub fn total_queued(&self) -> u64 {
581        self.total_queued.load(Ordering::Relaxed)
582    }
583
584    /// Get total dropped count
585    pub fn total_dropped(&self) -> u64 {
586        self.total_dropped.load(Ordering::Relaxed)
587    }
588
589    /// Get number of active streams
590    pub fn stream_count(&self) -> usize {
591        self.streams.len()
592    }
593
594    /// Clean up empty stream queues.
595    ///
596    /// Only removes queues that are both empty and have no outstanding
597    /// `Arc` references (strong_count == 1 means only the DashMap holds it).
598    /// This prevents a race where `enqueue` clones the Arc, cleanup removes
599    /// the entry, and the enqueued packet becomes unreachable.
600    pub fn cleanup_empty(&self) -> usize {
601        let mut removed = 0;
602        self.streams.retain(|_, queue| {
603            if queue.is_empty() && Arc::strong_count(queue) == 1 {
604                removed += 1;
605                false
606            } else {
607                true
608            }
609        });
610        // PERF_AUDIT §3.1 — rebuild the active-stream snapshot
611        // when at least one entry was evicted so the dequeue
612        // path stops iterating over keys that no longer exist.
613        // No-op rebuild on a clean pass is cheap (single Vec
614        // alloc the same size as the prior snapshot, swapped in
615        // via ArcSwap), but the branch keeps the steady-state
616        // cleanup-call cost at zero rebuilds.
617        if removed > 0 {
618            self.note_stream_set_changed();
619        }
620        removed
621    }
622}
623
/// Plain-value copy of the router's statistics counters.
#[derive(Debug, Clone, Default)]
pub struct RouterStats {
    /// Packets received from the network.
    pub packets_received: u64,
    /// Packets forwarded toward another node.
    pub packets_forwarded: u64,
    /// Packets delivered to this node.
    pub packets_local: u64,
    /// Packets dropped (TTL expiry, no route, full queue).
    pub packets_dropped: u64,
    /// Bytes received from the network.
    pub bytes_received: u64,
    /// Bytes forwarded toward other nodes.
    pub bytes_forwarded: u64,
    /// Number of active routes.
    pub routes: usize,
    /// Number of active streams.
    pub streams: usize,
    /// Mean routing latency, in nanoseconds.
    pub avg_latency_ns: u64,
}
646
647/// Net Router
648pub struct NetRouter {
649    /// Configuration
650    #[allow(dead_code)]
651    config: RouterConfig,
652    /// UDP socket
653    socket: Arc<UdpSocket>,
654    /// Routing table
655    routing_table: Arc<RoutingTable>,
656    /// Fair scheduler
657    scheduler: Arc<FairScheduler>,
658    /// Running flag
659    running: Arc<AtomicBool>,
660    /// Statistics
661    packets_received: AtomicU64,
662    packets_forwarded: AtomicU64,
663    packets_local: AtomicU64,
664    packets_dropped: AtomicU64,
665    bytes_received: AtomicU64,
666    bytes_forwarded: AtomicU64,
667    total_latency_ns: AtomicU64,
668    latency_samples: AtomicU64,
669    /// Test-only fault injection: when `> 0`, the send loop drops every
670    /// Nth dequeued (scheduled) packet instead of sending it, simulating
671    /// link loss for reliability tests. `0` (default) disables it — one
672    /// relaxed load per send, negligible. Shared into the send-loop task.
673    test_drop_every_n: Arc<AtomicU64>,
674    test_drop_counter: Arc<AtomicU64>,
675}
676
677impl NetRouter {
678    /// Create a new router
679    pub async fn new(config: RouterConfig) -> std::io::Result<Self> {
680        let socket = UdpSocket::bind(config.bind_addr).await?;
681        let routing_table = Arc::new(RoutingTable::new(config.local_id));
682        let scheduler = Arc::new(FairScheduler::new(
683            config.fair_quantum,
684            config.max_queue_depth,
685        ));
686
687        Ok(Self {
688            config,
689            socket: Arc::new(socket),
690            routing_table,
691            scheduler,
692            running: Arc::new(AtomicBool::new(false)),
693            packets_received: AtomicU64::new(0),
694            packets_forwarded: AtomicU64::new(0),
695            packets_local: AtomicU64::new(0),
696            packets_dropped: AtomicU64::new(0),
697            bytes_received: AtomicU64::new(0),
698            bytes_forwarded: AtomicU64::new(0),
699            total_latency_ns: AtomicU64::new(0),
700            latency_samples: AtomicU64::new(0),
701            test_drop_every_n: Arc::new(AtomicU64::new(0)),
702            test_drop_counter: Arc::new(AtomicU64::new(0)),
703        })
704    }
705
706    /// Test-only: drop every `n`th dequeued (scheduled) packet in the
707    /// send loop to simulate link loss. `0` disables. Used by reliability
708    /// / retransmit tests; has no effect on the default (`0`) path.
709    pub fn set_test_drop_every_n(&self, n: u64) {
710        self.test_drop_every_n.store(n, Ordering::Relaxed);
711    }
712
713    /// Get local address
714    pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
715        self.socket.local_addr()
716    }
717
718    /// Get routing table
719    pub fn routing_table(&self) -> &Arc<RoutingTable> {
720        &self.routing_table
721    }
722
723    /// Get the fair scheduler. Exposed so `MeshNode::open_stream` can
724    /// propagate a stream's `fairness_weight` to the forwarding path.
725    pub fn scheduler(&self) -> &Arc<FairScheduler> {
726        &self.scheduler
727    }
728
729    /// Add a route
730    pub fn add_route(&self, dest_id: u64, next_hop: SocketAddr) {
731        self.routing_table.add_route(dest_id, next_hop);
732    }
733
734    /// Remove a route
735    pub fn remove_route(&self, dest_id: u64) {
736        self.routing_table.remove_route(dest_id);
737    }
738
739    /// Route a packet (called from receive loop)
740    pub fn route_packet(&self, data: Bytes, _from: SocketAddr) -> Result<RouteAction, RouterError> {
741        let start = Instant::now();
742        let len = data.len() as u64;
743
744        self.packets_received.fetch_add(1, Ordering::Relaxed);
745        self.bytes_received.fetch_add(len, Ordering::Relaxed);
746
747        // Need at least routing header
748        if data.len() < ROUTING_HEADER_SIZE {
749            self.packets_dropped.fetch_add(1, Ordering::Relaxed);
750            return Err(RouterError::PacketTooSmall);
751        }
752
753        // Parse routing header
754        let routing_header = RoutingHeader::from_bytes(&data[..ROUTING_HEADER_SIZE])
755            .ok_or(RouterError::InvalidHeader)?;
756
757        // Extract stream ID from Net header if present
758        let stream_id = if data.len() >= ROUTING_HEADER_SIZE + HEADER_SIZE {
759            let net_header = &data[ROUTING_HEADER_SIZE..ROUTING_HEADER_SIZE + HEADER_SIZE];
760            u64::from_le_bytes(net_header[32..40].try_into().unwrap_or([0; 8]))
761        } else {
762            0
763        };
764
765        // Per-stream `record_in` is deferred until the packet
766        // either delivers locally or survives all drop checks
767        // and is queued for forward. Pre-fix the call fired
768        // here (immediately after parse), so a packet that
769        // failed loop suppression or TTL incremented BOTH
770        // `packets_in` and `packets_dropped` for the stream —
771        // double-counting against any dashboard computing
772        // `delivery rate = packets_out / packets_in` (drops
773        // inflated the denominator without affecting the
774        // numerator, masking healthy networks behind apparent
775        // delivery loss). The drop paths now call
776        // `record_drop` only; `record_in` fires once for each
777        // successful local-delivery / forward.
778
779        // Check if local delivery
780        if self.routing_table.is_local(routing_header.dest_id) {
781            self.routing_table.record_in(stream_id, len);
782            self.packets_local.fetch_add(1, Ordering::Relaxed);
783            self.record_latency(start);
784            return Ok(RouteAction::Local(data.slice(ROUTING_HEADER_SIZE..)));
785        }
786
787        // Loop suppression: if we're about to forward a packet whose
788        // `src_id` is us, we sent it earlier and it has come back via
789        // a misconfigured route or a malicious peer. Drop it locally
790        // so the only thing breaking the loop is TTL exhaustion;
791        // every looping hop wastes one queue slot and 2x bandwidth
792        // on the link. The `src_id` field is u32; `local_id` is u64
793        // (only the low 32 bits identify us on the wire).
794        if routing_header.src_id == (self.routing_table.local_id() as u32) {
795            self.packets_dropped.fetch_add(1, Ordering::Relaxed);
796            self.routing_table.record_drop(stream_id);
797            return Err(RouterError::RoutingLoop);
798        }
799
800        // Check TTL
801        if routing_header.is_expired() {
802            self.packets_dropped.fetch_add(1, Ordering::Relaxed);
803            self.routing_table.record_drop(stream_id);
804            return Err(RouterError::TtlExpired);
805        }
806
807        // Lookup next hop
808        let next_hop = self
809            .routing_table
810            .lookup(routing_header.dest_id)
811            .ok_or(RouterError::NoRoute)?;
812
813        // Update header for forwarding
814        let mut fwd_header = routing_header;
815        fwd_header.forward();
816        // Re-check expiry after `forward()` decrements the TTL: if
817        // TTL hit 0, the next hop would just drop the packet on its
818        // own `is_expired()` check — wasting one forward, bandwidth,
819        // and a queue slot per last-hop packet. Drop locally here so
820        // the next hop never sees the doomed packet.
821        if fwd_header.is_expired() {
822            self.packets_dropped.fetch_add(1, Ordering::Relaxed);
823            self.routing_table.record_drop(stream_id);
824            return Err(RouterError::TtlExpired);
825        }
826        // All drop gates passed — count this packet as
827        // successfully ingressed for the stream before queueing
828        // the forward.
829        self.routing_table.record_in(stream_id, len);
830
831        // Fast path (perf #18): if the inbound `data` is sole-
832        // owned (the typical case for UDP packets just received
833        // from the socket), `try_into_mut` returns the same
834        // allocation as a `BytesMut` in O(1). We overwrite the
835        // first ROUTING_HEADER_SIZE bytes in place — refcount
836        // bump only, no body copy. Pre-fix this allocated a
837        // fresh buffer the size of the entire packet and
838        // memcpy'd the body for every forward; for a relay node
839        // moving GB/s of packets, bandwidth-class waste.
840        //
841        // Slow path: an outstanding clone of `data` (e.g. some
842        // observer / mirror) forces the copy. We fall back to
843        // the prior allocation strategy.
844        let forwarded_data = match data.try_into_mut() {
845            Ok(mut mut_data) => {
846                fwd_header.write_at(&mut mut_data[..ROUTING_HEADER_SIZE]);
847                mut_data.freeze()
848            }
849            Err(orig_data) => {
850                let mut new_data = BytesMut::with_capacity(orig_data.len());
851                fwd_header.write_to(&mut new_data);
852                new_data.extend_from_slice(&orig_data[ROUTING_HEADER_SIZE..]);
853                new_data.freeze()
854            }
855        };
856
857        // Queue for sending
858        let packet = QueuedPacket {
859            data: forwarded_data,
860            dest: next_hop,
861            stream_id,
862            priority: routing_header.flags.is_priority(),
863            queued_at: Instant::now(),
864        };
865
866        if self.scheduler.enqueue(packet) {
867            self.packets_forwarded.fetch_add(1, Ordering::Relaxed);
868            self.bytes_forwarded.fetch_add(len, Ordering::Relaxed);
869            self.routing_table.record_out(stream_id, len);
870            self.record_latency(start);
871            Ok(RouteAction::Forwarded(next_hop))
872        } else {
873            self.packets_dropped.fetch_add(1, Ordering::Relaxed);
874            self.routing_table.record_drop(stream_id);
875            Err(RouterError::QueueFull)
876        }
877    }
878
879    /// Send a packet directly (bypassing routing)
880    pub async fn send_to(&self, data: &[u8], dest: SocketAddr) -> std::io::Result<usize> {
881        self.socket.send_to(data, dest).await
882    }
883
884    /// Receive a packet
885    pub async fn recv_from(&self, buf: &mut [u8]) -> std::io::Result<(usize, SocketAddr)> {
886        self.socket.recv_from(buf).await
887    }
888
    /// Start the router (spawns send loop). Returns `None` if a
    /// dispatch loop is already running for this router; calling
    /// twice would otherwise spawn a second loop racing the first
    /// one's `scheduler.dequeue()`, producing reordered or
    /// duplicate sends.
    ///
    /// The guard is the shared `running` flag: the CAS below flips it
    /// `false → true`, and the spawned loop keeps draining the scheduler
    /// for as long as it reads `true`.
    ///
    /// NOTE(review): `stop()` only clears `running`; an existing loop
    /// notices at the top of its next iteration (it can be parked for up
    /// to 1ms in the `select!`, or awaiting a `send_to`). A `start()` that
    /// lands in that window wins the CAS and spawns a second loop, and the
    /// old loop then reads `running == true` again and keeps going — two
    /// loops draining one scheduler, the very race this guard targets.
    /// Await the previous `JoinHandle` before restarting; closing the
    /// window for good needs a per-loop generation/liveness field.
    pub fn start(&self) -> Option<tokio::task::JoinHandle<()>> {
        if self
            .running
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            return None;
        }

        let socket = self.socket.clone();
        let scheduler = self.scheduler.clone();
        let running = self.running.clone();
        let drop_every_n = self.test_drop_every_n.clone();
        let drop_counter = self.test_drop_counter.clone();

        Some(tokio::spawn(async move {
            // Phase 0 instrument: latch the arm flag once; when armed, count
            // how many packets we drain back-to-back before blocking.
            let measure_drain = SEND_DRAIN_ARMED.load(Ordering::Relaxed);
            let mut drain_run: u64 = 0;

            // Batched-drain scratch (Phase 1, NRPC_SEND_LOOP_BATCHING_PLAN).
            // When the scheduler has a backlog, drain up to MAX_DRAIN packets
            // and ship them grouped by destination — one `sendmmsg` per peer
            // on Linux, falling back to per-packet `send_to` elsewhere. The
            // `groups` holds one (dest, packets) slot per peer touched in a
            // drain. Inner Vecs are reused across drains to avoid reallocating
            // on the hot path; the slot set is bounded on reset (see below) so
            // it can't accumulate a stale entry per peer forever under churn.
            const MAX_DRAIN: usize = 64;
            let mut groups: Vec<(SocketAddr, Vec<Bytes>)> = Vec::new();
            // Linux-only batched sender over the same socket fd. The send loop
            // is the socket's sole, single-threaded sender, so it owns one
            // `BatchedTransport` for its whole lifetime and reuses the iovec /
            // mmsghdr / sockaddr scratch across every flush — no per-flush
            // allocation. (`PacketSender::send_batch` builds a fresh transport
            // each call to stay shareable across concurrent senders; here there
            // is exactly one sender, so we hold the reusable form directly.)
            #[cfg(target_os = "linux")]
            let mut batch_sender = {
                use std::os::unix::io::AsRawFd;
                super::linux::BatchedTransport::new_send_only(socket.as_raw_fd())
            };

            // `running` is re-checked only here, once per iteration — see the
            // NOTE(review) on this method about the stop → start window.
            while running.load(Ordering::Acquire) {
                // Dequeue and send
                if let Some(first) = scheduler.dequeue() {
                    // Count every packet `dequeue()` hands back, by design:
                    // `drain_run` measures the scheduler's drain run-length,
                    // not wire packets, so test-injected drops below still
                    // count (in production `drop_every_n == 0`, so there are
                    // none). This is deliberately distinct from the batch-flush
                    // packet counter, which tracks only what reaches the wire.
                    drain_run += 1;

                    // Fast path: nothing else queued → send this one packet
                    // directly, exactly as before. Keeps depth-≤1 traffic
                    // (single scheduled stream, sparse sends) on the original
                    // zero-overhead path. Guarded by the live `current_depth`.
                    if scheduler.current_depth() == 0 {
                        if !drop_injected(&drop_every_n, &drop_counter) {
                            let _ = socket.send_to(&first.data, first.dest).await;
                        }
                        continue;
                    }

                    // Backlog present → batched drain. Reset the per-dest
                    // buffers for this drain (bounded — see `reset_dest_groups`).
                    reset_dest_groups(&mut groups, MAX_DRAIN);
                    if !drop_injected(&drop_every_n, &drop_counter) {
                        group_by_dest(&mut groups, first.dest, first.data);
                    }
                    let mut drained = 1usize;
                    while drained < MAX_DRAIN {
                        match scheduler.dequeue() {
                            Some(p) => {
                                drain_run += 1;
                                drained += 1;
                                if !drop_injected(&drop_every_n, &drop_counter) {
                                    group_by_dest(&mut groups, p.dest, p.data);
                                }
                            }
                            None => break,
                        }
                    }

                    // Flush each destination group: one batched syscall per
                    // peer on Linux, per-packet send elsewhere.
                    for (dest, data) in groups.iter() {
                        if data.is_empty() {
                            continue;
                        }
                        #[cfg(target_os = "linux")]
                        {
                            // `send_batch` is a synchronous `sendmmsg` on the
                            // socket's non-blocking fd — it returns immediately
                            // (filling the kernel buffer, then partial /
                            // EWOULDBLOCK), so it does NOT block the worker and
                            // must not be moved to `spawn_blocking`. The async
                            // `send_to` below ships any unsent tail (partial
                            // send / EWOULDBLOCK), which re-registers the waker
                            // so we preserve backpressure rather than dropping
                            // or spinning.
                            let sent = batch_sender.send_batch(data, *dest).unwrap_or(0);
                            for d in &data[sent..] {
                                let _ = socket.send_to(d, *dest).await;
                            }
                        }
                        #[cfg(not(target_os = "linux"))]
                        {
                            for d in data {
                                let _ = socket.send_to(d, *dest).await;
                            }
                        }
                        if measure_drain {
                            record_batch_flush(data.len() as u64);
                        }
                    }
                } else {
                    // Drain run ended — record its length, then block.
                    if measure_drain {
                        record_drain_run(drain_run);
                    }
                    drain_run = 0;
                    // Wait for new packets (with timeout).
                    //
                    // The `notify_one` → `wait()` pattern is
                    // sound — `tokio::sync::Notify`
                    // stores a permit when `notify_one` is called
                    // with no waiter, and the next `notified().await`
                    // consumes it immediately. So an enqueue that
                    // races our `dequeue() → None` transition is
                    // observed on the next loop iteration regardless
                    // of which side won the race; no notify is lost.
                    //
                    // The 1ms polling fallback is **defensive
                    // paranoia**, not load-bearing for correctness:
                    // it bounds the worst-case wakeup latency under
                    // any future refactor that accidentally breaks
                    // the permit-stash invariant. It also catches
                    // the edge case where a `select!` branch fires
                    // while `notified()` is still in its
                    // pre-sleep registration window, dropping the
                    // notified future without consuming the permit.
                    // Cost: at most one wakeup per millisecond on
                    // an idle router; negligible.
                    //
                    // The audit suggested `Notify::notify_waiters`
                    // — that's wrong for this site: `notify_waiters`
                    // does NOT store a permit when no one is
                    // waiting, so it would re-introduce the genuine
                    // lost-wakeup case the permit-stash semantics
                    // close.
                    tokio::select! {
                        _ = scheduler.wait() => {}
                        _ = tokio::time::sleep(Duration::from_millis(1)) => {}
                    }
                }
            }
        }))
    }
1055
1056    /// Stop the router
1057    pub fn stop(&self) {
1058        self.running.store(false, Ordering::Release);
1059    }
1060
1061    /// Check if running
1062    pub fn is_running(&self) -> bool {
1063        self.running.load(Ordering::Acquire)
1064    }
1065
1066    /// Get statistics
1067    pub fn stats(&self) -> RouterStats {
1068        let samples = self.latency_samples.load(Ordering::Relaxed);
1069        let total_latency = self.total_latency_ns.load(Ordering::Relaxed);
1070        let avg_latency = total_latency.checked_div(samples).unwrap_or(0);
1071
1072        RouterStats {
1073            packets_received: self.packets_received.load(Ordering::Relaxed),
1074            packets_forwarded: self.packets_forwarded.load(Ordering::Relaxed),
1075            packets_local: self.packets_local.load(Ordering::Relaxed),
1076            packets_dropped: self.packets_dropped.load(Ordering::Relaxed),
1077            bytes_received: self.bytes_received.load(Ordering::Relaxed),
1078            bytes_forwarded: self.bytes_forwarded.load(Ordering::Relaxed),
1079            routes: self.routing_table.route_count(),
1080            streams: self.routing_table.stream_count(),
1081            avg_latency_ns: avg_latency,
1082        }
1083    }
1084
1085    /// Reset statistics
1086    pub fn reset_stats(&self) {
1087        self.packets_received.store(0, Ordering::Relaxed);
1088        self.packets_forwarded.store(0, Ordering::Relaxed);
1089        self.packets_local.store(0, Ordering::Relaxed);
1090        self.packets_dropped.store(0, Ordering::Relaxed);
1091        self.bytes_received.store(0, Ordering::Relaxed);
1092        self.bytes_forwarded.store(0, Ordering::Relaxed);
1093        self.total_latency_ns.store(0, Ordering::Relaxed);
1094        self.latency_samples.store(0, Ordering::Relaxed);
1095    }
1096
1097    fn record_latency(&self, start: Instant) {
1098        let latency = start.elapsed().as_nanos() as u64;
1099        self.total_latency_ns.fetch_add(latency, Ordering::Relaxed);
1100        self.latency_samples.fetch_add(1, Ordering::Relaxed);
1101    }
1102}
1103
1104/// Result of routing a packet
1105#[derive(Debug)]
1106pub enum RouteAction {
1107    /// Packet is for local delivery
1108    Local(Bytes),
1109    /// Packet was forwarded to next hop
1110    Forwarded(SocketAddr),
1111}
1112
/// Reasons the router refuses a packet.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouterError {
    /// Shorter than the routing header.
    PacketTooSmall,
    /// The routing header could not be parsed.
    InvalidHeader,
    /// The TTL ran out.
    TtlExpired,
    /// The routing table has no entry for the destination.
    NoRoute,
    /// The forward queue refused the packet.
    QueueFull,
    /// Source is this node — packet looped back via a misconfigured
    /// route or hostile peer.
    RoutingLoop,
}

impl std::fmt::Display for RouterError {
    /// Writes a short lowercase description (no padding is applied).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            RouterError::PacketTooSmall => "packet too small",
            RouterError::InvalidHeader => "invalid routing header",
            RouterError::TtlExpired => "TTL expired",
            RouterError::NoRoute => "no route to destination",
            RouterError::QueueFull => "queue full",
            RouterError::RoutingLoop => "routing loop (packet returned to its source)",
        };
        f.write_str(text)
    }
}

impl std::error::Error for RouterError {}
1145
1146#[cfg(test)]
1147mod tests {
1148    use super::super::protocol::NetHeader;
1149    use super::*;
1150
1151    #[test]
1152    fn test_fair_scheduler_basic() {
1153        let scheduler = FairScheduler::new(2, 16);
1154
1155        // Enqueue packets from different streams
1156        for stream in 0..3 {
1157            for _ in 0..4 {
1158                let packet = QueuedPacket {
1159                    data: Bytes::from(vec![0u8; 64]),
1160                    dest: "127.0.0.1:9000".parse().unwrap(),
1161                    stream_id: stream,
1162                    priority: false,
1163                    queued_at: Instant::now(),
1164                };
1165                assert!(scheduler.enqueue(packet));
1166            }
1167        }
1168
1169        assert_eq!(scheduler.stream_count(), 3);
1170        assert_eq!(scheduler.total_queued(), 12);
1171
1172        // Dequeue should round-robin
1173        let mut stream_order = Vec::new();
1174        while let Some(packet) = scheduler.dequeue() {
1175            stream_order.push(packet.stream_id);
1176        }
1177
1178        // Should have processed all 12 packets
1179        assert_eq!(stream_order.len(), 12);
1180
1181        // Check fairness: each stream should get ~4 packets
1182        let mut counts = [0; 3];
1183        for stream in stream_order {
1184            counts[stream as usize] += 1;
1185        }
1186        assert_eq!(counts, [4, 4, 4]);
1187    }
1188
1189    #[test]
1190    fn current_depth_tracks_live_backlog_and_returns_to_zero() {
1191        // `total_queued` is cumulative; `current_depth` must reflect the
1192        // live backlog (enqueued − dequeued) across the priority lane and
1193        // per-stream queues, and a rejected enqueue must not bump it.
1194        let scheduler = FairScheduler::new(2, 4); // per-queue cap = 4
1195        let mk = |stream_id: u64, priority: bool| QueuedPacket {
1196            data: Bytes::from(vec![0u8; 32]),
1197            dest: "127.0.0.1:9000".parse().unwrap(),
1198            stream_id,
1199            priority,
1200            queued_at: Instant::now(),
1201        };
1202
1203        assert_eq!(scheduler.current_depth(), 0);
1204
1205        // 3 on a stream + 2 on the priority lane → depth 5.
1206        for _ in 0..3 {
1207            assert!(scheduler.enqueue(mk(1, false)));
1208        }
1209        for _ in 0..2 {
1210            assert!(scheduler.enqueue(mk(0, true)));
1211        }
1212        assert_eq!(scheduler.current_depth(), 5);
1213
1214        // Overflow the priority lane (cap 4): two pushes succeed (already
1215        // had 2), the 5th is rejected and must NOT change depth.
1216        assert!(scheduler.enqueue(mk(0, true)));
1217        assert!(scheduler.enqueue(mk(0, true)));
1218        assert_eq!(scheduler.current_depth(), 7);
1219        assert!(!scheduler.enqueue(mk(0, true))); // priority lane full
1220        assert_eq!(
1221            scheduler.current_depth(),
1222            7,
1223            "rejected enqueue bumped depth"
1224        );
1225
1226        // Drain everything → back to 0.
1227        let mut drained = 0;
1228        while scheduler.dequeue().is_some() {
1229            drained += 1;
1230        }
1231        assert_eq!(drained, 7);
1232        assert_eq!(scheduler.current_depth(), 0, "depth did not return to zero");
1233
1234        // total_queued stays cumulative (7 successful enqueues), unchanged
1235        // by draining — the contrast the new counter exists to provide.
1236        assert_eq!(scheduler.total_queued(), 7);
1237    }
1238
1239    #[test]
1240    fn group_by_dest_partitions_preserving_per_dest_order() {
1241        // The batched drain groups a mixed-destination run into one bucket
1242        // per peer (send_batch is single-target). This pins the two
1243        // invariants the send loop relies on: (1) packets to the same peer
1244        // keep dequeue order; (2) the reuse-clear pattern empties the inner
1245        // vecs while keeping the dest slots for the next drain.
1246        let a: SocketAddr = "127.0.0.1:1".parse().unwrap();
1247        let b: SocketAddr = "127.0.0.1:2".parse().unwrap();
1248        let c: SocketAddr = "127.0.0.1:3".parse().unwrap();
1249        let mk = |n: u8| Bytes::from(vec![n]);
1250
1251        let mut groups: Vec<(SocketAddr, Vec<Bytes>)> = Vec::new();
1252        // Interleaved across three peers.
1253        group_by_dest(&mut groups, a, mk(1));
1254        group_by_dest(&mut groups, b, mk(2));
1255        group_by_dest(&mut groups, a, mk(3));
1256        group_by_dest(&mut groups, c, mk(4));
1257        group_by_dest(&mut groups, b, mk(5));
1258
1259        assert_eq!(groups.len(), 3, "one group per distinct peer");
1260        // Slots appear in first-seen order; packets within a peer keep order.
1261        assert_eq!(groups[0], (a, vec![mk(1), mk(3)]));
1262        assert_eq!(groups[1], (b, vec![mk(2), mk(5)]));
1263        assert_eq!(groups[2], (c, vec![mk(4)]));
1264
1265        // Reuse: clear inner vecs (keep slots), regroup one peer.
1266        for (_, v) in groups.iter_mut() {
1267            v.clear();
1268        }
1269        group_by_dest(&mut groups, c, mk(9));
1270        assert_eq!(groups.len(), 3, "dest slots persist across drains");
1271        assert!(groups[0].1.is_empty() && groups[1].1.is_empty());
1272        assert_eq!(
1273            groups[2].1,
1274            vec![mk(9)],
1275            "existing slot reused, not duplicated"
1276        );
1277    }
1278
1279    #[test]
1280    fn reset_dest_groups_stays_bounded_under_peer_churn() {
1281        // Regression for the unbounded dest-slot cache: simulate many drains,
1282        // each to a brand-new peer (worst-case churn). Without the cap the slot
1283        // set would grow to one entry per peer ever seen (here 500); the
1284        // bounded reset must keep it at `cap + 1`.
1285        const CAP: usize = 64;
1286        let mut groups: Vec<(SocketAddr, Vec<Bytes>)> = Vec::new();
1287        let mut max_len = 0usize;
1288        for port in 0u16..500 {
1289            reset_dest_groups(&mut groups, CAP);
1290            let dest: SocketAddr = format!("127.0.0.1:{}", port + 1).parse().unwrap();
1291            group_by_dest(&mut groups, dest, Bytes::from_static(b"x"));
1292            max_len = max_len.max(groups.len());
1293        }
1294        assert!(
1295            max_len <= CAP + 1,
1296            "dest-group set must stay bounded under churn; peaked at {max_len}",
1297        );
1298    }
1299
1300    #[test]
1301    fn test_fair_scheduler_priority() {
1302        let scheduler = FairScheduler::new(2, 16);
1303
1304        // Enqueue normal packets
1305        for _ in 0..4 {
1306            let packet = QueuedPacket {
1307                data: Bytes::from(vec![0u8; 64]),
1308                dest: "127.0.0.1:9000".parse().unwrap(),
1309                stream_id: 0,
1310                priority: false,
1311                queued_at: Instant::now(),
1312            };
1313            scheduler.enqueue(packet);
1314        }
1315
1316        // Enqueue priority packet
1317        let priority = QueuedPacket {
1318            data: Bytes::from(vec![1u8; 64]),
1319            dest: "127.0.0.1:9000".parse().unwrap(),
1320            stream_id: 1,
1321            priority: true,
1322            queued_at: Instant::now(),
1323        };
1324        scheduler.enqueue(priority);
1325
1326        // Priority should come first
1327        let first = scheduler.dequeue().unwrap();
1328        assert_eq!(first.data[0], 1);
1329        assert!(first.priority);
1330    }
1331
1332    #[test]
1333    fn test_fair_scheduler_no_starvation() {
1334        // Regression: dequeue() always started iterating from the beginning
1335        // of the DashMap, so streams appearing earlier in iteration order
1336        // were systematically preferred, starving later streams.
1337        //
1338        // With the rotation fix, each dequeue() starts from a different
1339        // position, so all streams should get roughly equal service.
1340        let quantum = 1;
1341        let scheduler = FairScheduler::new(quantum, 64);
1342
1343        // Use many streams to make DashMap iteration-order bias visible
1344        let num_streams = 8u64;
1345        let packets_per_stream = 20;
1346
1347        for stream in 0..num_streams {
1348            for _ in 0..packets_per_stream {
1349                let packet = QueuedPacket {
1350                    data: Bytes::from(vec![stream as u8; 1]),
1351                    dest: "127.0.0.1:9000".parse().unwrap(),
1352                    stream_id: stream,
1353                    priority: false,
1354                    queued_at: Instant::now(),
1355                };
1356                scheduler.enqueue(packet);
1357            }
1358        }
1359
1360        // Dequeue all packets and track which stream each came from
1361        let mut first_half_counts = vec![0u32; num_streams as usize];
1362        let total = num_streams * packets_per_stream as u64;
1363        let half = total / 2;
1364
1365        for i in 0..total {
1366            let packet = scheduler.dequeue().unwrap();
1367            if i < half {
1368                first_half_counts[packet.stream_id as usize] += 1;
1369            }
1370        }
1371
1372        // In the first half of dequeues, every stream should have been served
1373        // at least once. Without rotation, some streams could get 0 service.
1374        for (stream, &count) in first_half_counts.iter().enumerate() {
1375            assert!(
1376                count > 0,
1377                "stream {} was starved in the first half of dequeues ({} of {} packets)",
1378                stream,
1379                count,
1380                half
1381            );
1382        }
1383    }
1384
1385    #[tokio::test]
1386    async fn test_router_creation() {
1387        let config = RouterConfig::new(0x1234, "127.0.0.1:0".parse().unwrap());
1388        let router = NetRouter::new(config).await.unwrap();
1389
1390        assert!(!router.is_running());
1391        assert_eq!(router.stats().routes, 0);
1392    }
1393
1394    /// `start()` must spawn at most one dispatch loop. A second
1395    /// call while the first loop is still running would race the
1396    /// scheduler's `dequeue` and produce reordered or duplicate
1397    /// sends.
1398    #[tokio::test]
1399    async fn start_is_idempotent_returns_none_when_already_running() {
1400        let config = RouterConfig::new(0x1234, "127.0.0.1:0".parse().unwrap());
1401        let router = NetRouter::new(config).await.unwrap();
1402
1403        let first = router.start();
1404        assert!(first.is_some(), "first start() should spawn a loop");
1405        assert!(router.is_running());
1406
1407        let second = router.start();
1408        assert!(
1409            second.is_none(),
1410            "second start() while running must NOT spawn a duplicate loop",
1411        );
1412
1413        // After stop(), the first loop exits and a fresh start()
1414        // is allowed again.
1415        router.stop();
1416        // Give the loop a tick to observe `running == false`.
1417        if let Some(h) = first {
1418            let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1419        }
1420
1421        let third = router.start();
1422        assert!(
1423            third.is_some(),
1424            "start() after stop() should be allowed to spawn again",
1425        );
1426        router.stop();
1427        if let Some(h) = third {
1428            let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1429        }
1430    }
1431
1432    #[tokio::test]
1433    async fn test_router_routing_table() {
1434        let config = RouterConfig::new(0x1234, "127.0.0.1:0".parse().unwrap());
1435        let router = NetRouter::new(config).await.unwrap();
1436
1437        let dest: SocketAddr = "127.0.0.1:9001".parse().unwrap();
1438        router.add_route(0x5678, dest);
1439
1440        assert_eq!(router.routing_table().lookup(0x5678), Some(dest));
1441        assert_eq!(router.stats().routes, 1);
1442    }
1443
1444    #[tokio::test]
1445    async fn test_router_extracts_stream_id_at_correct_offset() {
1446        // Regression: stream_id was read from bytes 8..16 (inside the nonce)
1447        // instead of bytes 40..48 where it actually lives in the Net header.
1448        let local_id = 0x1234u64;
1449        let config = RouterConfig::new(local_id, "127.0.0.1:0".parse().unwrap());
1450        let router = NetRouter::new(config).await.unwrap();
1451
1452        let expected_stream_id: u64 = 0xDEAD_BEEF_CAFE_BABEu64;
1453
1454        // Build routing header pointing to local_id so we get RouteAction::Local
1455        let routing = RoutingHeader::new(local_id, 0x5678, 4);
1456        let routing_bytes = routing.to_bytes();
1457
1458        // Build a Net header with a known stream_id
1459        let net = NetHeader::new(
1460            0xAAAA,             // session_id
1461            expected_stream_id, // stream_id
1462            1,                  // sequence
1463            [0u8; 12],          // nonce
1464            0,                  // payload_len
1465            0,                  // event_count
1466            super::super::protocol::PacketFlags::NONE,
1467        );
1468        let net_bytes = net.to_bytes();
1469
1470        // Concatenate routing header + Net header
1471        let mut packet = BytesMut::with_capacity(ROUTING_HEADER_SIZE + HEADER_SIZE);
1472        packet.extend_from_slice(&routing_bytes);
1473        packet.extend_from_slice(&net_bytes);
1474
1475        let from: SocketAddr = "127.0.0.1:5000".parse().unwrap();
1476        let _ = router.route_packet(packet.freeze(), from);
1477
1478        // The stream stats should be keyed by the correct stream_id
1479        let stats = router
1480            .routing_table()
1481            .get_stream_stats(expected_stream_id)
1482            .expect("stream stats recorded for the routed packet");
1483        assert_eq!(
1484            stats.get_packets_in(),
1485            1,
1486            "stream stats should record 1 packet for stream_id 0x{:X}",
1487            expected_stream_id
1488        );
1489    }
1490
1491    /// Pin: a packet whose `src_id` matches this router's
1492    /// `local_id` must be dropped immediately on receipt with
1493    /// `RoutingLoop` instead of being forwarded again. Pre-fix
1494    /// the router happily looped the packet back along the
1495    /// route, only breaking the cycle on TTL exhaustion (so
1496    /// every loop wasted up to 64 hops × bandwidth before
1497    /// dying).
1498    #[tokio::test]
1499    async fn route_packet_drops_when_src_id_is_local() {
1500        let local_id = 0x1234u64;
1501        let dest_id = 0x9999u64;
1502        let dest_addr: SocketAddr = "127.0.0.2:6000".parse().unwrap();
1503
1504        let config = RouterConfig::new(local_id, "127.0.0.1:0".parse().unwrap());
1505        let router = NetRouter::new(config).await.unwrap();
1506        router.routing_table().add_route(dest_id, dest_addr);
1507
1508        // RoutingHeader stores src_id as u32 — the low 32 bits of
1509        // the local node id are what identifies us on the wire.
1510        let routing = RoutingHeader::new(dest_id, local_id as u32, 16);
1511        let routing_bytes = routing.to_bytes();
1512
1513        let net = NetHeader::new(
1514            0xAAAA,
1515            0xBEEF,
1516            1,
1517            [0u8; 12],
1518            0,
1519            0,
1520            super::super::protocol::PacketFlags::NONE,
1521        );
1522        let net_bytes = net.to_bytes();
1523
1524        let mut packet = BytesMut::with_capacity(ROUTING_HEADER_SIZE + HEADER_SIZE);
1525        packet.extend_from_slice(&routing_bytes);
1526        packet.extend_from_slice(&net_bytes);
1527
1528        let from: SocketAddr = "127.0.0.1:5000".parse().unwrap();
1529        let result = router.route_packet(packet.freeze(), from);
1530        match result {
1531            Err(RouterError::RoutingLoop) => {}
1532            other => panic!(
1533                "expected RoutingLoop for src_id == local_id, got {:?}",
1534                other
1535            ),
1536        }
1537
1538        // Counters: dropped += 1, forwarded unchanged.
1539        let stats = router.stats();
1540        assert_eq!(
1541            stats.packets_dropped, 1,
1542            "looping packet must increment packets_dropped"
1543        );
1544        assert_eq!(
1545            stats.packets_forwarded, 0,
1546            "looping packet must NOT be forwarded"
1547        );
1548    }
1549
1550    /// Regression for BUG_AUDIT_2026_04_30_CORE.md #119: pre-fix
1551    /// `route_packet` decremented TTL via `forward()` and queued
1552    /// the packet for the next hop without checking whether the
1553    /// post-decrement TTL was 0. The next hop received a TTL=0
1554    /// packet and dropped it on its own `is_expired()` check —
1555    /// wasting one forward + bandwidth + queue slot per
1556    /// last-hop packet. Post-fix the router drops here with
1557    /// `TtlExpired` if `forward()` made the TTL 0.
1558    #[tokio::test]
1559    async fn route_packet_drops_when_forward_makes_ttl_zero() {
1560        let local_id = 0x1234u64;
1561        let dest_id = 0x9999u64;
1562        let dest_addr: SocketAddr = "127.0.0.2:6000".parse().unwrap();
1563
1564        let config = RouterConfig::new(local_id, "127.0.0.1:0".parse().unwrap());
1565        let router = NetRouter::new(config).await.unwrap();
1566        router.routing_table().add_route(dest_id, dest_addr);
1567
1568        // TTL = 1 — `forward()` will decrement to 0. Pre-fix
1569        // this would still queue the packet (RouteAction::Forwarded);
1570        // post-fix it's dropped with TtlExpired.
1571        let routing = RoutingHeader::new(dest_id, 0x5678, 1);
1572        let routing_bytes = routing.to_bytes();
1573
1574        let net = NetHeader::new(
1575            0xAAAA,
1576            0xBEEF,
1577            1,
1578            [0u8; 12],
1579            0,
1580            0,
1581            super::super::protocol::PacketFlags::NONE,
1582        );
1583        let net_bytes = net.to_bytes();
1584
1585        let mut packet = BytesMut::with_capacity(ROUTING_HEADER_SIZE + HEADER_SIZE);
1586        packet.extend_from_slice(&routing_bytes);
1587        packet.extend_from_slice(&net_bytes);
1588
1589        let from: SocketAddr = "127.0.0.1:5000".parse().unwrap();
1590        let result = router.route_packet(packet.freeze(), from);
1591
1592        match result {
1593            Err(RouterError::TtlExpired) => {} // expected
1594            Ok(action) => panic!(
1595                "post-fix must drop with TtlExpired, not forward (got {:?})",
1596                action
1597            ),
1598            Err(e) => panic!("unexpected error: {:?}", e),
1599        }
1600    }
1601
1602    /// Sanity: a TTL=2 packet should still forward (decrements
1603    /// to 1 — non-zero, next hop accepts).
1604    #[tokio::test]
1605    async fn route_packet_forwards_when_ttl_remains_positive_after_decrement() {
1606        let local_id = 0x1234u64;
1607        let dest_id = 0x9999u64;
1608        let dest_addr: SocketAddr = "127.0.0.2:6000".parse().unwrap();
1609
1610        let config = RouterConfig::new(local_id, "127.0.0.1:0".parse().unwrap());
1611        let router = NetRouter::new(config).await.unwrap();
1612        router.routing_table().add_route(dest_id, dest_addr);
1613
1614        let routing = RoutingHeader::new(dest_id, 0x5678, 2);
1615        let routing_bytes = routing.to_bytes();
1616        let net = NetHeader::new(
1617            0xAAAA,
1618            0xBEEF,
1619            1,
1620            [0u8; 12],
1621            0,
1622            0,
1623            super::super::protocol::PacketFlags::NONE,
1624        );
1625        let net_bytes = net.to_bytes();
1626        let mut packet = BytesMut::with_capacity(ROUTING_HEADER_SIZE + HEADER_SIZE);
1627        packet.extend_from_slice(&routing_bytes);
1628        packet.extend_from_slice(&net_bytes);
1629        let from: SocketAddr = "127.0.0.1:5000".parse().unwrap();
1630        let result = router.route_packet(packet.freeze(), from);
1631        match result {
1632            Ok(RouteAction::Forwarded(addr)) => assert_eq!(addr, dest_addr),
1633            other => panic!("expected Forwarded, got {:?}", other),
1634        }
1635    }
1636
1637    /// Regression: a packet that fails any drop gate (loop
1638    /// suppression, pre-decrement TTL, post-decrement TTL) must
1639    /// NOT increment the per-stream `packets_in` counter — only
1640    /// `packets_dropped`. Pre-fix `record_in` fired immediately
1641    /// after parse, so a dropped packet incremented BOTH stream
1642    /// counters; a dashboard computing `delivery_rate =
1643    /// packets_out / packets_in` would see drops inflate the
1644    /// denominator without affecting the numerator, masking
1645    /// healthy networks behind apparent delivery loss.
1646    ///
1647    /// This pins the post-decrement TTL drop path (the audit's
1648    /// stated trigger), but the structural change applies to
1649    /// every drop path; the per-stream invariant is now
1650    /// "packets_in counts only delivered or forwarded packets."
1651    #[tokio::test]
1652    async fn ttl_drop_does_not_double_count_packets_in_for_stream() {
1653        let local_id = 0x1234u64;
1654        let dest_id = 0x9999u64;
1655        let dest_addr: SocketAddr = "127.0.0.2:6000".parse().unwrap();
1656
1657        let config = RouterConfig::new(local_id, "127.0.0.1:0".parse().unwrap());
1658        let router = NetRouter::new(config).await.unwrap();
1659        router.routing_table().add_route(dest_id, dest_addr);
1660
1661        // TTL = 1 hits the post-decrement drop path. Same
1662        // setup as `route_packet_drops_when_forward_makes_ttl_zero`.
1663        let routing = RoutingHeader::new(dest_id, 0x5678, 1);
1664        let routing_bytes = routing.to_bytes();
1665        // NetHeader::new params: (session_id, stream_id, sequence, nonce, payload_len, event_count, flags)
1666        let session_id = 0xAAAAu64;
1667        let stream_id = 0x4242u64;
1668        let net = NetHeader::new(
1669            session_id,
1670            stream_id,
1671            1,
1672            [0u8; 12],
1673            0,
1674            0,
1675            super::super::protocol::PacketFlags::NONE,
1676        );
1677        let net_bytes = net.to_bytes();
1678        let mut packet = BytesMut::with_capacity(ROUTING_HEADER_SIZE + HEADER_SIZE);
1679        packet.extend_from_slice(&routing_bytes);
1680        packet.extend_from_slice(&net_bytes);
1681        let from: SocketAddr = "127.0.0.1:5000".parse().unwrap();
1682
1683        let result = router.route_packet(packet.freeze(), from);
1684        assert!(matches!(result, Err(RouterError::TtlExpired)));
1685
1686        let stats = router
1687            .routing_table()
1688            .get_stream_stats(stream_id)
1689            .expect("stream stats present for the dropped packet");
1690        assert_eq!(
1691            stats.get_packets_in(),
1692            0,
1693            "regression: a dropped packet must NOT increment per-stream \
1694             packets_in. Pre-fix this counter was incremented before \
1695             the TTL/loop checks, so drops double-counted as both \
1696             ingressed and dropped — masking delivery rate dashboards."
1697        );
1698        assert_eq!(
1699            stats.get_drops(),
1700            1,
1701            "drop must still increment per-stream packets_dropped"
1702        );
1703    }
1704
1705    #[test]
1706    fn test_regression_fair_scheduler_cleanup_called() {
1707        // Regression: FairScheduler never removed empty stream queues from its
1708        // DashMap. After many unique stream_ids passed through, the map grew
1709        // without bound, causing O(n) iteration in dequeue() where n is the
1710        // number of *ever-seen* streams rather than *active* streams. This
1711        // degraded dequeue latency over time.
1712        //
1713        // Fix: dequeue() calls cleanup_empty() every 1024 iterations,
1714        // removing streams whose queues have been fully drained.
1715        let scheduler = FairScheduler::new(4, 16);
1716
1717        // Enqueue packets across many unique streams
1718        let num_streams = 200u64;
1719        for stream in 0..num_streams {
1720            let packet = QueuedPacket {
1721                data: Bytes::from(vec![0u8; 8]),
1722                dest: "127.0.0.1:9000".parse().unwrap(),
1723                stream_id: stream,
1724                priority: false,
1725                queued_at: Instant::now(),
1726            };
1727            assert!(scheduler.enqueue(packet));
1728        }
1729
1730        assert_eq!(scheduler.stream_count(), num_streams as usize);
1731
1732        // Drain all packets
1733        let mut drained = 0;
1734        while scheduler.dequeue().is_some() {
1735            drained += 1;
1736        }
1737        assert_eq!(drained, num_streams as usize);
1738
1739        // The cleanup triggers every 1024 dequeue calls. We need enough
1740        // dequeue calls (even returning None) to cross the 1024 boundary.
1741        // We already did `num_streams` dequeues above; do more no-op
1742        // dequeues to push past the threshold.
1743        for _ in 0..(1025 - drained) {
1744            let _ = scheduler.dequeue();
1745        }
1746
1747        // After cleanup, all empty stream queues should have been removed
1748        assert_eq!(
1749            scheduler.stream_count(),
1750            0,
1751            "empty stream queues must be cleaned up after enough dequeue \
1752             iterations to prevent unbounded DashMap growth"
1753        );
1754    }
1755
1756    #[test]
1757    fn test_regression_scheduler_sees_streams_added_after_quantum_exhaustion() {
1758        // Regression: dequeue() collected stream keys once, then reused
1759        // the stale snapshot for the retry loop after quantum reset.
1760        // Streams added between the two loops were invisible until the
1761        // next dequeue() call, causing extra latency.
1762        //
1763        // Fix: re-collect keys before the retry loop.
1764        let scheduler = FairScheduler::new(1, 16);
1765
1766        // Stream 0 with 1 packet (quantum = 1, so first pass drains it)
1767        scheduler.enqueue(QueuedPacket {
1768            data: Bytes::from_static(b"s0"),
1769            dest: "127.0.0.1:9000".parse().unwrap(),
1770            stream_id: 0,
1771            priority: false,
1772            queued_at: Instant::now(),
1773        });
1774
1775        // Drain stream 0's quantum
1776        let pkt = scheduler.dequeue().unwrap();
1777        assert_eq!(pkt.stream_id, 0);
1778
1779        // Now add stream 1 while the scheduler is "between rounds"
1780        scheduler.enqueue(QueuedPacket {
1781            data: Bytes::from_static(b"s1"),
1782            dest: "127.0.0.1:9000".parse().unwrap(),
1783            stream_id: 1,
1784            priority: false,
1785            queued_at: Instant::now(),
1786        });
1787
1788        // Next dequeue should find stream 1
1789        let pkt = scheduler.dequeue().unwrap();
1790        assert_eq!(
1791            pkt.stream_id, 1,
1792            "newly added stream should be visible after quantum reset"
1793        );
1794    }
1795
1796    /// PERF_AUDIT §3.1 regression: the active-stream snapshot
1797    /// must mutate exactly when the stream set does:
1798    ///   - enqueue → new stream: rebuild (ArcSwap pointer changes).
1799    ///   - enqueue → existing stream: NO rebuild (pointer stable).
1800    ///   - set_stream_weight → new stream: rebuild.
1801    ///   - set_stream_weight → existing stream weight change: NO rebuild.
1802    ///   - cleanup_empty actually removes something: rebuild.
1803    ///   - cleanup_empty with nothing to remove: NO rebuild.
1804    ///
1805    /// Pre-fix dequeue collected a fresh Vec<u64> from the DashMap
1806    /// on EVERY call (twice on the quantum-reset path). A regression
1807    /// that drops the snapshot-store from any of the three mutation
1808    /// sites would silently leave dequeue iterating a stale set.
1809    #[test]
1810    fn active_stream_snapshot_tracks_stream_set_changes() {
1811        let scheduler = FairScheduler::new(2, 16);
1812        let initial_ptr = Arc::as_ptr(&scheduler.active_streams.load_full());
1813        assert!(scheduler.active_streams.load_full().is_empty());
1814
1815        // enqueue → new stream → rebuild.
1816        scheduler.enqueue(QueuedPacket {
1817            data: Bytes::from_static(b"a"),
1818            dest: "127.0.0.1:9000".parse().unwrap(),
1819            stream_id: 7,
1820            priority: false,
1821            queued_at: Instant::now(),
1822        });
1823        let after_new = scheduler.active_streams.load_full();
1824        assert_ne!(
1825            Arc::as_ptr(&after_new),
1826            initial_ptr,
1827            "vacant-arm enqueue must swap a fresh snapshot in"
1828        );
1829        assert_eq!(&*after_new, &[7u64]);
1830
1831        // enqueue → existing stream → snapshot stable.
1832        let before_existing = Arc::as_ptr(&scheduler.active_streams.load_full());
1833        scheduler.enqueue(QueuedPacket {
1834            data: Bytes::from_static(b"b"),
1835            dest: "127.0.0.1:9000".parse().unwrap(),
1836            stream_id: 7,
1837            priority: false,
1838            queued_at: Instant::now(),
1839        });
1840        assert_eq!(
1841            Arc::as_ptr(&scheduler.active_streams.load_full()),
1842            before_existing,
1843            "occupied-arm enqueue must NOT rebuild the snapshot — a \
1844             regression here defeats the §3.1 fix's steady-state win"
1845        );
1846
1847        // set_stream_weight → new stream → rebuild.
1848        let before_weight_new = Arc::as_ptr(&scheduler.active_streams.load_full());
1849        scheduler.set_stream_weight(11, 3);
1850        let after_weight_new = scheduler.active_streams.load_full();
1851        assert_ne!(
1852            Arc::as_ptr(&after_weight_new),
1853            before_weight_new,
1854            "set_stream_weight on a new stream must rebuild the snapshot"
1855        );
1856        let mut sorted: Vec<u64> = after_weight_new.iter().copied().collect();
1857        sorted.sort_unstable();
1858        assert_eq!(sorted, vec![7u64, 11]);
1859
1860        // set_stream_weight → existing stream → stable.
1861        let before_weight_existing = Arc::as_ptr(&scheduler.active_streams.load_full());
1862        scheduler.set_stream_weight(11, 5);
1863        assert_eq!(
1864            Arc::as_ptr(&scheduler.active_streams.load_full()),
1865            before_weight_existing,
1866            "set_stream_weight on existing stream must NOT rebuild"
1867        );
1868
1869        // cleanup_empty with nothing to remove (stream 7 has pending
1870        // packets) → stable.
1871        let before_clean_noop = Arc::as_ptr(&scheduler.active_streams.load_full());
1872        let removed = scheduler.cleanup_empty();
1873        assert_eq!(
1874            removed, 1,
1875            "stream 11 has no packets and only DashMap refs it"
1876        );
1877        let after_clean = scheduler.active_streams.load_full();
1878        assert_ne!(
1879            Arc::as_ptr(&after_clean),
1880            before_clean_noop,
1881            "cleanup_empty that actually removed an entry MUST rebuild"
1882        );
1883        assert_eq!(&*after_clean, &[7u64]);
1884
1885        // cleanup_empty with nothing removable (stream 7 still has
1886        // packets queued) → stable pointer.
1887        let before_clean_stable = Arc::as_ptr(&scheduler.active_streams.load_full());
1888        let removed = scheduler.cleanup_empty();
1889        assert_eq!(removed, 0);
1890        assert_eq!(
1891            Arc::as_ptr(&scheduler.active_streams.load_full()),
1892            before_clean_stable,
1893            "cleanup_empty that removed nothing must NOT rebuild"
1894        );
1895    }
1896
1897    /// PERF_AUDIT §3.1 lost-wakeup regression: concurrent snapshot
1898    /// rebuilds (enqueue vacant-arm racing cleanup_empty's rebuild)
1899    /// must never leave the snapshot out of sync with the stream
1900    /// map once activity quiesces. Without the version-counter
1901    /// retry, a rebuild that collected keys BEFORE a concurrent
1902    /// insert could store AFTER the inserting thread's own rebuild,
1903    /// wiping the new stream from the snapshot; with no further
1904    /// stream-set change the stream's packets would be stranded
1905    /// forever (occupied-arm enqueues don't rebuild, and cleanup
1906    /// never evicts a non-empty queue). This hammers the race and
1907    /// asserts the quiescent invariant snapshot == live stream set.
1908    #[test]
1909    fn active_stream_snapshot_survives_concurrent_rebuild_races() {
1910        use std::thread;
1911
1912        let scheduler = Arc::new(FairScheduler::new(2, 16));
1913        let mut handles = Vec::new();
1914
1915        // Writers: continuously create fresh streams (vacant-arm
1916        // rebuilds) and drain packets so queues trend toward empty
1917        // and stay eligible for cleanup eviction (cleanup rebuilds).
1918        for t in 0..4u64 {
1919            let sched = Arc::clone(&scheduler);
1920            handles.push(thread::spawn(move || {
1921                for i in 0..500u64 {
1922                    let stream_id = t * 100_000 + i;
1923                    sched.enqueue(QueuedPacket {
1924                        data: Bytes::from_static(b"x"),
1925                        dest: "127.0.0.1:9000".parse().unwrap(),
1926                        stream_id,
1927                        priority: false,
1928                        queued_at: Instant::now(),
1929                    });
1930                    let _ = sched.dequeue();
1931                }
1932            }));
1933        }
1934
1935        // Dedicated cleanup hammer: maximizes rebuild/insert overlap.
1936        {
1937            let sched = Arc::clone(&scheduler);
1938            handles.push(thread::spawn(move || {
1939                for _ in 0..2_000 {
1940                    sched.cleanup_empty();
1941                }
1942            }));
1943        }
1944
1945        for h in handles {
1946            h.join().unwrap();
1947        }
1948
1949        // Quiescent invariant: snapshot == live stream set. A lost
1950        // rebuild leaves a stream in the map (with packets queued)
1951        // that the snapshot — and therefore dequeue — never sees.
1952        let mut snapshot: Vec<u64> = scheduler
1953            .active_streams
1954            .load_full()
1955            .iter()
1956            .copied()
1957            .collect();
1958        snapshot.sort_unstable();
1959        let mut live: Vec<u64> = scheduler.streams.iter().map(|e| *e.key()).collect();
1960        live.sort_unstable();
1961        assert_eq!(
1962            snapshot, live,
1963            "active-stream snapshot diverged from the stream map after \
1964             concurrent enqueue/cleanup churn — version-counter retry \
1965             protocol failed to repair a racing rebuild"
1966        );
1967    }
1968
1969    /// Fairness weight: a weight-4 stream should ship 4× the packets per
1970    /// round of a weight-1 stream. With both queues full and one full
1971    /// round of dequeues, we should see ~4:1 ratio before any round
1972    /// reset fires.
1973    #[test]
1974    fn test_fair_scheduler_respects_stream_weight() {
1975        // Base quantum = 1: every stream with weight 1 gets 1 packet per
1976        // round; a weight-4 stream gets 4 packets per round.
1977        let scheduler = FairScheduler::new(1, 64);
1978
1979        scheduler.set_stream_weight(1, 1);
1980        scheduler.set_stream_weight(2, 4);
1981
1982        let dest: SocketAddr = "127.0.0.1:9999".parse().unwrap();
1983        // Fill both streams with 8 packets each.
1984        for stream_id in [1u64, 2u64] {
1985            for _ in 0..8 {
1986                scheduler.enqueue(QueuedPacket {
1987                    data: Bytes::from_static(&[0u8; 16]),
1988                    dest,
1989                    stream_id,
1990                    priority: false,
1991                    queued_at: Instant::now(),
1992                });
1993            }
1994        }
1995
1996        // Dequeue 5 packets. The weight-4 stream should ship at least
1997        // 4 of those 5 in the first round (its quantum is 4; stream 1's
1998        // quantum is 1). Depending on round-robin start, stream 1 may
1999        // ship 1 packet in the middle.
2000        let mut counts = [0u64; 3];
2001        for _ in 0..5 {
2002            if let Some(pkt) = scheduler.dequeue() {
2003                counts[pkt.stream_id as usize] += 1;
2004            }
2005        }
2006        assert_eq!(counts[0], 0);
2007        assert!(
2008            counts[2] >= 4,
2009            "weight-4 stream should ship >= 4 packets in 5 dequeues; \
2010             saw weight-1={} weight-4={}",
2011            counts[1],
2012            counts[2]
2013        );
2014        assert!(
2015            counts[1] <= 1,
2016            "weight-1 stream should ship <= 1 packet before round reset; \
2017             saw {}",
2018            counts[1]
2019        );
2020    }
2021
2022    /// Anti-starvation: a bulk transfer with a huge backlog must NOT
2023    /// head-of-line-block a competing interactive stream of equal
2024    /// weight. This is the property that justifies routing blob
2025    /// transfers through the scheduler (FairScheduler transport plan) —
2026    /// a 2 MiB transfer (~260 packets) queued ahead of a few
2027    /// interactive packets cannot make the interactive flow wait for
2028    /// the whole transfer to drain; round-robin interleaves them so
2029    /// every interactive packet is serviced within a bounded number of
2030    /// dequeues regardless of how deep the bulk backlog is.
2031    #[test]
2032    fn bulk_backlog_does_not_starve_an_equal_weight_interactive_stream() {
2033        let scheduler = FairScheduler::new(1, 512);
2034        let bulk = 1u64;
2035        let interactive = 2u64;
2036        scheduler.set_stream_weight(bulk, 1);
2037        scheduler.set_stream_weight(interactive, 1);
2038
2039        let dest: SocketAddr = "127.0.0.1:9999".parse().unwrap();
2040        // A transfer-scale backlog on the bulk stream …
2041        let bulk_backlog = 260usize;
2042        for _ in 0..bulk_backlog {
2043            scheduler.enqueue(QueuedPacket {
2044                data: Bytes::from_static(&[0u8; 16]),
2045                dest,
2046                stream_id: bulk,
2047                priority: false,
2048                queued_at: Instant::now(),
2049            });
2050        }
2051        // … and just a handful of interactive packets behind it.
2052        let interactive_count = 4usize;
2053        for _ in 0..interactive_count {
2054            scheduler.enqueue(QueuedPacket {
2055                data: Bytes::from_static(&[0u8; 16]),
2056                dest,
2057                stream_id: interactive,
2058                priority: false,
2059                queued_at: Instant::now(),
2060            });
2061        }
2062
2063        // Drain, recording the dequeue index at which each interactive
2064        // packet emerges.
2065        let mut last_interactive_at = 0usize;
2066        let mut seen_interactive = 0usize;
2067        for idx in 0..(bulk_backlog + interactive_count) {
2068            if let Some(pkt) = scheduler.dequeue() {
2069                if pkt.stream_id == interactive {
2070                    seen_interactive += 1;
2071                    last_interactive_at = idx;
2072                }
2073            }
2074        }
2075
2076        assert_eq!(
2077            seen_interactive, interactive_count,
2078            "every interactive packet must be delivered"
2079        );
2080        // With quantum 1 and equal weight, round-robin alternates one
2081        // bulk, one interactive per round, so the k-th interactive
2082        // packet emerges by dequeue ~2k — bounded by 2×count, utterly
2083        // independent of the 260-deep bulk backlog. If the scheduler
2084        // had FIFO/head-of-line semantics, the last interactive packet
2085        // would not appear until index ~260.
2086        assert!(
2087            last_interactive_at <= 2 * interactive_count,
2088            "interactive stream starved: last interactive packet at dequeue {} \
2089             (bulk backlog {}); fair interleaving should deliver it by ~{}",
2090            last_interactive_at,
2091            bulk_backlog,
2092            2 * interactive_count
2093        );
2094    }
2095
2096    /// Regression: BUG_REPORT.md #31 — `round_robin_idx` was
2097    /// `fetch_add(1)`-ed unconditionally on every `dequeue` call,
2098    /// so polls that returned `None` (no streams have packets)
2099    /// still rotated the cursor. Under bursty mixed traffic this
2100    /// biased service away from streams that became non-empty
2101    /// mid-pass. The fix advances the cursor only when a packet
2102    /// is actually popped.
2103    #[test]
2104    fn round_robin_idx_advances_only_on_successful_pop() {
2105        let scheduler = FairScheduler::new(1, 64);
2106
2107        // Empty scheduler: many polls must NOT advance the index.
2108        let initial = scheduler.round_robin_idx.load(Ordering::Relaxed);
2109        for _ in 0..1000 {
2110            assert!(scheduler.dequeue().is_none());
2111        }
2112        assert_eq!(
2113            scheduler.round_robin_idx.load(Ordering::Relaxed),
2114            initial,
2115            "empty-poll dequeues must not advance round_robin_idx (#31)"
2116        );
2117
2118        // Now enqueue + drain. Each successful pop should advance
2119        // the index by exactly 1.
2120        for stream in 0..5u64 {
2121            scheduler.enqueue(QueuedPacket {
2122                data: Bytes::from(vec![0u8; 8]),
2123                dest: "127.0.0.1:9000".parse().unwrap(),
2124                stream_id: stream,
2125                priority: false,
2126                queued_at: Instant::now(),
2127            });
2128        }
2129        let before_drain = scheduler.round_robin_idx.load(Ordering::Relaxed);
2130        let mut popped = 0;
2131        while scheduler.dequeue().is_some() {
2132            popped += 1;
2133        }
2134        let after_drain = scheduler.round_robin_idx.load(Ordering::Relaxed);
2135        assert_eq!(popped, 5);
2136        assert_eq!(
2137            after_drain.wrapping_sub(before_drain),
2138            popped as u64,
2139            "round_robin_idx must advance by exactly the number of successful pops (#31)"
2140        );
2141    }
2142}