net/adapter/net/router.rs
1//! Net Router for single-hop and multi-hop packet routing.
2//!
3//! The router handles:
4//! - Stream multiplexing across thousands of streams
5//! - Fair scheduling to prevent stream starvation
6//! - Low-latency packet forwarding
7//! - Per-stream statistics
8
9use arc_swap::ArcSwap;
10use bytes::{Bytes, BytesMut};
11use crossbeam_queue::ArrayQueue;
12use dashmap::DashMap;
13use std::net::SocketAddr;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::net::UdpSocket;
18use tokio::sync::Notify;
19
20use super::protocol::HEADER_SIZE;
21use super::route::{RoutingHeader, RoutingTable, ROUTING_HEADER_SIZE};
22
23// --- Phase 0 instrument (NRPC_SEND_LOOP_BATCHING_PLAN) -------------------
24//
25// Measures the send loop's *drain run-length*: how many packets
26// `scheduler.dequeue()` returns back-to-back before the loop falls through
27// to `wait()`. This is the batchability signal — a sendmmsg drain only pays
28// when runs are long. Off unless `arm_send_drain_histo()` is called BEFORE
29// the send loop starts (the loop latches the flag once at startup), so an
30// unarmed/production loop pays only a stack-local increment.
31//
32// Measurement scaffolding — not part of the supported public API. The
33// arm/snapshot entry points are re-exported only so the in-repo integration
34// tests (`send_drain_depth`, `scheduled_stream_integrity`), which compile as
35// external crates, can drive them; every such entry point is `#[doc(hidden)]`
36// so it never appears in the crate's documented surface and downstream code
37// is not invited to depend on it.
// Drain-run histogram buckets: eight power-of-two buckets followed by one
// overflow bucket for runs of 256 packets or more.
static SEND_DRAIN_HISTO: [AtomicU64; 9] = [const { AtomicU64::new(0) }; 9];
// Latched by the send loop once at startup; see `arm_send_drain_histo`.
static SEND_DRAIN_ARMED: AtomicBool = AtomicBool::new(false);
// Exact (unbucketed) longest drain run seen so far.
static SEND_DRAIN_MAX: AtomicU64 = AtomicU64::new(0);

/// Arm the drain-run histogram for the router send loop.
///
/// The send loop samples the armed flag exactly once, when it starts, so
/// this has to run BEFORE the router is started to have any effect.
#[doc(hidden)]
pub fn arm_send_drain_histo() {
    SEND_DRAIN_ARMED.store(true, Ordering::Relaxed)
}

/// Copy out the drain-run histogram.
///
/// Bucket `i` for `i < 8` holds the number of drain runs whose length was
/// in `[2^i, 2^(i+1))`; bucket `8` collects every run of length 256 or more.
#[doc(hidden)]
pub fn send_drain_histo_snapshot() -> [u64; 9] {
    std::array::from_fn(|bucket| SEND_DRAIN_HISTO[bucket].load(Ordering::Relaxed))
}

/// Exact length of the longest drain run recorded so far.
#[doc(hidden)]
pub fn send_drain_max() -> u64 {
    let longest = &SEND_DRAIN_MAX;
    longest.load(Ordering::Relaxed)
}

/// Record one completed drain run of `run_len` packets.
///
/// A zero-length run is ignored. Any other run is counted in bucket
/// `floor(log2(run_len))`, capped at the overflow bucket, and the running
/// maximum is raised if needed.
#[inline]
fn record_drain_run(run_len: u64) {
    // `checked_ilog2` is `None` exactly when `run_len == 0`.
    let Some(log2) = run_len.checked_ilog2() else {
        return;
    };
    let overflow_bucket = SEND_DRAIN_HISTO.len() - 1;
    let bucket = (log2 as usize).min(overflow_bucket);
    SEND_DRAIN_HISTO[bucket].fetch_add(1, Ordering::Relaxed);
    SEND_DRAIN_MAX.fetch_max(run_len, Ordering::Relaxed);
}
86
// Each flush is one destination group shipped during a drain, i.e. one
// `sendmmsg` syscall on Linux. Dividing packets by flushes yields the
// syscall-collapse factor.
static SEND_BATCH_FLUSHES: AtomicU64 = AtomicU64::new(0);
static SEND_BATCH_PACKETS: AtomicU64 = AtomicU64::new(0);

/// Cumulative `(flushes, packets)` for the batched-drain send path.
///
/// `packets / flushes` is the realized syscall-collapse factor; it sits
/// near `MAX_BATCH_SIZE` when batches are full.
#[doc(hidden)]
pub fn send_batch_stats() -> (u64, u64) {
    let flushes = SEND_BATCH_FLUSHES.load(Ordering::Relaxed);
    let packets = SEND_BATCH_PACKETS.load(Ordering::Relaxed);
    (flushes, packets)
}

/// Account for one batched flush that carried `packets` datagrams.
#[inline]
fn record_batch_flush(packets: u64) {
    let (flush_counter, packet_counter) = (&SEND_BATCH_FLUSHES, &SEND_BATCH_PACKETS);
    flush_counter.fetch_add(1, Ordering::Relaxed);
    packet_counter.fetch_add(packets, Ordering::Relaxed);
}
107
108/// Append `data` to the destination group for `dest`, creating the group if
109/// this is the first packet for that peer in the current drain. Linear scan
110/// over `groups` — fine, since the distinct-peer count per drain is small
111/// (and bounded by `reset_dest_groups`).
112#[inline]
113fn group_by_dest(groups: &mut Vec<(SocketAddr, Vec<Bytes>)>, dest: SocketAddr, data: Bytes) {
114 match groups.iter_mut().find(|(d, _)| *d == dest) {
115 Some((_, v)) => v.push(data),
116 None => groups.push((dest, vec![data])),
117 }
118}
119
120/// Reset the per-destination group buffers between drains. Reuses the slots
121/// (and their grown inner-Vec capacity) to avoid reallocating on the send
122/// hot path, but drops the whole set once it exceeds `cap` so the dest set
123/// cannot grow without bound under peer churn — a node that sends to many
124/// distinct peers over its lifetime would otherwise keep a stale slot per
125/// peer forever, inflating memory and the linear `group_by_dest` scan. One
126/// drain touches at most `cap` (`MAX_DRAIN`) dests, so the bounded set stays
127/// at `cap + 1` worst case.
128#[inline]
129fn reset_dest_groups(groups: &mut Vec<(SocketAddr, Vec<Bytes>)>, cap: usize) {
130 if groups.len() > cap {
131 groups.clear();
132 } else {
133 for (_, v) in groups.iter_mut() {
134 v.clear();
135 }
136 }
137}
138
/// Test-only loss injection used by both the single and the batched send
/// paths.
///
/// When `drop_every_n` holds a non-zero `n`, this counts the packet and
/// answers `true` for every `n`th one (the caller should drop it). With
/// `n == 0`, injection is off: the counter is left untouched and the
/// answer is always `false`.
#[inline]
fn drop_injected(drop_every_n: &AtomicU64, drop_counter: &AtomicU64) -> bool {
    let every = drop_every_n.load(Ordering::Relaxed);
    if every == 0 {
        return false;
    }
    let seen = drop_counter.fetch_add(1, Ordering::Relaxed).wrapping_add(1);
    seen.is_multiple_of(every)
}
150// ------------------------------------------------------------------------
151
/// Router configuration.
#[derive(Debug, Clone)]
pub struct RouterConfig {
    /// ID of the local node.
    pub local_id: u64,
    /// Address the router's UDP socket binds to.
    pub bind_addr: SocketAddr,
    /// Maximum number of packets held per stream queue.
    pub max_queue_depth: usize,
    /// Base fair-scheduling quantum: packets a stream may send per round.
    pub fair_quantum: usize,
    /// Idle stream timeout, in nanoseconds.
    pub idle_timeout_ns: u64,
    /// Enable the priority-queue bypass.
    pub priority_bypass: bool,
}

impl Default for RouterConfig {
    /// Node 0 bound to `0.0.0.0:0`, 1024-deep queues, a quantum of 16,
    /// a 30-second idle timeout, and the priority bypass enabled.
    fn default() -> Self {
        const IDLE_TIMEOUT_SECS: u64 = 30;
        const NANOS_PER_SEC: u64 = 1_000_000_000;
        Self {
            local_id: 0,
            bind_addr: SocketAddr::from((std::net::Ipv4Addr::UNSPECIFIED, 0)),
            max_queue_depth: 1024,
            fair_quantum: 16,
            idle_timeout_ns: IDLE_TIMEOUT_SECS * NANOS_PER_SEC,
            priority_bypass: true,
        }
    }
}

impl RouterConfig {
    /// Config for node `local_id` bound to `bind_addr`; every other field
    /// takes its [`Default`] value.
    pub fn new(local_id: u64, bind_addr: SocketAddr) -> Self {
        let defaults = Self::default();
        Self {
            local_id,
            bind_addr,
            ..defaults
        }
    }
}
192
193/// Queued packet for fair scheduling
194pub struct QueuedPacket {
195 /// Packet data
196 pub data: Bytes,
197 /// Destination address
198 pub dest: SocketAddr,
199 /// Stream identifier
200 pub stream_id: u64,
201 /// Whether this is a priority packet
202 pub priority: bool,
203 /// Time the packet was queued
204 pub queued_at: Instant,
205}
206
207/// Per-stream queue for fair scheduling
208struct StreamQueue {
209 queue: ArrayQueue<QueuedPacket>,
210 packets_sent_this_round: AtomicU64,
211 /// Fairness weight (quantum multiplier). `1` is equal-share; higher
212 /// values let this stream ship `weight × session_quantum` packets
213 /// per round before round-reset. Always ≥ 1.
214 weight: AtomicU64,
215}
216
217impl StreamQueue {
218 fn new(capacity: usize) -> Self {
219 Self::new_with_weight(capacity, 1)
220 }
221
222 fn new_with_weight(capacity: usize, weight: u8) -> Self {
223 Self {
224 queue: ArrayQueue::new(capacity),
225 packets_sent_this_round: AtomicU64::new(0),
226 weight: AtomicU64::new(weight.max(1) as u64),
227 }
228 }
229
230 fn push(&self, packet: QueuedPacket) -> Result<(), QueuedPacket> {
231 self.queue.push(packet)
232 }
233
234 fn pop(&self) -> Option<QueuedPacket> {
235 self.queue.pop()
236 }
237
238 #[allow(dead_code)]
239 fn len(&self) -> usize {
240 self.queue.len()
241 }
242
243 fn is_empty(&self) -> bool {
244 self.queue.is_empty()
245 }
246
247 fn reset_round(&self) {
248 self.packets_sent_this_round.store(0, Ordering::Relaxed);
249 }
250
251 fn increment_sent(&self) -> u64 {
252 self.packets_sent_this_round.fetch_add(1, Ordering::Relaxed)
253 }
254
255 fn sent_this_round(&self) -> u64 {
256 self.packets_sent_this_round.load(Ordering::Relaxed)
257 }
258
259 fn weight(&self) -> u64 {
260 self.weight.load(Ordering::Relaxed).max(1)
261 }
262
263 #[allow(dead_code)]
264 fn set_weight(&self, weight: u8) {
265 self.weight.store(weight.max(1) as u64, Ordering::Relaxed);
266 }
267}
268
269/// Fair scheduler for stream fairness
270pub struct FairScheduler {
271 /// Per-stream queues
272 streams: DashMap<u64, Arc<StreamQueue>>,
273 /// PERF_AUDIT §3.1 — incrementally-maintained snapshot of the
274 /// set of stream-ids in `streams`. Pre-fix `dequeue` allocated
275 /// a fresh `Vec<u64>` per call (often twice on quantum-reset
276 /// paths) by walking the DashMap — O(streams) work + a heap
277 /// alloc on the per-packet send path that §2.1's batching
278 /// wants to route MORE traffic through. The snapshot is
279 /// rebuilt only when the stream-set actually changes (the
280 /// vacant arm of `enqueue` / `set_stream_weight`, or after
281 /// `cleanup_empty` evicts at least one entry), so steady-
282 /// state dequeue pays a single `ArcSwap::load_full` (a
283 /// relaxed-load + Arc clone). Long-lived streams mean
284 /// rebuild rate is orders of magnitude below dequeue rate.
285 ///
286 /// The snapshot may briefly lag the DashMap (a concurrent
287 /// enqueue can add a stream between snapshot-build and
288 /// dequeue's iteration). That's the same race as the pre-fix
289 /// code — a dequeue that started before the new stream landed
290 /// won't service it this call; the next call's load picks up
291 /// the updated snapshot.
292 active_streams: ArcSwap<Vec<u64>>,
293 /// Version counter for the active-stream snapshot protocol.
294 /// Bumped at every stream-set mutation site *after* the DashMap
295 /// change and *before* [`Self::rebuild_active_streams`] runs.
296 /// The rebuild loop re-checks this after its `store`: if the
297 /// version moved while it was collecting, another mutation
298 /// landed concurrently and this rebuild's snapshot may be stale
299 /// — retry. Without this, two racing rebuilds can store out of
300 /// order (cleanup's pre-insert collection overwriting enqueue's
301 /// post-insert one), leaving a stream with queued packets
302 /// permanently absent from the snapshot: occupied-arm enqueues
303 /// never rebuild and cleanup never evicts a non-empty queue, so
304 /// nothing would ever repair it (lost wakeup).
305 active_streams_version: AtomicU64,
306 /// Priority queue (bypasses fair scheduling)
307 priority_queue: ArrayQueue<QueuedPacket>,
308 /// Fair quantum
309 quantum: usize,
310 /// Max queue depth per stream
311 max_depth: usize,
312 /// Notification for new packets
313 notify: Notify,
314 /// Total packets queued
315 total_queued: AtomicU64,
316 /// Total packets dropped
317 total_dropped: AtomicU64,
318 /// Rotation index for round-robin fairness across dequeue calls.
319 /// Only advances when a dequeue actually pops a packet, so
320 /// rotation correlates with service events rather than poll
321 /// frequency.
322 round_robin_idx: AtomicU64,
323 /// Counter incremented on every `dequeue` call (regardless of
324 /// whether a packet was popped) for the modulo-1024 cleanup
325 /// trigger. Decoupled from `round_robin_idx` so the rotation fix
326 /// doesn't accidentally suppress cleanup.
327 dequeue_call_count: AtomicU64,
328 /// **Current** in-queue packet count (enqueued minus dequeued),
329 /// across the priority lane and all per-stream queues. Unlike
330 /// `total_queued` (a cumulative enqueue counter that never
331 /// decrements), this reflects live backlog — the send loop reads it
332 /// to decide whether a batched drain is worthwhile.
333 current_depth: AtomicU64,
334}
335
336impl FairScheduler {
337 /// Create a new fair scheduler
338 pub fn new(quantum: usize, max_depth: usize) -> Self {
339 Self {
340 streams: DashMap::new(),
341 active_streams: ArcSwap::from_pointee(Vec::new()),
342 active_streams_version: AtomicU64::new(0),
343 priority_queue: ArrayQueue::new(max_depth),
344 quantum,
345 max_depth,
346 notify: Notify::new(),
347 total_queued: AtomicU64::new(0),
348 total_dropped: AtomicU64::new(0),
349 round_robin_idx: AtomicU64::new(0),
350 dequeue_call_count: AtomicU64::new(0),
351 current_depth: AtomicU64::new(0),
352 }
353 }
354
355 /// PERF_AUDIT §3.1 — mark the stream set as changed and rebuild
356 /// the snapshot. Must be called *after* the DashMap mutation so
357 /// any concurrent rebuild that observes the new version also
358 /// observes the mutation. Called from every stream-set mutation
359 /// site: the vacant arm of [`Self::enqueue`] and
360 /// [`Self::set_stream_weight`], and [`Self::cleanup_empty`]
361 /// when at least one entry was removed.
362 fn note_stream_set_changed(&self) {
363 self.active_streams_version.fetch_add(1, Ordering::SeqCst);
364 self.rebuild_active_streams();
365 }
366
367 /// PERF_AUDIT §3.1 — rebuild the active-stream snapshot from
368 /// the current `streams` DashMap and atomically swap it in.
369 ///
370 /// Lost-wakeup protection: the version counter is re-checked
371 /// after the `store`. If it moved while this rebuild was
372 /// collecting, a concurrent mutation landed and our snapshot
373 /// may predate it — and ArcSwap stores from racing rebuilds
374 /// can land in either order, so a stale collection could
375 /// otherwise overwrite a fresher one and strand a non-empty
376 /// stream outside the snapshot forever. Retrying until the
377 /// version is stable across collect+store guarantees the last
378 /// snapshot standing reflects every mutation (each mutation
379 /// bumps the version *before* its own rebuild, so whichever
380 /// rebuild finishes last either saw the final version or
381 /// retries). SeqCst keeps the version loads and the ArcSwap
382 /// store in one total order; rebuilds only run on stream-set
383 /// changes, so the cost is off the per-packet path.
384 fn rebuild_active_streams(&self) {
385 loop {
386 let v = self.active_streams_version.load(Ordering::SeqCst);
387 let keys: Vec<u64> = self.streams.iter().map(|e| *e.key()).collect();
388 self.active_streams.store(Arc::new(keys));
389 if self.active_streams_version.load(Ordering::SeqCst) == v {
390 break;
391 }
392 // Version moved mid-rebuild: another mutation raced us.
393 // Our snapshot may be stale — collect again.
394 }
395 }
396
397 /// Set the fair-scheduling weight for a stream. `weight` is a quantum
398 /// multiplier: 1 = equal share (default), higher values give the
399 /// stream proportionally more packets per round. Creates the stream
400 /// queue if it doesn't exist yet, so callers can set the weight
401 /// before any traffic flows.
402 pub fn set_stream_weight(&self, stream_id: u64, weight: u8) {
403 let weight = weight.max(1);
404 // PERF_AUDIT §3.1 — detect new vs existing so the
405 // active-stream snapshot only rebuilds when the set
406 // actually changes.
407 let was_new = match self.streams.entry(stream_id) {
408 dashmap::mapref::entry::Entry::Occupied(occ) => {
409 occ.get().set_weight(weight);
410 false
411 }
412 dashmap::mapref::entry::Entry::Vacant(vac) => {
413 vac.insert(Arc::new(StreamQueue::new_with_weight(
414 self.max_depth,
415 weight,
416 )));
417 true
418 }
419 };
420 if was_new {
421 self.note_stream_set_changed();
422 }
423 }
424
425 /// Enqueue a packet
426 pub fn enqueue(&self, packet: QueuedPacket) -> bool {
427 if packet.priority {
428 // Priority packets bypass fair scheduling
429 if self.priority_queue.push(packet).is_ok() {
430 self.total_queued.fetch_add(1, Ordering::Relaxed);
431 self.current_depth.fetch_add(1, Ordering::Relaxed);
432 self.notify.notify_one();
433 return true;
434 }
435 } else {
436 // PERF_AUDIT §3.1 — get-or-create with new/existing
437 // discrimination so the active-stream snapshot is
438 // rebuilt only when the set actually changes.
439 let (queue, is_new) = match self.streams.entry(packet.stream_id) {
440 dashmap::mapref::entry::Entry::Occupied(occ) => (occ.get().clone(), false),
441 dashmap::mapref::entry::Entry::Vacant(vac) => {
442 let arc = Arc::new(StreamQueue::new(self.max_depth));
443 vac.insert(arc.clone());
444 (arc, true)
445 }
446 };
447 if is_new {
448 self.note_stream_set_changed();
449 }
450
451 if queue.push(packet).is_ok() {
452 self.total_queued.fetch_add(1, Ordering::Relaxed);
453 self.current_depth.fetch_add(1, Ordering::Relaxed);
454 self.notify.notify_one();
455 return true;
456 }
457 }
458
459 self.total_dropped.fetch_add(1, Ordering::Relaxed);
460 false
461 }
462
463 /// Dequeue next packet (fair round-robin)
464 pub fn dequeue(&self) -> Option<QueuedPacket> {
465 // Periodically clean up empty stream queues to prevent unbounded growth
466 // of the DashMap. Check every 1024 dequeue calls (cheap modular check).
467 // Tracked on a separate counter from `round_robin_idx` so
468 // that decoupling rotation from poll frequency doesn't
469 // suppress the cleanup trigger.
470 let call_count = self
471 .dequeue_call_count
472 .fetch_add(1, Ordering::Relaxed)
473 .wrapping_add(1);
474 if call_count.is_multiple_of(1024) {
475 self.cleanup_empty();
476 }
477
478 // Priority queue first
479 if let Some(packet) = self.priority_queue.pop() {
480 self.current_depth.fetch_sub(1, Ordering::Relaxed);
481 return Some(packet);
482 }
483
484 // PERF_AUDIT §3.1 — read the incrementally-maintained
485 // active-stream snapshot instead of walking the DashMap
486 // per call. `load_full` is one relaxed atomic load + Arc
487 // clone (no Vec alloc, no DashMap iteration).
488 let keys = self.active_streams.load_full();
489 if keys.is_empty() {
490 return None;
491 }
492 // Advance the round-robin cursor only when we actually pop a
493 // packet. Previously this used `fetch_add(1)` unconditionally,
494 // so every empty poll advanced the rotation as if we'd
495 // serviced a stream. The result was that under bursty mixed
496 // traffic, polls that found no packet still nudged the cursor
497 // past the stream that *would* have had a packet on the next
498 // tick, biasing service away from streams that became
499 // non-empty
500 // mid-pass.
501 //
502 // Now: read the cursor for the starting offset, then
503 // commit a `fetch_add(1)` only inside the successful pop
504 // arm. Behavioral effect is the same when packets are
505 // available; the difference shows up under contention
506 // and on dequeues that find nothing.
507 let start = self.round_robin_idx.load(Ordering::Relaxed) as usize % keys.len();
508
509 // Round-robin across streams, starting from the rotated index.
510 // Each stream's quantum is `base_quantum × stream.weight`, so
511 // a weight-4 stream gets 4× the packets per round of a weight-1
512 // stream before round-reset. Default weight is 1 = unchanged.
513 for i in 0..keys.len() {
514 let key = keys[(start + i) % keys.len()];
515 if let Some(queue) = self.streams.get(&key) {
516 let stream_quantum = (self.quantum as u64).saturating_mul(queue.weight());
517 if queue.sent_this_round() < stream_quantum && !queue.is_empty() {
518 if let Some(packet) = queue.pop() {
519 queue.increment_sent();
520 self.current_depth.fetch_sub(1, Ordering::Relaxed);
521 // Advance the rotation cursor only on
522 // successful pop.
523 self.round_robin_idx.fetch_add(1, Ordering::Relaxed);
524 return Some(packet);
525 }
526 }
527 }
528 }
529
530 // If all streams exhausted their quantum, reset and try again.
531 // Re-load the active-stream snapshot so streams added since
532 // the first load are also considered — `load_full` is
533 // cheap (no Vec alloc), and may pick up an updated snapshot
534 // if a concurrent enqueue rebuilt it mid-pass.
535 let keys = self.active_streams.load_full();
536 if keys.is_empty() {
537 return None;
538 }
539 let mut has_packets = false;
540 for key in keys.iter() {
541 if let Some(queue) = self.streams.get(key) {
542 queue.reset_round();
543 if !queue.is_empty() {
544 has_packets = true;
545 }
546 }
547 }
548
549 if has_packets {
550 let start = self.round_robin_idx.load(Ordering::Relaxed) as usize % keys.len();
551 for i in 0..keys.len() {
552 let key = keys[(start + i) % keys.len()];
553 if let Some(queue) = self.streams.get(&key) {
554 if let Some(packet) = queue.pop() {
555 queue.increment_sent();
556 self.current_depth.fetch_sub(1, Ordering::Relaxed);
557 self.round_robin_idx.fetch_add(1, Ordering::Relaxed);
558 return Some(packet);
559 }
560 }
561 }
562 }
563
564 None
565 }
566
567 /// Wait for new packets
568 pub async fn wait(&self) {
569 self.notify.notified().await;
570 }
571
572 /// Current live backlog: packets enqueued but not yet dequeued,
573 /// summed across the priority lane and all per-stream queues.
574 /// Distinct from [`Self::total_queued`], which only counts upward.
575 pub fn current_depth(&self) -> u64 {
576 self.current_depth.load(Ordering::Relaxed)
577 }
578
579 /// Get total queued count
580 pub fn total_queued(&self) -> u64 {
581 self.total_queued.load(Ordering::Relaxed)
582 }
583
584 /// Get total dropped count
585 pub fn total_dropped(&self) -> u64 {
586 self.total_dropped.load(Ordering::Relaxed)
587 }
588
589 /// Get number of active streams
590 pub fn stream_count(&self) -> usize {
591 self.streams.len()
592 }
593
594 /// Clean up empty stream queues.
595 ///
596 /// Only removes queues that are both empty and have no outstanding
597 /// `Arc` references (strong_count == 1 means only the DashMap holds it).
598 /// This prevents a race where `enqueue` clones the Arc, cleanup removes
599 /// the entry, and the enqueued packet becomes unreachable.
600 pub fn cleanup_empty(&self) -> usize {
601 let mut removed = 0;
602 self.streams.retain(|_, queue| {
603 if queue.is_empty() && Arc::strong_count(queue) == 1 {
604 removed += 1;
605 false
606 } else {
607 true
608 }
609 });
610 // PERF_AUDIT §3.1 — rebuild the active-stream snapshot
611 // when at least one entry was evicted so the dequeue
612 // path stops iterating over keys that no longer exist.
613 // No-op rebuild on a clean pass is cheap (single Vec
614 // alloc the same size as the prior snapshot, swapped in
615 // via ArcSwap), but the branch keeps the steady-state
616 // cleanup-call cost at zero rebuilds.
617 if removed > 0 {
618 self.note_stream_set_changed();
619 }
620 removed
621 }
622}
623
/// Router statistics, as plain values.
#[derive(Debug, Clone, Default)]
pub struct RouterStats {
    /// Packets received by the router.
    pub packets_received: u64,
    /// Packets queued for forwarding to a next hop.
    pub packets_forwarded: u64,
    /// Packets delivered to the local node.
    pub packets_local: u64,
    /// Packets dropped (e.g. too small, routing loop, expired TTL, full queue).
    pub packets_dropped: u64,
    /// Bytes received.
    pub bytes_received: u64,
    /// Bytes forwarded.
    pub bytes_forwarded: u64,
    /// Number of active routes.
    pub routes: usize,
    /// Number of active streams.
    pub streams: usize,
    /// Mean routing latency, in nanoseconds.
    pub avg_latency_ns: u64,
}
646
647/// Net Router
648pub struct NetRouter {
649 /// Configuration
650 #[allow(dead_code)]
651 config: RouterConfig,
652 /// UDP socket
653 socket: Arc<UdpSocket>,
654 /// Routing table
655 routing_table: Arc<RoutingTable>,
656 /// Fair scheduler
657 scheduler: Arc<FairScheduler>,
658 /// Running flag
659 running: Arc<AtomicBool>,
660 /// Statistics
661 packets_received: AtomicU64,
662 packets_forwarded: AtomicU64,
663 packets_local: AtomicU64,
664 packets_dropped: AtomicU64,
665 bytes_received: AtomicU64,
666 bytes_forwarded: AtomicU64,
667 total_latency_ns: AtomicU64,
668 latency_samples: AtomicU64,
669 /// Test-only fault injection: when `> 0`, the send loop drops every
670 /// Nth dequeued (scheduled) packet instead of sending it, simulating
671 /// link loss for reliability tests. `0` (default) disables it — one
672 /// relaxed load per send, negligible. Shared into the send-loop task.
673 test_drop_every_n: Arc<AtomicU64>,
674 test_drop_counter: Arc<AtomicU64>,
675}
676
677impl NetRouter {
678 /// Create a new router
679 pub async fn new(config: RouterConfig) -> std::io::Result<Self> {
680 let socket = UdpSocket::bind(config.bind_addr).await?;
681 let routing_table = Arc::new(RoutingTable::new(config.local_id));
682 let scheduler = Arc::new(FairScheduler::new(
683 config.fair_quantum,
684 config.max_queue_depth,
685 ));
686
687 Ok(Self {
688 config,
689 socket: Arc::new(socket),
690 routing_table,
691 scheduler,
692 running: Arc::new(AtomicBool::new(false)),
693 packets_received: AtomicU64::new(0),
694 packets_forwarded: AtomicU64::new(0),
695 packets_local: AtomicU64::new(0),
696 packets_dropped: AtomicU64::new(0),
697 bytes_received: AtomicU64::new(0),
698 bytes_forwarded: AtomicU64::new(0),
699 total_latency_ns: AtomicU64::new(0),
700 latency_samples: AtomicU64::new(0),
701 test_drop_every_n: Arc::new(AtomicU64::new(0)),
702 test_drop_counter: Arc::new(AtomicU64::new(0)),
703 })
704 }
705
706 /// Test-only: drop every `n`th dequeued (scheduled) packet in the
707 /// send loop to simulate link loss. `0` disables. Used by reliability
708 /// / retransmit tests; has no effect on the default (`0`) path.
709 pub fn set_test_drop_every_n(&self, n: u64) {
710 self.test_drop_every_n.store(n, Ordering::Relaxed);
711 }
712
713 /// Get local address
714 pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
715 self.socket.local_addr()
716 }
717
718 /// Get routing table
719 pub fn routing_table(&self) -> &Arc<RoutingTable> {
720 &self.routing_table
721 }
722
723 /// Get the fair scheduler. Exposed so `MeshNode::open_stream` can
724 /// propagate a stream's `fairness_weight` to the forwarding path.
725 pub fn scheduler(&self) -> &Arc<FairScheduler> {
726 &self.scheduler
727 }
728
729 /// Add a route
730 pub fn add_route(&self, dest_id: u64, next_hop: SocketAddr) {
731 self.routing_table.add_route(dest_id, next_hop);
732 }
733
734 /// Remove a route
735 pub fn remove_route(&self, dest_id: u64) {
736 self.routing_table.remove_route(dest_id);
737 }
738
739 /// Route a packet (called from receive loop)
740 pub fn route_packet(&self, data: Bytes, _from: SocketAddr) -> Result<RouteAction, RouterError> {
741 let start = Instant::now();
742 let len = data.len() as u64;
743
744 self.packets_received.fetch_add(1, Ordering::Relaxed);
745 self.bytes_received.fetch_add(len, Ordering::Relaxed);
746
747 // Need at least routing header
748 if data.len() < ROUTING_HEADER_SIZE {
749 self.packets_dropped.fetch_add(1, Ordering::Relaxed);
750 return Err(RouterError::PacketTooSmall);
751 }
752
753 // Parse routing header
754 let routing_header = RoutingHeader::from_bytes(&data[..ROUTING_HEADER_SIZE])
755 .ok_or(RouterError::InvalidHeader)?;
756
757 // Extract stream ID from Net header if present
758 let stream_id = if data.len() >= ROUTING_HEADER_SIZE + HEADER_SIZE {
759 let net_header = &data[ROUTING_HEADER_SIZE..ROUTING_HEADER_SIZE + HEADER_SIZE];
760 u64::from_le_bytes(net_header[32..40].try_into().unwrap_or([0; 8]))
761 } else {
762 0
763 };
764
765 // Per-stream `record_in` is deferred until the packet
766 // either delivers locally or survives all drop checks
767 // and is queued for forward. Pre-fix the call fired
768 // here (immediately after parse), so a packet that
769 // failed loop suppression or TTL incremented BOTH
770 // `packets_in` and `packets_dropped` for the stream —
771 // double-counting against any dashboard computing
772 // `delivery rate = packets_out / packets_in` (drops
773 // inflated the denominator without affecting the
774 // numerator, masking healthy networks behind apparent
775 // delivery loss). The drop paths now call
776 // `record_drop` only; `record_in` fires once for each
777 // successful local-delivery / forward.
778
779 // Check if local delivery
780 if self.routing_table.is_local(routing_header.dest_id) {
781 self.routing_table.record_in(stream_id, len);
782 self.packets_local.fetch_add(1, Ordering::Relaxed);
783 self.record_latency(start);
784 return Ok(RouteAction::Local(data.slice(ROUTING_HEADER_SIZE..)));
785 }
786
787 // Loop suppression: if we're about to forward a packet whose
788 // `src_id` is us, we sent it earlier and it has come back via
789 // a misconfigured route or a malicious peer. Drop it locally
790 // so the only thing breaking the loop is TTL exhaustion;
791 // every looping hop wastes one queue slot and 2x bandwidth
792 // on the link. The `src_id` field is u32; `local_id` is u64
793 // (only the low 32 bits identify us on the wire).
794 if routing_header.src_id == (self.routing_table.local_id() as u32) {
795 self.packets_dropped.fetch_add(1, Ordering::Relaxed);
796 self.routing_table.record_drop(stream_id);
797 return Err(RouterError::RoutingLoop);
798 }
799
800 // Check TTL
801 if routing_header.is_expired() {
802 self.packets_dropped.fetch_add(1, Ordering::Relaxed);
803 self.routing_table.record_drop(stream_id);
804 return Err(RouterError::TtlExpired);
805 }
806
807 // Lookup next hop
808 let next_hop = self
809 .routing_table
810 .lookup(routing_header.dest_id)
811 .ok_or(RouterError::NoRoute)?;
812
813 // Update header for forwarding
814 let mut fwd_header = routing_header;
815 fwd_header.forward();
816 // Re-check expiry after `forward()` decrements the TTL: if
817 // TTL hit 0, the next hop would just drop the packet on its
818 // own `is_expired()` check — wasting one forward, bandwidth,
819 // and a queue slot per last-hop packet. Drop locally here so
820 // the next hop never sees the doomed packet.
821 if fwd_header.is_expired() {
822 self.packets_dropped.fetch_add(1, Ordering::Relaxed);
823 self.routing_table.record_drop(stream_id);
824 return Err(RouterError::TtlExpired);
825 }
826 // All drop gates passed — count this packet as
827 // successfully ingressed for the stream before queueing
828 // the forward.
829 self.routing_table.record_in(stream_id, len);
830
831 // Fast path (perf #18): if the inbound `data` is sole-
832 // owned (the typical case for UDP packets just received
833 // from the socket), `try_into_mut` returns the same
834 // allocation as a `BytesMut` in O(1). We overwrite the
835 // first ROUTING_HEADER_SIZE bytes in place — refcount
836 // bump only, no body copy. Pre-fix this allocated a
837 // fresh buffer the size of the entire packet and
838 // memcpy'd the body for every forward; for a relay node
839 // moving GB/s of packets, bandwidth-class waste.
840 //
841 // Slow path: an outstanding clone of `data` (e.g. some
842 // observer / mirror) forces the copy. We fall back to
843 // the prior allocation strategy.
844 let forwarded_data = match data.try_into_mut() {
845 Ok(mut mut_data) => {
846 fwd_header.write_at(&mut mut_data[..ROUTING_HEADER_SIZE]);
847 mut_data.freeze()
848 }
849 Err(orig_data) => {
850 let mut new_data = BytesMut::with_capacity(orig_data.len());
851 fwd_header.write_to(&mut new_data);
852 new_data.extend_from_slice(&orig_data[ROUTING_HEADER_SIZE..]);
853 new_data.freeze()
854 }
855 };
856
857 // Queue for sending
858 let packet = QueuedPacket {
859 data: forwarded_data,
860 dest: next_hop,
861 stream_id,
862 priority: routing_header.flags.is_priority(),
863 queued_at: Instant::now(),
864 };
865
866 if self.scheduler.enqueue(packet) {
867 self.packets_forwarded.fetch_add(1, Ordering::Relaxed);
868 self.bytes_forwarded.fetch_add(len, Ordering::Relaxed);
869 self.routing_table.record_out(stream_id, len);
870 self.record_latency(start);
871 Ok(RouteAction::Forwarded(next_hop))
872 } else {
873 self.packets_dropped.fetch_add(1, Ordering::Relaxed);
874 self.routing_table.record_drop(stream_id);
875 Err(RouterError::QueueFull)
876 }
877 }
878
879 /// Send a packet directly (bypassing routing)
880 pub async fn send_to(&self, data: &[u8], dest: SocketAddr) -> std::io::Result<usize> {
881 self.socket.send_to(data, dest).await
882 }
883
884 /// Receive a packet
885 pub async fn recv_from(&self, buf: &mut [u8]) -> std::io::Result<(usize, SocketAddr)> {
886 self.socket.recv_from(buf).await
887 }
888
    /// Start the router by spawning its send loop with `tokio::spawn`.
    ///
    /// Returns `None` if a dispatch loop is already running for this
    /// router; calling twice would otherwise spawn a second loop racing the
    /// first one's `scheduler.dequeue()`, producing reordered or duplicate
    /// sends. On success returns the loop task's `JoinHandle`; the task runs
    /// until `stop()` clears `running` and the loop observes that at the top
    /// of an iteration.
    ///
    /// Per iteration the loop:
    /// - dequeues one packet and, if the scheduler is then empty, sends it
    ///   directly with `send_to`;
    /// - otherwise drains up to `MAX_DRAIN` packets, groups them by
    ///   destination and flushes each group (batched `send_batch` on Linux,
    ///   per-packet `send_to` elsewhere);
    /// - when nothing is queued, parks on `scheduler.wait()` or a 1 ms timer,
    ///   whichever completes first.
    ///
    /// Errors from `send_to` are discarded (`let _ = ...`), so failed sends
    /// are neither retried nor reported here. Packets selected by the
    /// test-only drop hook (`drop_injected`) are dequeued but never sent.
    ///
    /// NOTE(review): the double-start guard only protects against
    /// overlapping `start()` calls. If `stop()` is followed by `start()`
    /// before the old loop has re-checked `running` (it may be parked in the
    /// 1 ms wait or awaiting a send), the CAS below succeeds, the flag is
    /// `true` again, and the old loop keeps running next to the new one.
    /// Await the previous `JoinHandle` after `stop()` before restarting.
    pub fn start(&self) -> Option<tokio::task::JoinHandle<()>> {
        // Claim the single dispatch-loop slot: only the caller that flips
        // `running` from false to true gets to spawn.
        if self
            .running
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            return None;
        }

        // Clone the shared handles so the `async move` task owns its own
        // copies.
        let socket = self.socket.clone();
        let scheduler = self.scheduler.clone();
        let running = self.running.clone();
        let drop_every_n = self.test_drop_every_n.clone();
        let drop_counter = self.test_drop_counter.clone();

        Some(tokio::spawn(async move {
            // Phase 0 instrument: latch the arm flag once; when armed, count
            // how many packets we drain back-to-back before blocking.
            let measure_drain = SEND_DRAIN_ARMED.load(Ordering::Relaxed);
            let mut drain_run: u64 = 0;

            // Batched-drain scratch (Phase 1, NRPC_SEND_LOOP_BATCHING_PLAN).
            // When the scheduler has a backlog, drain up to MAX_DRAIN packets
            // and ship them grouped by destination — one `sendmmsg` per peer
            // on Linux, falling back to per-packet `send_to` elsewhere. The
            // `groups` holds one (dest, packets) slot per peer touched in a
            // drain. Inner Vecs are reused across drains to avoid reallocating
            // on the hot path; the slot set is bounded on reset (see below) so
            // it can't accumulate a stale entry per peer forever under churn.
            const MAX_DRAIN: usize = 64;
            let mut groups: Vec<(SocketAddr, Vec<Bytes>)> = Vec::new();
            // Linux-only batched sender over the same socket fd. The send loop
            // is the socket's sole, single-threaded sender, so it owns one
            // `BatchedTransport` for its whole lifetime and reuses the iovec /
            // mmsghdr / sockaddr scratch across every flush — no per-flush
            // allocation. (`PacketSender::send_batch` builds a fresh transport
            // each call to stay shareable across concurrent senders; here there
            // is exactly one sender, so we hold the reusable form directly.)
            #[cfg(target_os = "linux")]
            let mut batch_sender = {
                use std::os::unix::io::AsRawFd;
                super::linux::BatchedTransport::new_send_only(socket.as_raw_fd())
            };

            // `running` is re-checked once per iteration. A drain run still
            // in progress when it goes false is never passed to
            // `record_drain_run` (that only happens in the idle branch).
            while running.load(Ordering::Acquire) {
                // Dequeue and send
                if let Some(first) = scheduler.dequeue() {
                    // Count every packet `dequeue()` hands back, by design:
                    // `drain_run` measures the scheduler's drain run-length,
                    // not wire packets, so test-injected drops below still
                    // count (in production `drop_every_n == 0`, so there are
                    // none). This is deliberately distinct from the batch-flush
                    // packet counter, which tracks only what reaches the wire.
                    drain_run += 1;

                    // Fast path: nothing else queued → send this one packet
                    // directly, exactly as before. Keeps depth-≤1 traffic
                    // (single scheduled stream, sparse sends) on the original
                    // zero-overhead path. Guarded by the live `current_depth`.
                    if scheduler.current_depth() == 0 {
                        if !drop_injected(&drop_every_n, &drop_counter) {
                            let _ = socket.send_to(&first.data, first.dest).await;
                        }
                        continue;
                    }

                    // Backlog present → batched drain. Reset the per-dest
                    // buffers for this drain (bounded — see `reset_dest_groups`).
                    reset_dest_groups(&mut groups, MAX_DRAIN);
                    if !drop_injected(&drop_every_n, &drop_counter) {
                        group_by_dest(&mut groups, first.dest, first.data);
                    }
                    // `first` counts toward the MAX_DRAIN budget even when the
                    // test hook dropped it.
                    let mut drained = 1usize;
                    while drained < MAX_DRAIN {
                        match scheduler.dequeue() {
                            Some(p) => {
                                drain_run += 1;
                                drained += 1;
                                if !drop_injected(&drop_every_n, &drop_counter) {
                                    group_by_dest(&mut groups, p.dest, p.data);
                                }
                            }
                            None => break,
                        }
                    }

                    // Flush each destination group: one batched syscall per
                    // peer on Linux, per-packet send elsewhere.
                    for (dest, data) in groups.iter() {
                        // Slots retained from earlier drains may hold nothing
                        // this time round.
                        if data.is_empty() {
                            continue;
                        }
                        #[cfg(target_os = "linux")]
                        {
                            // `send_batch` is a synchronous `sendmmsg` on the
                            // socket's non-blocking fd — it returns immediately
                            // (filling the kernel buffer, then partial /
                            // EWOULDBLOCK), so it does NOT block the worker and
                            // must not be moved to `spawn_blocking`. The async
                            // `send_to` below ships any unsent tail (partial
                            // send / EWOULDBLOCK), which re-registers the waker
                            // so we preserve backpressure rather than dropping
                            // or spinning.
                            let sent = batch_sender.send_batch(data, *dest).unwrap_or(0);
                            // An `Err` maps to 0 sent, so the whole group is
                            // shipped by the per-packet loop below.
                            // NOTE(review): `&data[sent..]` assumes
                            // `send_batch` never reports more than `data.len()`
                            // packets sent — confirm in `BatchedTransport`.
                            for d in &data[sent..] {
                                let _ = socket.send_to(d, *dest).await;
                            }
                        }
                        #[cfg(not(target_os = "linux"))]
                        {
                            for d in data {
                                let _ = socket.send_to(d, *dest).await;
                            }
                        }
                        // Group size handed to the socket; test-hook drops
                        // never entered `groups`, so they are excluded.
                        if measure_drain {
                            record_batch_flush(data.len() as u64);
                        }
                    }
                } else {
                    // Drain run ended — record its length, then block.
                    if measure_drain {
                        record_drain_run(drain_run);
                    }
                    drain_run = 0;
                    // Wait for new packets (with timeout).
                    //
                    // The `notify_one` → `wait()` pattern is
                    // sound — `tokio::sync::Notify`
                    // stores a permit when `notify_one` is called
                    // with no waiter, and the next `notified().await`
                    // consumes it immediately. So an enqueue that
                    // races our `dequeue() → None` transition is
                    // observed on the next loop iteration regardless
                    // of which side won the race; no notify is lost.
                    //
                    // The 1ms polling fallback is **defensive
                    // paranoia**, not load-bearing for correctness:
                    // it bounds the worst-case wakeup latency under
                    // any future refactor that accidentally breaks
                    // the permit-stash invariant. It also catches
                    // the edge case where a `select!` branch fires
                    // while `notified()` is still in its
                    // pre-sleep registration window, dropping the
                    // notified future without consuming the permit.
                    // Cost: at most one wakeup per millisecond on
                    // an idle router; negligible.
                    //
                    // The audit suggested `Notify::notify_waiters`
                    // — that's wrong for this site: `notify_waiters`
                    // does NOT store a permit when no one is
                    // waiting, so it would re-introduce the genuine
                    // lost-wakeup case the permit-stash semantics
                    // close.
                    tokio::select! {
                        _ = scheduler.wait() => {}
                        _ = tokio::time::sleep(Duration::from_millis(1)) => {}
                    }
                }
            }
        }))
    }
1055
1056 /// Stop the router
1057 pub fn stop(&self) {
1058 self.running.store(false, Ordering::Release);
1059 }
1060
1061 /// Check if running
1062 pub fn is_running(&self) -> bool {
1063 self.running.load(Ordering::Acquire)
1064 }
1065
1066 /// Get statistics
1067 pub fn stats(&self) -> RouterStats {
1068 let samples = self.latency_samples.load(Ordering::Relaxed);
1069 let total_latency = self.total_latency_ns.load(Ordering::Relaxed);
1070 let avg_latency = total_latency.checked_div(samples).unwrap_or(0);
1071
1072 RouterStats {
1073 packets_received: self.packets_received.load(Ordering::Relaxed),
1074 packets_forwarded: self.packets_forwarded.load(Ordering::Relaxed),
1075 packets_local: self.packets_local.load(Ordering::Relaxed),
1076 packets_dropped: self.packets_dropped.load(Ordering::Relaxed),
1077 bytes_received: self.bytes_received.load(Ordering::Relaxed),
1078 bytes_forwarded: self.bytes_forwarded.load(Ordering::Relaxed),
1079 routes: self.routing_table.route_count(),
1080 streams: self.routing_table.stream_count(),
1081 avg_latency_ns: avg_latency,
1082 }
1083 }
1084
1085 /// Reset statistics
1086 pub fn reset_stats(&self) {
1087 self.packets_received.store(0, Ordering::Relaxed);
1088 self.packets_forwarded.store(0, Ordering::Relaxed);
1089 self.packets_local.store(0, Ordering::Relaxed);
1090 self.packets_dropped.store(0, Ordering::Relaxed);
1091 self.bytes_received.store(0, Ordering::Relaxed);
1092 self.bytes_forwarded.store(0, Ordering::Relaxed);
1093 self.total_latency_ns.store(0, Ordering::Relaxed);
1094 self.latency_samples.store(0, Ordering::Relaxed);
1095 }
1096
1097 fn record_latency(&self, start: Instant) {
1098 let latency = start.elapsed().as_nanos() as u64;
1099 self.total_latency_ns.fetch_add(latency, Ordering::Relaxed);
1100 self.latency_samples.fetch_add(1, Ordering::Relaxed);
1101 }
1102}
1103
1104/// Result of routing a packet
1105#[derive(Debug)]
1106pub enum RouteAction {
1107 /// Packet is for local delivery
1108 Local(Bytes),
1109 /// Packet was forwarded to next hop
1110 Forwarded(SocketAddr),
1111}
1112
/// Reasons the router refuses to route a packet.
///
/// Every variant is a plain unit value, so errors are cheap to clone and
/// compare. `Display` yields a short lowercase description suitable for
/// logs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RouterError {
    /// The packet was too small to route.
    PacketTooSmall,
    /// The routing header was invalid.
    InvalidHeader,
    /// The TTL ran out. This includes the case where decrementing it for
    /// this hop would leave 0 for the next one.
    TtlExpired,
    /// No route is known for the destination.
    NoRoute,
    /// The send scheduler rejected the packet because its queue was full.
    QueueFull,
    /// Source is this node — packet looped back via a misconfigured
    /// route or hostile peer.
    RoutingLoop,
}

impl std::fmt::Display for RouterError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::PacketTooSmall => "packet too small",
            Self::InvalidHeader => "invalid routing header",
            Self::TtlExpired => "TTL expired",
            Self::NoRoute => "no route to destination",
            Self::QueueFull => "queue full",
            Self::RoutingLoop => "routing loop (packet returned to its source)",
        };
        f.write_str(description)
    }
}

impl std::error::Error for RouterError {}
1145
1146#[cfg(test)]
1147mod tests {
1148 use super::super::protocol::NetHeader;
1149 use super::*;
1150
1151 #[test]
1152 fn test_fair_scheduler_basic() {
1153 let scheduler = FairScheduler::new(2, 16);
1154
1155 // Enqueue packets from different streams
1156 for stream in 0..3 {
1157 for _ in 0..4 {
1158 let packet = QueuedPacket {
1159 data: Bytes::from(vec![0u8; 64]),
1160 dest: "127.0.0.1:9000".parse().unwrap(),
1161 stream_id: stream,
1162 priority: false,
1163 queued_at: Instant::now(),
1164 };
1165 assert!(scheduler.enqueue(packet));
1166 }
1167 }
1168
1169 assert_eq!(scheduler.stream_count(), 3);
1170 assert_eq!(scheduler.total_queued(), 12);
1171
1172 // Dequeue should round-robin
1173 let mut stream_order = Vec::new();
1174 while let Some(packet) = scheduler.dequeue() {
1175 stream_order.push(packet.stream_id);
1176 }
1177
1178 // Should have processed all 12 packets
1179 assert_eq!(stream_order.len(), 12);
1180
1181 // Check fairness: each stream should get ~4 packets
1182 let mut counts = [0; 3];
1183 for stream in stream_order {
1184 counts[stream as usize] += 1;
1185 }
1186 assert_eq!(counts, [4, 4, 4]);
1187 }
1188
1189 #[test]
1190 fn current_depth_tracks_live_backlog_and_returns_to_zero() {
1191 // `total_queued` is cumulative; `current_depth` must reflect the
1192 // live backlog (enqueued − dequeued) across the priority lane and
1193 // per-stream queues, and a rejected enqueue must not bump it.
1194 let scheduler = FairScheduler::new(2, 4); // per-queue cap = 4
1195 let mk = |stream_id: u64, priority: bool| QueuedPacket {
1196 data: Bytes::from(vec![0u8; 32]),
1197 dest: "127.0.0.1:9000".parse().unwrap(),
1198 stream_id,
1199 priority,
1200 queued_at: Instant::now(),
1201 };
1202
1203 assert_eq!(scheduler.current_depth(), 0);
1204
1205 // 3 on a stream + 2 on the priority lane → depth 5.
1206 for _ in 0..3 {
1207 assert!(scheduler.enqueue(mk(1, false)));
1208 }
1209 for _ in 0..2 {
1210 assert!(scheduler.enqueue(mk(0, true)));
1211 }
1212 assert_eq!(scheduler.current_depth(), 5);
1213
1214 // Overflow the priority lane (cap 4): two pushes succeed (already
1215 // had 2), the 5th is rejected and must NOT change depth.
1216 assert!(scheduler.enqueue(mk(0, true)));
1217 assert!(scheduler.enqueue(mk(0, true)));
1218 assert_eq!(scheduler.current_depth(), 7);
1219 assert!(!scheduler.enqueue(mk(0, true))); // priority lane full
1220 assert_eq!(
1221 scheduler.current_depth(),
1222 7,
1223 "rejected enqueue bumped depth"
1224 );
1225
1226 // Drain everything → back to 0.
1227 let mut drained = 0;
1228 while scheduler.dequeue().is_some() {
1229 drained += 1;
1230 }
1231 assert_eq!(drained, 7);
1232 assert_eq!(scheduler.current_depth(), 0, "depth did not return to zero");
1233
1234 // total_queued stays cumulative (7 successful enqueues), unchanged
1235 // by draining — the contrast the new counter exists to provide.
1236 assert_eq!(scheduler.total_queued(), 7);
1237 }
1238
1239 #[test]
1240 fn group_by_dest_partitions_preserving_per_dest_order() {
1241 // The batched drain groups a mixed-destination run into one bucket
1242 // per peer (send_batch is single-target). This pins the two
1243 // invariants the send loop relies on: (1) packets to the same peer
1244 // keep dequeue order; (2) the reuse-clear pattern empties the inner
1245 // vecs while keeping the dest slots for the next drain.
1246 let a: SocketAddr = "127.0.0.1:1".parse().unwrap();
1247 let b: SocketAddr = "127.0.0.1:2".parse().unwrap();
1248 let c: SocketAddr = "127.0.0.1:3".parse().unwrap();
1249 let mk = |n: u8| Bytes::from(vec![n]);
1250
1251 let mut groups: Vec<(SocketAddr, Vec<Bytes>)> = Vec::new();
1252 // Interleaved across three peers.
1253 group_by_dest(&mut groups, a, mk(1));
1254 group_by_dest(&mut groups, b, mk(2));
1255 group_by_dest(&mut groups, a, mk(3));
1256 group_by_dest(&mut groups, c, mk(4));
1257 group_by_dest(&mut groups, b, mk(5));
1258
1259 assert_eq!(groups.len(), 3, "one group per distinct peer");
1260 // Slots appear in first-seen order; packets within a peer keep order.
1261 assert_eq!(groups[0], (a, vec![mk(1), mk(3)]));
1262 assert_eq!(groups[1], (b, vec![mk(2), mk(5)]));
1263 assert_eq!(groups[2], (c, vec![mk(4)]));
1264
1265 // Reuse: clear inner vecs (keep slots), regroup one peer.
1266 for (_, v) in groups.iter_mut() {
1267 v.clear();
1268 }
1269 group_by_dest(&mut groups, c, mk(9));
1270 assert_eq!(groups.len(), 3, "dest slots persist across drains");
1271 assert!(groups[0].1.is_empty() && groups[1].1.is_empty());
1272 assert_eq!(
1273 groups[2].1,
1274 vec![mk(9)],
1275 "existing slot reused, not duplicated"
1276 );
1277 }
1278
1279 #[test]
1280 fn reset_dest_groups_stays_bounded_under_peer_churn() {
1281 // Regression for the unbounded dest-slot cache: simulate many drains,
1282 // each to a brand-new peer (worst-case churn). Without the cap the slot
1283 // set would grow to one entry per peer ever seen (here 500); the
1284 // bounded reset must keep it at `cap + 1`.
1285 const CAP: usize = 64;
1286 let mut groups: Vec<(SocketAddr, Vec<Bytes>)> = Vec::new();
1287 let mut max_len = 0usize;
1288 for port in 0u16..500 {
1289 reset_dest_groups(&mut groups, CAP);
1290 let dest: SocketAddr = format!("127.0.0.1:{}", port + 1).parse().unwrap();
1291 group_by_dest(&mut groups, dest, Bytes::from_static(b"x"));
1292 max_len = max_len.max(groups.len());
1293 }
1294 assert!(
1295 max_len <= CAP + 1,
1296 "dest-group set must stay bounded under churn; peaked at {max_len}",
1297 );
1298 }
1299
1300 #[test]
1301 fn test_fair_scheduler_priority() {
1302 let scheduler = FairScheduler::new(2, 16);
1303
1304 // Enqueue normal packets
1305 for _ in 0..4 {
1306 let packet = QueuedPacket {
1307 data: Bytes::from(vec![0u8; 64]),
1308 dest: "127.0.0.1:9000".parse().unwrap(),
1309 stream_id: 0,
1310 priority: false,
1311 queued_at: Instant::now(),
1312 };
1313 scheduler.enqueue(packet);
1314 }
1315
1316 // Enqueue priority packet
1317 let priority = QueuedPacket {
1318 data: Bytes::from(vec![1u8; 64]),
1319 dest: "127.0.0.1:9000".parse().unwrap(),
1320 stream_id: 1,
1321 priority: true,
1322 queued_at: Instant::now(),
1323 };
1324 scheduler.enqueue(priority);
1325
1326 // Priority should come first
1327 let first = scheduler.dequeue().unwrap();
1328 assert_eq!(first.data[0], 1);
1329 assert!(first.priority);
1330 }
1331
1332 #[test]
1333 fn test_fair_scheduler_no_starvation() {
1334 // Regression: dequeue() always started iterating from the beginning
1335 // of the DashMap, so streams appearing earlier in iteration order
1336 // were systematically preferred, starving later streams.
1337 //
1338 // With the rotation fix, each dequeue() starts from a different
1339 // position, so all streams should get roughly equal service.
1340 let quantum = 1;
1341 let scheduler = FairScheduler::new(quantum, 64);
1342
1343 // Use many streams to make DashMap iteration-order bias visible
1344 let num_streams = 8u64;
1345 let packets_per_stream = 20;
1346
1347 for stream in 0..num_streams {
1348 for _ in 0..packets_per_stream {
1349 let packet = QueuedPacket {
1350 data: Bytes::from(vec![stream as u8; 1]),
1351 dest: "127.0.0.1:9000".parse().unwrap(),
1352 stream_id: stream,
1353 priority: false,
1354 queued_at: Instant::now(),
1355 };
1356 scheduler.enqueue(packet);
1357 }
1358 }
1359
1360 // Dequeue all packets and track which stream each came from
1361 let mut first_half_counts = vec![0u32; num_streams as usize];
1362 let total = num_streams * packets_per_stream as u64;
1363 let half = total / 2;
1364
1365 for i in 0..total {
1366 let packet = scheduler.dequeue().unwrap();
1367 if i < half {
1368 first_half_counts[packet.stream_id as usize] += 1;
1369 }
1370 }
1371
1372 // In the first half of dequeues, every stream should have been served
1373 // at least once. Without rotation, some streams could get 0 service.
1374 for (stream, &count) in first_half_counts.iter().enumerate() {
1375 assert!(
1376 count > 0,
1377 "stream {} was starved in the first half of dequeues ({} of {} packets)",
1378 stream,
1379 count,
1380 half
1381 );
1382 }
1383 }
1384
1385 #[tokio::test]
1386 async fn test_router_creation() {
1387 let config = RouterConfig::new(0x1234, "127.0.0.1:0".parse().unwrap());
1388 let router = NetRouter::new(config).await.unwrap();
1389
1390 assert!(!router.is_running());
1391 assert_eq!(router.stats().routes, 0);
1392 }
1393
1394 /// `start()` must spawn at most one dispatch loop. A second
1395 /// call while the first loop is still running would race the
1396 /// scheduler's `dequeue` and produce reordered or duplicate
1397 /// sends.
1398 #[tokio::test]
1399 async fn start_is_idempotent_returns_none_when_already_running() {
1400 let config = RouterConfig::new(0x1234, "127.0.0.1:0".parse().unwrap());
1401 let router = NetRouter::new(config).await.unwrap();
1402
1403 let first = router.start();
1404 assert!(first.is_some(), "first start() should spawn a loop");
1405 assert!(router.is_running());
1406
1407 let second = router.start();
1408 assert!(
1409 second.is_none(),
1410 "second start() while running must NOT spawn a duplicate loop",
1411 );
1412
1413 // After stop(), the first loop exits and a fresh start()
1414 // is allowed again.
1415 router.stop();
1416 // Give the loop a tick to observe `running == false`.
1417 if let Some(h) = first {
1418 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1419 }
1420
1421 let third = router.start();
1422 assert!(
1423 third.is_some(),
1424 "start() after stop() should be allowed to spawn again",
1425 );
1426 router.stop();
1427 if let Some(h) = third {
1428 let _ = tokio::time::timeout(Duration::from_secs(2), h).await;
1429 }
1430 }
1431
1432 #[tokio::test]
1433 async fn test_router_routing_table() {
1434 let config = RouterConfig::new(0x1234, "127.0.0.1:0".parse().unwrap());
1435 let router = NetRouter::new(config).await.unwrap();
1436
1437 let dest: SocketAddr = "127.0.0.1:9001".parse().unwrap();
1438 router.add_route(0x5678, dest);
1439
1440 assert_eq!(router.routing_table().lookup(0x5678), Some(dest));
1441 assert_eq!(router.stats().routes, 1);
1442 }
1443
1444 #[tokio::test]
1445 async fn test_router_extracts_stream_id_at_correct_offset() {
1446 // Regression: stream_id was read from bytes 8..16 (inside the nonce)
1447 // instead of bytes 40..48 where it actually lives in the Net header.
1448 let local_id = 0x1234u64;
1449 let config = RouterConfig::new(local_id, "127.0.0.1:0".parse().unwrap());
1450 let router = NetRouter::new(config).await.unwrap();
1451
1452 let expected_stream_id: u64 = 0xDEAD_BEEF_CAFE_BABEu64;
1453
1454 // Build routing header pointing to local_id so we get RouteAction::Local
1455 let routing = RoutingHeader::new(local_id, 0x5678, 4);
1456 let routing_bytes = routing.to_bytes();
1457
1458 // Build a Net header with a known stream_id
1459 let net = NetHeader::new(
1460 0xAAAA, // session_id
1461 expected_stream_id, // stream_id
1462 1, // sequence
1463 [0u8; 12], // nonce
1464 0, // payload_len
1465 0, // event_count
1466 super::super::protocol::PacketFlags::NONE,
1467 );
1468 let net_bytes = net.to_bytes();
1469
1470 // Concatenate routing header + Net header
1471 let mut packet = BytesMut::with_capacity(ROUTING_HEADER_SIZE + HEADER_SIZE);
1472 packet.extend_from_slice(&routing_bytes);
1473 packet.extend_from_slice(&net_bytes);
1474
1475 let from: SocketAddr = "127.0.0.1:5000".parse().unwrap();
1476 let _ = router.route_packet(packet.freeze(), from);
1477
1478 // The stream stats should be keyed by the correct stream_id
1479 let stats = router
1480 .routing_table()
1481 .get_stream_stats(expected_stream_id)
1482 .expect("stream stats recorded for the routed packet");
1483 assert_eq!(
1484 stats.get_packets_in(),
1485 1,
1486 "stream stats should record 1 packet for stream_id 0x{:X}",
1487 expected_stream_id
1488 );
1489 }
1490
1491 /// Pin: a packet whose `src_id` matches this router's
1492 /// `local_id` must be dropped immediately on receipt with
1493 /// `RoutingLoop` instead of being forwarded again. Pre-fix
1494 /// the router happily looped the packet back along the
1495 /// route, only breaking the cycle on TTL exhaustion (so
1496 /// every loop wasted up to 64 hops × bandwidth before
1497 /// dying).
1498 #[tokio::test]
1499 async fn route_packet_drops_when_src_id_is_local() {
1500 let local_id = 0x1234u64;
1501 let dest_id = 0x9999u64;
1502 let dest_addr: SocketAddr = "127.0.0.2:6000".parse().unwrap();
1503
1504 let config = RouterConfig::new(local_id, "127.0.0.1:0".parse().unwrap());
1505 let router = NetRouter::new(config).await.unwrap();
1506 router.routing_table().add_route(dest_id, dest_addr);
1507
1508 // RoutingHeader stores src_id as u32 — the low 32 bits of
1509 // the local node id are what identifies us on the wire.
1510 let routing = RoutingHeader::new(dest_id, local_id as u32, 16);
1511 let routing_bytes = routing.to_bytes();
1512
1513 let net = NetHeader::new(
1514 0xAAAA,
1515 0xBEEF,
1516 1,
1517 [0u8; 12],
1518 0,
1519 0,
1520 super::super::protocol::PacketFlags::NONE,
1521 );
1522 let net_bytes = net.to_bytes();
1523
1524 let mut packet = BytesMut::with_capacity(ROUTING_HEADER_SIZE + HEADER_SIZE);
1525 packet.extend_from_slice(&routing_bytes);
1526 packet.extend_from_slice(&net_bytes);
1527
1528 let from: SocketAddr = "127.0.0.1:5000".parse().unwrap();
1529 let result = router.route_packet(packet.freeze(), from);
1530 match result {
1531 Err(RouterError::RoutingLoop) => {}
1532 other => panic!(
1533 "expected RoutingLoop for src_id == local_id, got {:?}",
1534 other
1535 ),
1536 }
1537
1538 // Counters: dropped += 1, forwarded unchanged.
1539 let stats = router.stats();
1540 assert_eq!(
1541 stats.packets_dropped, 1,
1542 "looping packet must increment packets_dropped"
1543 );
1544 assert_eq!(
1545 stats.packets_forwarded, 0,
1546 "looping packet must NOT be forwarded"
1547 );
1548 }
1549
1550 /// Regression for BUG_AUDIT_2026_04_30_CORE.md #119: pre-fix
1551 /// `route_packet` decremented TTL via `forward()` and queued
1552 /// the packet for the next hop without checking whether the
1553 /// post-decrement TTL was 0. The next hop received a TTL=0
1554 /// packet and dropped it on its own `is_expired()` check —
1555 /// wasting one forward + bandwidth + queue slot per
1556 /// last-hop packet. Post-fix the router drops here with
1557 /// `TtlExpired` if `forward()` made the TTL 0.
1558 #[tokio::test]
1559 async fn route_packet_drops_when_forward_makes_ttl_zero() {
1560 let local_id = 0x1234u64;
1561 let dest_id = 0x9999u64;
1562 let dest_addr: SocketAddr = "127.0.0.2:6000".parse().unwrap();
1563
1564 let config = RouterConfig::new(local_id, "127.0.0.1:0".parse().unwrap());
1565 let router = NetRouter::new(config).await.unwrap();
1566 router.routing_table().add_route(dest_id, dest_addr);
1567
1568 // TTL = 1 — `forward()` will decrement to 0. Pre-fix
1569 // this would still queue the packet (RouteAction::Forwarded);
1570 // post-fix it's dropped with TtlExpired.
1571 let routing = RoutingHeader::new(dest_id, 0x5678, 1);
1572 let routing_bytes = routing.to_bytes();
1573
1574 let net = NetHeader::new(
1575 0xAAAA,
1576 0xBEEF,
1577 1,
1578 [0u8; 12],
1579 0,
1580 0,
1581 super::super::protocol::PacketFlags::NONE,
1582 );
1583 let net_bytes = net.to_bytes();
1584
1585 let mut packet = BytesMut::with_capacity(ROUTING_HEADER_SIZE + HEADER_SIZE);
1586 packet.extend_from_slice(&routing_bytes);
1587 packet.extend_from_slice(&net_bytes);
1588
1589 let from: SocketAddr = "127.0.0.1:5000".parse().unwrap();
1590 let result = router.route_packet(packet.freeze(), from);
1591
1592 match result {
1593 Err(RouterError::TtlExpired) => {} // expected
1594 Ok(action) => panic!(
1595 "post-fix must drop with TtlExpired, not forward (got {:?})",
1596 action
1597 ),
1598 Err(e) => panic!("unexpected error: {:?}", e),
1599 }
1600 }
1601
1602 /// Sanity: a TTL=2 packet should still forward (decrements
1603 /// to 1 — non-zero, next hop accepts).
1604 #[tokio::test]
1605 async fn route_packet_forwards_when_ttl_remains_positive_after_decrement() {
1606 let local_id = 0x1234u64;
1607 let dest_id = 0x9999u64;
1608 let dest_addr: SocketAddr = "127.0.0.2:6000".parse().unwrap();
1609
1610 let config = RouterConfig::new(local_id, "127.0.0.1:0".parse().unwrap());
1611 let router = NetRouter::new(config).await.unwrap();
1612 router.routing_table().add_route(dest_id, dest_addr);
1613
1614 let routing = RoutingHeader::new(dest_id, 0x5678, 2);
1615 let routing_bytes = routing.to_bytes();
1616 let net = NetHeader::new(
1617 0xAAAA,
1618 0xBEEF,
1619 1,
1620 [0u8; 12],
1621 0,
1622 0,
1623 super::super::protocol::PacketFlags::NONE,
1624 );
1625 let net_bytes = net.to_bytes();
1626 let mut packet = BytesMut::with_capacity(ROUTING_HEADER_SIZE + HEADER_SIZE);
1627 packet.extend_from_slice(&routing_bytes);
1628 packet.extend_from_slice(&net_bytes);
1629 let from: SocketAddr = "127.0.0.1:5000".parse().unwrap();
1630 let result = router.route_packet(packet.freeze(), from);
1631 match result {
1632 Ok(RouteAction::Forwarded(addr)) => assert_eq!(addr, dest_addr),
1633 other => panic!("expected Forwarded, got {:?}", other),
1634 }
1635 }
1636
1637 /// Regression: a packet that fails any drop gate (loop
1638 /// suppression, pre-decrement TTL, post-decrement TTL) must
1639 /// NOT increment the per-stream `packets_in` counter — only
1640 /// `packets_dropped`. Pre-fix `record_in` fired immediately
1641 /// after parse, so a dropped packet incremented BOTH stream
1642 /// counters; a dashboard computing `delivery_rate =
1643 /// packets_out / packets_in` would see drops inflate the
1644 /// denominator without affecting the numerator, masking
1645 /// healthy networks behind apparent delivery loss.
1646 ///
1647 /// This pins the post-decrement TTL drop path (the audit's
1648 /// stated trigger), but the structural change applies to
1649 /// every drop path; the per-stream invariant is now
1650 /// "packets_in counts only delivered or forwarded packets."
1651 #[tokio::test]
1652 async fn ttl_drop_does_not_double_count_packets_in_for_stream() {
1653 let local_id = 0x1234u64;
1654 let dest_id = 0x9999u64;
1655 let dest_addr: SocketAddr = "127.0.0.2:6000".parse().unwrap();
1656
1657 let config = RouterConfig::new(local_id, "127.0.0.1:0".parse().unwrap());
1658 let router = NetRouter::new(config).await.unwrap();
1659 router.routing_table().add_route(dest_id, dest_addr);
1660
1661 // TTL = 1 hits the post-decrement drop path. Same
1662 // setup as `route_packet_drops_when_forward_makes_ttl_zero`.
1663 let routing = RoutingHeader::new(dest_id, 0x5678, 1);
1664 let routing_bytes = routing.to_bytes();
1665 // NetHeader::new params: (session_id, stream_id, sequence, nonce, payload_len, event_count, flags)
1666 let session_id = 0xAAAAu64;
1667 let stream_id = 0x4242u64;
1668 let net = NetHeader::new(
1669 session_id,
1670 stream_id,
1671 1,
1672 [0u8; 12],
1673 0,
1674 0,
1675 super::super::protocol::PacketFlags::NONE,
1676 );
1677 let net_bytes = net.to_bytes();
1678 let mut packet = BytesMut::with_capacity(ROUTING_HEADER_SIZE + HEADER_SIZE);
1679 packet.extend_from_slice(&routing_bytes);
1680 packet.extend_from_slice(&net_bytes);
1681 let from: SocketAddr = "127.0.0.1:5000".parse().unwrap();
1682
1683 let result = router.route_packet(packet.freeze(), from);
1684 assert!(matches!(result, Err(RouterError::TtlExpired)));
1685
1686 let stats = router
1687 .routing_table()
1688 .get_stream_stats(stream_id)
1689 .expect("stream stats present for the dropped packet");
1690 assert_eq!(
1691 stats.get_packets_in(),
1692 0,
1693 "regression: a dropped packet must NOT increment per-stream \
1694 packets_in. Pre-fix this counter was incremented before \
1695 the TTL/loop checks, so drops double-counted as both \
1696 ingressed and dropped — masking delivery rate dashboards."
1697 );
1698 assert_eq!(
1699 stats.get_drops(),
1700 1,
1701 "drop must still increment per-stream packets_dropped"
1702 );
1703 }
1704
1705 #[test]
1706 fn test_regression_fair_scheduler_cleanup_called() {
1707 // Regression: FairScheduler never removed empty stream queues from its
1708 // DashMap. After many unique stream_ids passed through, the map grew
1709 // without bound, causing O(n) iteration in dequeue() where n is the
1710 // number of *ever-seen* streams rather than *active* streams. This
1711 // degraded dequeue latency over time.
1712 //
1713 // Fix: dequeue() calls cleanup_empty() every 1024 iterations,
1714 // removing streams whose queues have been fully drained.
1715 let scheduler = FairScheduler::new(4, 16);
1716
1717 // Enqueue packets across many unique streams
1718 let num_streams = 200u64;
1719 for stream in 0..num_streams {
1720 let packet = QueuedPacket {
1721 data: Bytes::from(vec![0u8; 8]),
1722 dest: "127.0.0.1:9000".parse().unwrap(),
1723 stream_id: stream,
1724 priority: false,
1725 queued_at: Instant::now(),
1726 };
1727 assert!(scheduler.enqueue(packet));
1728 }
1729
1730 assert_eq!(scheduler.stream_count(), num_streams as usize);
1731
1732 // Drain all packets
1733 let mut drained = 0;
1734 while scheduler.dequeue().is_some() {
1735 drained += 1;
1736 }
1737 assert_eq!(drained, num_streams as usize);
1738
1739 // The cleanup triggers every 1024 dequeue calls. We need enough
1740 // dequeue calls (even returning None) to cross the 1024 boundary.
1741 // We already did `num_streams` dequeues above; do more no-op
1742 // dequeues to push past the threshold.
1743 for _ in 0..(1025 - drained) {
1744 let _ = scheduler.dequeue();
1745 }
1746
1747 // After cleanup, all empty stream queues should have been removed
1748 assert_eq!(
1749 scheduler.stream_count(),
1750 0,
1751 "empty stream queues must be cleaned up after enough dequeue \
1752 iterations to prevent unbounded DashMap growth"
1753 );
1754 }
1755
1756 #[test]
1757 fn test_regression_scheduler_sees_streams_added_after_quantum_exhaustion() {
1758 // Regression: dequeue() collected stream keys once, then reused
1759 // the stale snapshot for the retry loop after quantum reset.
1760 // Streams added between the two loops were invisible until the
1761 // next dequeue() call, causing extra latency.
1762 //
1763 // Fix: re-collect keys before the retry loop.
1764 let scheduler = FairScheduler::new(1, 16);
1765
1766 // Stream 0 with 1 packet (quantum = 1, so first pass drains it)
1767 scheduler.enqueue(QueuedPacket {
1768 data: Bytes::from_static(b"s0"),
1769 dest: "127.0.0.1:9000".parse().unwrap(),
1770 stream_id: 0,
1771 priority: false,
1772 queued_at: Instant::now(),
1773 });
1774
1775 // Drain stream 0's quantum
1776 let pkt = scheduler.dequeue().unwrap();
1777 assert_eq!(pkt.stream_id, 0);
1778
1779 // Now add stream 1 while the scheduler is "between rounds"
1780 scheduler.enqueue(QueuedPacket {
1781 data: Bytes::from_static(b"s1"),
1782 dest: "127.0.0.1:9000".parse().unwrap(),
1783 stream_id: 1,
1784 priority: false,
1785 queued_at: Instant::now(),
1786 });
1787
1788 // Next dequeue should find stream 1
1789 let pkt = scheduler.dequeue().unwrap();
1790 assert_eq!(
1791 pkt.stream_id, 1,
1792 "newly added stream should be visible after quantum reset"
1793 );
1794 }
1795
1796 /// PERF_AUDIT §3.1 regression: the active-stream snapshot
1797 /// must mutate exactly when the stream set does:
1798 /// - enqueue → new stream: rebuild (ArcSwap pointer changes).
1799 /// - enqueue → existing stream: NO rebuild (pointer stable).
1800 /// - set_stream_weight → new stream: rebuild.
1801 /// - set_stream_weight → existing stream weight change: NO rebuild.
1802 /// - cleanup_empty actually removes something: rebuild.
1803 /// - cleanup_empty with nothing to remove: NO rebuild.
1804 ///
1805 /// Pre-fix dequeue collected a fresh Vec<u64> from the DashMap
1806 /// on EVERY call (twice on the quantum-reset path). A regression
1807 /// that drops the snapshot-store from any of the three mutation
1808 /// sites would silently leave dequeue iterating a stale set.
1809 #[test]
1810 fn active_stream_snapshot_tracks_stream_set_changes() {
1811 let scheduler = FairScheduler::new(2, 16);
1812 let initial_ptr = Arc::as_ptr(&scheduler.active_streams.load_full());
1813 assert!(scheduler.active_streams.load_full().is_empty());
1814
1815 // enqueue → new stream → rebuild.
1816 scheduler.enqueue(QueuedPacket {
1817 data: Bytes::from_static(b"a"),
1818 dest: "127.0.0.1:9000".parse().unwrap(),
1819 stream_id: 7,
1820 priority: false,
1821 queued_at: Instant::now(),
1822 });
1823 let after_new = scheduler.active_streams.load_full();
1824 assert_ne!(
1825 Arc::as_ptr(&after_new),
1826 initial_ptr,
1827 "vacant-arm enqueue must swap a fresh snapshot in"
1828 );
1829 assert_eq!(&*after_new, &[7u64]);
1830
1831 // enqueue → existing stream → snapshot stable.
1832 let before_existing = Arc::as_ptr(&scheduler.active_streams.load_full());
1833 scheduler.enqueue(QueuedPacket {
1834 data: Bytes::from_static(b"b"),
1835 dest: "127.0.0.1:9000".parse().unwrap(),
1836 stream_id: 7,
1837 priority: false,
1838 queued_at: Instant::now(),
1839 });
1840 assert_eq!(
1841 Arc::as_ptr(&scheduler.active_streams.load_full()),
1842 before_existing,
1843 "occupied-arm enqueue must NOT rebuild the snapshot — a \
1844 regression here defeats the §3.1 fix's steady-state win"
1845 );
1846
1847 // set_stream_weight → new stream → rebuild.
1848 let before_weight_new = Arc::as_ptr(&scheduler.active_streams.load_full());
1849 scheduler.set_stream_weight(11, 3);
1850 let after_weight_new = scheduler.active_streams.load_full();
1851 assert_ne!(
1852 Arc::as_ptr(&after_weight_new),
1853 before_weight_new,
1854 "set_stream_weight on a new stream must rebuild the snapshot"
1855 );
1856 let mut sorted: Vec<u64> = after_weight_new.iter().copied().collect();
1857 sorted.sort_unstable();
1858 assert_eq!(sorted, vec![7u64, 11]);
1859
1860 // set_stream_weight → existing stream → stable.
1861 let before_weight_existing = Arc::as_ptr(&scheduler.active_streams.load_full());
1862 scheduler.set_stream_weight(11, 5);
1863 assert_eq!(
1864 Arc::as_ptr(&scheduler.active_streams.load_full()),
1865 before_weight_existing,
1866 "set_stream_weight on existing stream must NOT rebuild"
1867 );
1868
1869 // cleanup_empty with nothing to remove (stream 7 has pending
1870 // packets) → stable.
1871 let before_clean_noop = Arc::as_ptr(&scheduler.active_streams.load_full());
1872 let removed = scheduler.cleanup_empty();
1873 assert_eq!(
1874 removed, 1,
1875 "stream 11 has no packets and only DashMap refs it"
1876 );
1877 let after_clean = scheduler.active_streams.load_full();
1878 assert_ne!(
1879 Arc::as_ptr(&after_clean),
1880 before_clean_noop,
1881 "cleanup_empty that actually removed an entry MUST rebuild"
1882 );
1883 assert_eq!(&*after_clean, &[7u64]);
1884
1885 // cleanup_empty with nothing removable (stream 7 still has
1886 // packets queued) → stable pointer.
1887 let before_clean_stable = Arc::as_ptr(&scheduler.active_streams.load_full());
1888 let removed = scheduler.cleanup_empty();
1889 assert_eq!(removed, 0);
1890 assert_eq!(
1891 Arc::as_ptr(&scheduler.active_streams.load_full()),
1892 before_clean_stable,
1893 "cleanup_empty that removed nothing must NOT rebuild"
1894 );
1895 }
1896
1897 /// PERF_AUDIT §3.1 lost-wakeup regression: concurrent snapshot
1898 /// rebuilds (enqueue vacant-arm racing cleanup_empty's rebuild)
1899 /// must never leave the snapshot out of sync with the stream
1900 /// map once activity quiesces. Without the version-counter
1901 /// retry, a rebuild that collected keys BEFORE a concurrent
1902 /// insert could store AFTER the inserting thread's own rebuild,
1903 /// wiping the new stream from the snapshot; with no further
1904 /// stream-set change the stream's packets would be stranded
1905 /// forever (occupied-arm enqueues don't rebuild, and cleanup
1906 /// never evicts a non-empty queue). This hammers the race and
1907 /// asserts the quiescent invariant snapshot == live stream set.
1908 #[test]
1909 fn active_stream_snapshot_survives_concurrent_rebuild_races() {
1910 use std::thread;
1911
1912 let scheduler = Arc::new(FairScheduler::new(2, 16));
1913 let mut handles = Vec::new();
1914
1915 // Writers: continuously create fresh streams (vacant-arm
1916 // rebuilds) and drain packets so queues trend toward empty
1917 // and stay eligible for cleanup eviction (cleanup rebuilds).
1918 for t in 0..4u64 {
1919 let sched = Arc::clone(&scheduler);
1920 handles.push(thread::spawn(move || {
1921 for i in 0..500u64 {
1922 let stream_id = t * 100_000 + i;
1923 sched.enqueue(QueuedPacket {
1924 data: Bytes::from_static(b"x"),
1925 dest: "127.0.0.1:9000".parse().unwrap(),
1926 stream_id,
1927 priority: false,
1928 queued_at: Instant::now(),
1929 });
1930 let _ = sched.dequeue();
1931 }
1932 }));
1933 }
1934
1935 // Dedicated cleanup hammer: maximizes rebuild/insert overlap.
1936 {
1937 let sched = Arc::clone(&scheduler);
1938 handles.push(thread::spawn(move || {
1939 for _ in 0..2_000 {
1940 sched.cleanup_empty();
1941 }
1942 }));
1943 }
1944
1945 for h in handles {
1946 h.join().unwrap();
1947 }
1948
1949 // Quiescent invariant: snapshot == live stream set. A lost
1950 // rebuild leaves a stream in the map (with packets queued)
1951 // that the snapshot — and therefore dequeue — never sees.
1952 let mut snapshot: Vec<u64> = scheduler
1953 .active_streams
1954 .load_full()
1955 .iter()
1956 .copied()
1957 .collect();
1958 snapshot.sort_unstable();
1959 let mut live: Vec<u64> = scheduler.streams.iter().map(|e| *e.key()).collect();
1960 live.sort_unstable();
1961 assert_eq!(
1962 snapshot, live,
1963 "active-stream snapshot diverged from the stream map after \
1964 concurrent enqueue/cleanup churn — version-counter retry \
1965 protocol failed to repair a racing rebuild"
1966 );
1967 }
1968
1969 /// Fairness weight: a weight-4 stream should ship 4× the packets per
1970 /// round of a weight-1 stream. With both queues full and one full
1971 /// round of dequeues, we should see ~4:1 ratio before any round
1972 /// reset fires.
1973 #[test]
1974 fn test_fair_scheduler_respects_stream_weight() {
1975 // Base quantum = 1: every stream with weight 1 gets 1 packet per
1976 // round; a weight-4 stream gets 4 packets per round.
1977 let scheduler = FairScheduler::new(1, 64);
1978
1979 scheduler.set_stream_weight(1, 1);
1980 scheduler.set_stream_weight(2, 4);
1981
1982 let dest: SocketAddr = "127.0.0.1:9999".parse().unwrap();
1983 // Fill both streams with 8 packets each.
1984 for stream_id in [1u64, 2u64] {
1985 for _ in 0..8 {
1986 scheduler.enqueue(QueuedPacket {
1987 data: Bytes::from_static(&[0u8; 16]),
1988 dest,
1989 stream_id,
1990 priority: false,
1991 queued_at: Instant::now(),
1992 });
1993 }
1994 }
1995
1996 // Dequeue 5 packets. The weight-4 stream should ship at least
1997 // 4 of those 5 in the first round (its quantum is 4; stream 1's
1998 // quantum is 1). Depending on round-robin start, stream 1 may
1999 // ship 1 packet in the middle.
2000 let mut counts = [0u64; 3];
2001 for _ in 0..5 {
2002 if let Some(pkt) = scheduler.dequeue() {
2003 counts[pkt.stream_id as usize] += 1;
2004 }
2005 }
2006 assert_eq!(counts[0], 0);
2007 assert!(
2008 counts[2] >= 4,
2009 "weight-4 stream should ship >= 4 packets in 5 dequeues; \
2010 saw weight-1={} weight-4={}",
2011 counts[1],
2012 counts[2]
2013 );
2014 assert!(
2015 counts[1] <= 1,
2016 "weight-1 stream should ship <= 1 packet before round reset; \
2017 saw {}",
2018 counts[1]
2019 );
2020 }
2021
2022 /// Anti-starvation: a bulk transfer with a huge backlog must NOT
2023 /// head-of-line-block a competing interactive stream of equal
2024 /// weight. This is the property that justifies routing blob
2025 /// transfers through the scheduler (FairScheduler transport plan) —
2026 /// a 2 MiB transfer (~260 packets) queued ahead of a few
2027 /// interactive packets cannot make the interactive flow wait for
2028 /// the whole transfer to drain; round-robin interleaves them so
2029 /// every interactive packet is serviced within a bounded number of
2030 /// dequeues regardless of how deep the bulk backlog is.
2031 #[test]
2032 fn bulk_backlog_does_not_starve_an_equal_weight_interactive_stream() {
2033 let scheduler = FairScheduler::new(1, 512);
2034 let bulk = 1u64;
2035 let interactive = 2u64;
2036 scheduler.set_stream_weight(bulk, 1);
2037 scheduler.set_stream_weight(interactive, 1);
2038
2039 let dest: SocketAddr = "127.0.0.1:9999".parse().unwrap();
2040 // A transfer-scale backlog on the bulk stream …
2041 let bulk_backlog = 260usize;
2042 for _ in 0..bulk_backlog {
2043 scheduler.enqueue(QueuedPacket {
2044 data: Bytes::from_static(&[0u8; 16]),
2045 dest,
2046 stream_id: bulk,
2047 priority: false,
2048 queued_at: Instant::now(),
2049 });
2050 }
2051 // … and just a handful of interactive packets behind it.
2052 let interactive_count = 4usize;
2053 for _ in 0..interactive_count {
2054 scheduler.enqueue(QueuedPacket {
2055 data: Bytes::from_static(&[0u8; 16]),
2056 dest,
2057 stream_id: interactive,
2058 priority: false,
2059 queued_at: Instant::now(),
2060 });
2061 }
2062
2063 // Drain, recording the dequeue index at which each interactive
2064 // packet emerges.
2065 let mut last_interactive_at = 0usize;
2066 let mut seen_interactive = 0usize;
2067 for idx in 0..(bulk_backlog + interactive_count) {
2068 if let Some(pkt) = scheduler.dequeue() {
2069 if pkt.stream_id == interactive {
2070 seen_interactive += 1;
2071 last_interactive_at = idx;
2072 }
2073 }
2074 }
2075
2076 assert_eq!(
2077 seen_interactive, interactive_count,
2078 "every interactive packet must be delivered"
2079 );
2080 // With quantum 1 and equal weight, round-robin alternates one
2081 // bulk, one interactive per round, so the k-th interactive
2082 // packet emerges by dequeue ~2k — bounded by 2×count, utterly
2083 // independent of the 260-deep bulk backlog. If the scheduler
2084 // had FIFO/head-of-line semantics, the last interactive packet
2085 // would not appear until index ~260.
2086 assert!(
2087 last_interactive_at <= 2 * interactive_count,
2088 "interactive stream starved: last interactive packet at dequeue {} \
2089 (bulk backlog {}); fair interleaving should deliver it by ~{}",
2090 last_interactive_at,
2091 bulk_backlog,
2092 2 * interactive_count
2093 );
2094 }
2095
2096 /// Regression: BUG_REPORT.md #31 — `round_robin_idx` was
2097 /// `fetch_add(1)`-ed unconditionally on every `dequeue` call,
2098 /// so polls that returned `None` (no streams have packets)
2099 /// still rotated the cursor. Under bursty mixed traffic this
2100 /// biased service away from streams that became non-empty
2101 /// mid-pass. The fix advances the cursor only when a packet
2102 /// is actually popped.
2103 #[test]
2104 fn round_robin_idx_advances_only_on_successful_pop() {
2105 let scheduler = FairScheduler::new(1, 64);
2106
2107 // Empty scheduler: many polls must NOT advance the index.
2108 let initial = scheduler.round_robin_idx.load(Ordering::Relaxed);
2109 for _ in 0..1000 {
2110 assert!(scheduler.dequeue().is_none());
2111 }
2112 assert_eq!(
2113 scheduler.round_robin_idx.load(Ordering::Relaxed),
2114 initial,
2115 "empty-poll dequeues must not advance round_robin_idx (#31)"
2116 );
2117
2118 // Now enqueue + drain. Each successful pop should advance
2119 // the index by exactly 1.
2120 for stream in 0..5u64 {
2121 scheduler.enqueue(QueuedPacket {
2122 data: Bytes::from(vec![0u8; 8]),
2123 dest: "127.0.0.1:9000".parse().unwrap(),
2124 stream_id: stream,
2125 priority: false,
2126 queued_at: Instant::now(),
2127 });
2128 }
2129 let before_drain = scheduler.round_robin_idx.load(Ordering::Relaxed);
2130 let mut popped = 0;
2131 while scheduler.dequeue().is_some() {
2132 popped += 1;
2133 }
2134 let after_drain = scheduler.round_robin_idx.load(Ordering::Relaxed);
2135 assert_eq!(popped, 5);
2136 assert_eq!(
2137 after_drain.wrapping_sub(before_drain),
2138 popped as u64,
2139 "round_robin_idx must advance by exactly the number of successful pops (#31)"
2140 );
2141 }
2142}