net-mesh 0.23.0

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
//! Net Proxy for zero-copy multi-hop packet forwarding.
//!
//! The proxy handles:
//! - Zero-copy packet forwarding (no decryption)
//! - TTL enforcement and hop counting
//! - Per-hop latency tracking
//! - Bandwidth metering

use bytes::{Bytes, BytesMut};
use dashmap::DashMap;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::net::UdpSocket;

use super::route::{RoutingHeader, ROUTING_HEADER_SIZE};

/// Settings used to construct a proxy.
#[derive(Debug, Clone)]
pub struct ProxyConfig {
    /// ID of the node this proxy runs on
    pub local_id: u64,
    /// Address the proxy's UDP socket binds to
    pub bind_addr: SocketAddr,
    /// Maximum packet size, in bytes
    pub max_packet_size: usize,
    /// Whether hop latency tracking is enabled
    pub track_latency: bool,
}

impl Default for ProxyConfig {
    /// Node ID 0, bound to `0.0.0.0:0`, 65535-byte packets, latency tracking on.
    fn default() -> Self {
        // Unspecified IPv4 address with port 0.
        let any_addr = SocketAddr::from((std::net::Ipv4Addr::UNSPECIFIED, 0));

        Self {
            local_id: 0,
            bind_addr: any_addr,
            max_packet_size: 65_535,
            track_latency: true,
        }
    }
}

impl ProxyConfig {
    /// Build a config for `local_id` bound to `bind_addr`; every other
    /// setting takes its [`Default`] value.
    pub fn new(local_id: u64, bind_addr: SocketAddr) -> Self {
        let Self {
            max_packet_size,
            track_latency,
            ..
        } = Self::default();

        Self {
            local_id,
            bind_addr,
            max_packet_size,
            track_latency,
        }
    }
}

/// Lock-free counters for traffic towards one hop.
#[derive(Debug, Default)]
pub struct HopStats {
    /// Number of packets forwarded
    pub packets_forwarded: AtomicU64,
    /// Number of packets dropped (TTL, no route, etc.)
    pub packets_dropped: AtomicU64,
    /// Number of bytes forwarded
    pub bytes_forwarded: AtomicU64,
    /// Sum of all recorded forwarding latencies (nanoseconds)
    total_latency_ns: AtomicU64,
    /// How many latencies have been added to `total_latency_ns`
    latency_samples: AtomicU64,
}

impl HopStats {
    /// Create hop stats with every counter at zero.
    pub fn new() -> Self {
        Self {
            packets_forwarded: AtomicU64::new(0),
            packets_dropped: AtomicU64::new(0),
            bytes_forwarded: AtomicU64::new(0),
            total_latency_ns: AtomicU64::new(0),
            latency_samples: AtomicU64::new(0),
        }
    }

    /// Count one forwarded packet of `bytes` bytes.
    ///
    /// A `latency_ns` of zero is treated as "no measurement": the packet and
    /// byte counters still advance, but no latency sample is recorded.
    #[inline]
    pub fn record_forward(&self, bytes: u64, latency_ns: u64) {
        self.packets_forwarded.fetch_add(1, Ordering::Relaxed);
        self.bytes_forwarded.fetch_add(bytes, Ordering::Relaxed);

        if latency_ns == 0 {
            return;
        }
        self.total_latency_ns
            .fetch_add(latency_ns, Ordering::Relaxed);
        self.latency_samples.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one dropped packet.
    #[inline]
    pub fn record_drop(&self) {
        self.packets_dropped.fetch_add(1, Ordering::Relaxed);
    }

    /// Mean of the recorded forwarding latencies in nanoseconds, or 0 when
    /// no sample has been recorded yet.
    pub fn avg_latency_ns(&self) -> u64 {
        let sample_count = self.latency_samples.load(Ordering::Relaxed);
        match sample_count {
            0 => 0,
            n => self.total_latency_ns.load(Ordering::Relaxed) / n,
        }
    }

    /// Number of packets forwarded so far.
    pub fn forwarded(&self) -> u64 {
        self.packets_forwarded.load(Ordering::Relaxed)
    }

    /// Number of packets dropped so far.
    pub fn dropped(&self) -> u64 {
        self.packets_dropped.load(Ordering::Relaxed)
    }
}

/// Proxy statistics
///
/// A plain-value snapshot of the proxy's global counters, as returned by
/// `NetProxy::stats`. The fields are copied out individually, so a snapshot
/// taken while packets are in flight is not guaranteed to be mutually
/// consistent.
#[derive(Debug, Clone, Default)]
pub struct ProxyStats {
    /// Total packets received (every packet handed to `forward`, whether it
    /// was forwarded, delivered locally, or dropped)
    pub packets_received: u64,
    /// Total packets forwarded
    pub packets_forwarded: u64,
    /// Total packets dropped
    pub packets_dropped: u64,
    /// Total bytes forwarded
    pub bytes_forwarded: u64,
    /// Average forwarding latency (nanoseconds); 0 when no latency sample
    /// has been recorded
    pub avg_latency_ns: u64,
    /// Active routes (number of entries in the next-hop table)
    pub routes: usize,
}

/// Reasons the proxy can fail to forward a packet.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProxyError {
    /// The packet is shorter than a routing header
    PacketTooSmall,
    /// The routing header could not be parsed
    InvalidHeader,
    /// The packet's TTL has run out
    TtlExpired,
    /// The routing table has no entry for the destination
    NoRoute,
    /// The socket send failed; carries the rendered I/O error
    SendFailed(String),
}

impl std::fmt::Display for ProxyError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only `SendFailed` carries data; every other variant maps to a
        // fixed message.
        let message = match self {
            Self::SendFailed(cause) => return write!(f, "send failed: {}", cause),
            Self::PacketTooSmall => "packet too small",
            Self::InvalidHeader => "invalid routing header",
            Self::TtlExpired => "TTL expired",
            Self::NoRoute => "no route to destination",
        };
        f.write_str(message)
    }
}

impl std::error::Error for ProxyError {}

/// Result of forwarding a packet
///
/// Produced by `NetProxy::forward`; exactly one variant describes what
/// happened to the input packet.
#[derive(Debug)]
pub enum ForwardResult {
    /// Packet forwarded to next hop
    Forwarded {
        /// Next hop address (looked up in the routing table by destination ID)
        next_hop: SocketAddr,
        /// The packet with updated routing header, ready to send
        packet: Bytes,
        /// Forwarding latency in nanoseconds (time spent inside `forward`
        /// preparing the packet; does not include the socket send)
        latency_ns: u64,
    },
    /// Packet is for local delivery; carries the payload with the routing
    /// header stripped
    Local(Bytes),
    /// Packet dropped, with the reason
    Dropped(ProxyError),
}

/// Net Proxy for zero-copy multi-hop forwarding.
///
/// The proxy forwards packets without decrypting the payload,
/// only reading and updating the routing header.
///
/// All mutable state is held in concurrent maps and atomics, so every
/// method takes `&self`.
pub struct NetProxy {
    /// Configuration
    // Stored as passed to `new`. The code in this file only reads
    // `bind_addr` and `local_id` from it, at construction time, hence the
    // `dead_code` allowance.
    #[allow(dead_code)]
    config: ProxyConfig,
    /// UDP socket
    socket: Arc<UdpSocket>,
    /// Next-hop routing table (dest_id -> next_hop)
    next_hop: DashMap<u64, SocketAddr>,
    /// Local node ID (copied from `config.local_id`); packets addressed to
    /// it are delivered locally instead of forwarded
    local_id: u64,
    /// Per-destination hop stats
    hop_stats: DashMap<u64, HopStats>,
    /// Global stats
    ///
    /// Packets handed to `forward`, regardless of outcome.
    packets_received: AtomicU64,
    /// Packets prepared for a next hop.
    packets_forwarded: AtomicU64,
    /// Packets rejected by `forward` (too small, bad header, TTL, no route).
    packets_dropped: AtomicU64,
    /// Bytes in forwarded packets.
    bytes_forwarded: AtomicU64,
    /// Sum of forwarding latencies, in nanoseconds.
    total_latency_ns: AtomicU64,
    /// Number of latencies accumulated in `total_latency_ns`.
    latency_samples: AtomicU64,
}

impl NetProxy {
    /// Create a new proxy
    pub async fn new(config: ProxyConfig) -> std::io::Result<Self> {
        let socket = UdpSocket::bind(config.bind_addr).await?;
        let local_id = config.local_id;

        Ok(Self {
            config,
            socket: Arc::new(socket),
            next_hop: DashMap::new(),
            local_id,
            hop_stats: DashMap::new(),
            packets_received: AtomicU64::new(0),
            packets_forwarded: AtomicU64::new(0),
            packets_dropped: AtomicU64::new(0),
            bytes_forwarded: AtomicU64::new(0),
            total_latency_ns: AtomicU64::new(0),
            latency_samples: AtomicU64::new(0),
        })
    }

    /// Get local address
    pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
        self.socket.local_addr()
    }

    /// Add a route to the next-hop table
    pub fn add_route(&self, dest_id: u64, next_hop: SocketAddr) {
        self.next_hop.insert(dest_id, next_hop);
    }

    /// Remove a route.
    ///
    /// Drops both the next_hop entry and the matching `hop_stats`
    /// entry. Removing only the former would let `hop_stats` grow
    /// indefinitely (memory ∝ total-distinct-dest-ids-ever-seen,
    /// not active dest count) for a peer churning through many
    /// destinations.
    pub fn remove_route(&self, dest_id: u64) {
        self.next_hop.remove(&dest_id);
        self.hop_stats.remove(&dest_id);
    }

    /// Lookup next hop for destination
    pub fn lookup(&self, dest_id: u64) -> Option<SocketAddr> {
        self.next_hop.get(&dest_id).map(|r| *r)
    }

    /// Forward a packet (zero-copy, no decryption).
    ///
    /// This is the hot path - it only reads and updates the routing header,
    /// then forwards the entire packet to the next hop.
    pub fn forward(&self, data: Bytes) -> ForwardResult {
        let start = Instant::now();
        let len = data.len() as u64;

        self.packets_received.fetch_add(1, Ordering::Relaxed);

        // Validate packet size
        if data.len() < ROUTING_HEADER_SIZE {
            self.packets_dropped.fetch_add(1, Ordering::Relaxed);
            return ForwardResult::Dropped(ProxyError::PacketTooSmall);
        }

        // Parse routing header (only first 16 bytes)
        let header = match RoutingHeader::from_bytes(&data[..ROUTING_HEADER_SIZE]) {
            Some(h) => h,
            None => {
                self.packets_dropped.fetch_add(1, Ordering::Relaxed);
                return ForwardResult::Dropped(ProxyError::InvalidHeader);
            }
        };

        // Check if local delivery
        if header.dest_id == self.local_id {
            return ForwardResult::Local(data.slice(ROUTING_HEADER_SIZE..));
        }

        // Check TTL
        if header.is_expired() {
            self.packets_dropped.fetch_add(1, Ordering::Relaxed);
            self.record_hop_drop(header.dest_id);
            return ForwardResult::Dropped(ProxyError::TtlExpired);
        }

        // Lookup next hop
        let next_hop = match self.lookup(header.dest_id) {
            Some(addr) => addr,
            None => {
                self.packets_dropped.fetch_add(1, Ordering::Relaxed);
                self.record_hop_drop(header.dest_id);
                return ForwardResult::Dropped(ProxyError::NoRoute);
            }
        };

        // Update routing header (decrement TTL, increment hop count)
        let mut new_header = header;
        new_header.forward();
        // Drop here if forwarding made the TTL 0 — the next hop
        // would just drop it on its own `is_expired()` check,
        // wasting bandwidth and a queue slot.
        if new_header.is_expired() {
            self.packets_dropped.fetch_add(1, Ordering::Relaxed);
            self.record_hop_drop(header.dest_id);
            return ForwardResult::Dropped(ProxyError::TtlExpired);
        }

        // Build forwarded packet with updated header
        let mut fwd_data = BytesMut::with_capacity(data.len());
        new_header.write_to(&mut fwd_data);
        fwd_data.extend_from_slice(&data[ROUTING_HEADER_SIZE..]);

        let latency_ns = start.elapsed().as_nanos() as u64;

        // Update stats
        self.packets_forwarded.fetch_add(1, Ordering::Relaxed);
        self.bytes_forwarded.fetch_add(len, Ordering::Relaxed);
        self.total_latency_ns
            .fetch_add(latency_ns, Ordering::Relaxed);
        self.latency_samples.fetch_add(1, Ordering::Relaxed);
        self.record_hop_forward(header.dest_id, len, latency_ns);

        ForwardResult::Forwarded {
            next_hop,
            packet: fwd_data.freeze(),
            latency_ns,
        }
    }

    /// Forward and send a packet in one operation
    pub async fn forward_and_send(&self, data: Bytes) -> Result<ForwardResult, ProxyError> {
        match self.forward(data) {
            ForwardResult::Forwarded {
                next_hop,
                ref packet,
                latency_ns,
            } => {
                let packet_len = packet.len() as u64;
                match self.socket.send_to(packet, next_hop).await {
                    Ok(_) => Ok(ForwardResult::Forwarded {
                        next_hop,
                        packet: packet.clone(),
                        latency_ns,
                    }),
                    Err(e) => {
                        // Roll back the telemetry counters
                        // `forward()` bumped speculatively. Pre-fix
                        // `packets_forwarded` / `bytes_forwarded` /
                        // `total_latency_ns` / `latency_samples` all
                        // counted the prepared packet as if it had
                        // shipped, even when the kernel rejected the
                        // send below — operators reading the
                        // forwarded-packet rate saw a count that
                        // included would-be sends that never crossed
                        // the wire. Saturating-sub guards against an
                        // arithmetic underflow if the counter was
                        // somehow reset between the bump and this
                        // rollback.
                        self.packets_forwarded
                            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
                                Some(v.saturating_sub(1))
                            })
                            .ok();
                        self.bytes_forwarded
                            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
                                Some(v.saturating_sub(packet_len))
                            })
                            .ok();
                        self.total_latency_ns
                            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
                                Some(v.saturating_sub(latency_ns))
                            })
                            .ok();
                        self.latency_samples
                            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
                                Some(v.saturating_sub(1))
                            })
                            .ok();
                        Err(ProxyError::SendFailed(e.to_string()))
                    }
                }
            }
            ForwardResult::Local(payload) => Ok(ForwardResult::Local(payload)),
            ForwardResult::Dropped(e) => Err(e),
        }
    }

    /// Send data to a destination
    pub async fn send_to(&self, data: &[u8], dest: SocketAddr) -> std::io::Result<usize> {
        self.socket.send_to(data, dest).await
    }

    /// Receive a packet
    pub async fn recv_from(&self, buf: &mut [u8]) -> std::io::Result<(usize, SocketAddr)> {
        self.socket.recv_from(buf).await
    }

    /// Get proxy statistics
    pub fn stats(&self) -> ProxyStats {
        let samples = self.latency_samples.load(Ordering::Relaxed);
        let avg_latency = self
            .total_latency_ns
            .load(Ordering::Relaxed)
            .checked_div(samples)
            .unwrap_or(0);

        ProxyStats {
            packets_received: self.packets_received.load(Ordering::Relaxed),
            packets_forwarded: self.packets_forwarded.load(Ordering::Relaxed),
            packets_dropped: self.packets_dropped.load(Ordering::Relaxed),
            bytes_forwarded: self.bytes_forwarded.load(Ordering::Relaxed),
            avg_latency_ns: avg_latency,
            routes: self.next_hop.len(),
        }
    }

    /// Reset statistics
    pub fn reset_stats(&self) {
        self.packets_received.store(0, Ordering::Relaxed);
        self.packets_forwarded.store(0, Ordering::Relaxed);
        self.packets_dropped.store(0, Ordering::Relaxed);
        self.bytes_forwarded.store(0, Ordering::Relaxed);
        self.total_latency_ns.store(0, Ordering::Relaxed);
        self.latency_samples.store(0, Ordering::Relaxed);
    }

    /// Get hop stats for a destination
    pub fn hop_stats(&self, dest_id: u64) -> Option<(u64, u64, u64)> {
        self.hop_stats
            .get(&dest_id)
            .map(|s| (s.forwarded(), s.dropped(), s.avg_latency_ns()))
    }

    fn record_hop_forward(&self, dest_id: u64, bytes: u64, latency_ns: u64) {
        self.hop_stats
            .entry(dest_id)
            .or_default()
            .record_forward(bytes, latency_ns);
    }

    fn record_hop_drop(&self, dest_id: u64) {
        self.hop_stats.entry(dest_id).or_default().record_drop();
    }

    /// Get route count
    pub fn route_count(&self) -> usize {
        self.next_hop.len()
    }
}

impl std::fmt::Debug for NetProxy {
    /// Compact debug view: the local ID as 16 zero-padded hex digits, the
    /// number of routes, and the forwarded-packet count.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let local_id_hex = format!("{:016x}", self.local_id);
        let route_total = self.next_hop.len();
        let forwarded = self.packets_forwarded.load(Ordering::Relaxed);

        let mut view = f.debug_struct("NetProxy");
        view.field("local_id", &local_id_hex);
        view.field("routes", &route_total);
        view.field("packets_forwarded", &forwarded);
        view.finish()
    }
}

/// Multi-hop packet builder for creating routed packets
pub struct MultiHopPacketBuilder {
    /// Source node ID
    src_id: u32,
}

impl MultiHopPacketBuilder {
    /// Create a new multi-hop packet builder
    pub fn new(src_id: u32) -> Self {
        Self { src_id }
    }

    /// Build a packet with routing header
    pub fn build(&self, dest_id: u64, ttl: u8, payload: &[u8]) -> Bytes {
        let mut buf = BytesMut::with_capacity(ROUTING_HEADER_SIZE + payload.len());

        let header = RoutingHeader::new(dest_id, self.src_id, ttl);
        header.write_to(&mut buf);
        buf.extend_from_slice(payload);

        buf.freeze()
    }

    /// Build a priority packet
    pub fn build_priority(&self, dest_id: u64, ttl: u8, payload: &[u8]) -> Bytes {
        let mut buf = BytesMut::with_capacity(ROUTING_HEADER_SIZE + payload.len());

        let header = RoutingHeader::priority(dest_id, self.src_id, ttl);
        header.write_to(&mut buf);
        buf.extend_from_slice(payload);

        buf.freeze()
    }

    /// Build a control packet
    pub fn build_control(&self, dest_id: u64, ttl: u8, payload: &[u8]) -> Bytes {
        let mut buf = BytesMut::with_capacity(ROUTING_HEADER_SIZE + payload.len());

        let header = RoutingHeader::control(dest_id, self.src_id, ttl);
        header.write_to(&mut buf);
        buf.extend_from_slice(payload);

        buf.freeze()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Node ID of the proxy under test.
    const LOCAL_ID: u64 = 0x1234;

    /// Bind a proxy for `LOCAL_ID` on an OS-assigned loopback port.
    async fn local_proxy() -> NetProxy {
        let config = ProxyConfig::new(LOCAL_ID, "127.0.0.1:0".parse().unwrap());
        NetProxy::new(config).await.unwrap()
    }

    // Despite its name, this covers packet building and header round-trip
    // rather than `ForwardResult`; the name is kept for CI filter stability.
    #[test]
    fn test_forward_result() {
        // Test packet building
        let builder = MultiHopPacketBuilder::new(0xABCD);
        let packet = builder.build(0x1234, 8, b"hello world");

        assert_eq!(packet.len(), ROUTING_HEADER_SIZE + 11);

        // Parse header back
        let header = RoutingHeader::from_bytes(&packet[..ROUTING_HEADER_SIZE]).unwrap();
        assert_eq!(header.dest_id, 0x1234);
        assert_eq!(header.src_id, 0xABCD);
        assert_eq!(header.ttl, 8);
        assert_eq!(header.hop_count, 0);

        // Payload follows the header untouched
        assert_eq!(&packet[ROUTING_HEADER_SIZE..], b"hello world");
    }

    #[test]
    fn test_priority_packet() {
        let builder = MultiHopPacketBuilder::new(0x1111);
        let packet = builder.build_priority(0x2222, 4, b"urgent");

        let header = RoutingHeader::from_bytes(&packet[..ROUTING_HEADER_SIZE]).unwrap();
        assert!(header.flags.is_priority());
    }

    #[test]
    fn test_control_packet() {
        let builder = MultiHopPacketBuilder::new(0x1111);
        let packet = builder.build_control(0x2222, 2, b"ping");

        let header = RoutingHeader::from_bytes(&packet[..ROUTING_HEADER_SIZE]).unwrap();
        assert!(header.flags.is_control());
    }

    #[tokio::test]
    async fn test_proxy_creation() {
        let proxy = local_proxy().await;

        assert_eq!(proxy.route_count(), 0);
        assert_eq!(proxy.stats().packets_received, 0);
    }

    #[tokio::test]
    async fn test_proxy_routing() {
        let proxy = local_proxy().await;

        let dest: SocketAddr = "127.0.0.1:9001".parse().unwrap();
        proxy.add_route(0x5678, dest);

        assert_eq!(proxy.lookup(0x5678), Some(dest));
        assert_eq!(proxy.lookup(0x9999), None);
        assert_eq!(proxy.route_count(), 1);

        proxy.remove_route(0x5678);
        assert_eq!(proxy.lookup(0x5678), None);
        assert_eq!(proxy.route_count(), 0);
    }

    #[tokio::test]
    async fn test_proxy_forward() {
        let proxy = local_proxy().await;

        let next_hop: SocketAddr = "127.0.0.1:9001".parse().unwrap();
        proxy.add_route(0x5678, next_hop);

        // Build a packet
        let builder = MultiHopPacketBuilder::new(0xABCD);
        let packet = builder.build(0x5678, 8, b"test payload");
        let packet_len = packet.len() as u64;

        // Forward it
        match proxy.forward(packet) {
            ForwardResult::Forwarded { next_hop: addr, .. } => {
                assert_eq!(addr, next_hop);
            }
            _ => panic!("expected forwarded"),
        }

        let stats = proxy.stats();
        assert_eq!(stats.packets_received, 1);
        assert_eq!(stats.packets_forwarded, 1);
        assert_eq!(stats.packets_dropped, 0);
        assert_eq!(stats.bytes_forwarded, packet_len);

        // The forward is also attributed to the destination.
        let (hop_forwarded, hop_dropped, _) = proxy
            .hop_stats(0x5678)
            .expect("hop stats must exist after a forward");
        assert_eq!(hop_forwarded, 1);
        assert_eq!(hop_dropped, 0);
    }

    #[tokio::test]
    async fn test_proxy_local_delivery() {
        let proxy = local_proxy().await;

        // Build a packet destined for local node
        let builder = MultiHopPacketBuilder::new(0xABCD);
        let packet = builder.build(LOCAL_ID, 8, b"local payload");

        match proxy.forward(packet) {
            ForwardResult::Local(payload) => {
                assert_eq!(&payload[..], b"local payload");
            }
            _ => panic!("expected local delivery"),
        }

        // Local delivery counts as received, but neither forwarded nor dropped.
        let stats = proxy.stats();
        assert_eq!(stats.packets_received, 1);
        assert_eq!(stats.packets_forwarded, 0);
        assert_eq!(stats.packets_dropped, 0);
    }

    #[tokio::test]
    async fn test_proxy_ttl_expired() {
        let proxy = local_proxy().await;

        proxy.add_route(0x5678, "127.0.0.1:9001".parse().unwrap());

        // Build a packet with TTL=0
        let builder = MultiHopPacketBuilder::new(0xABCD);
        let packet = builder.build(0x5678, 0, b"expired");

        match proxy.forward(packet) {
            ForwardResult::Dropped(ProxyError::TtlExpired) => {}
            _ => panic!("expected TTL expired"),
        }

        assert_eq!(proxy.stats().packets_dropped, 1);
        // The destination is routed, so the drop is attributed to it.
        assert_eq!(proxy.hop_stats(0x5678), Some((0, 1, 0)));
    }

    /// A packet arriving with TTL=1 is not yet expired, but forwarding
    /// would take its TTL to 0, so the proxy drops it rather than sending
    /// a packet the next hop would discard.
    #[tokio::test]
    async fn test_proxy_ttl_exhausted_by_forwarding() {
        let proxy = local_proxy().await;

        proxy.add_route(0x5678, "127.0.0.1:9001".parse().unwrap());

        let builder = MultiHopPacketBuilder::new(0xABCD);
        let packet = builder.build(0x5678, 1, b"last hop");

        match proxy.forward(packet) {
            ForwardResult::Dropped(ProxyError::TtlExpired) => {}
            _ => panic!("expected TTL expired after decrement"),
        }

        let stats = proxy.stats();
        assert_eq!(stats.packets_forwarded, 0);
        assert_eq!(stats.packets_dropped, 1);
    }

    #[tokio::test]
    async fn test_proxy_no_route() {
        let proxy = local_proxy().await;

        // Build a packet with no route
        let builder = MultiHopPacketBuilder::new(0xABCD);
        let packet = builder.build(0x9999, 8, b"no route");

        match proxy.forward(packet) {
            ForwardResult::Dropped(ProxyError::NoRoute) => {}
            _ => panic!("expected no route"),
        }

        let stats = proxy.stats();
        assert_eq!(stats.packets_received, 1);
        assert_eq!(stats.packets_dropped, 1);
    }

    /// An empty datagram cannot hold a routing header and must be dropped
    /// before any parsing is attempted.
    #[tokio::test]
    async fn test_proxy_packet_too_small() {
        let proxy = local_proxy().await;

        match proxy.forward(Bytes::new()) {
            ForwardResult::Dropped(ProxyError::PacketTooSmall) => {}
            _ => panic!("expected packet too small"),
        }

        let stats = proxy.stats();
        assert_eq!(stats.packets_received, 1);
        assert_eq!(stats.packets_dropped, 1);
    }

    /// Regression for BUG_AUDIT_2026_04_30_CORE.md #116: pre-fix
    /// `remove_route(dest_id)` only deleted the next_hop entry
    /// and left `hop_stats[dest_id]` in place. A peer churning
    /// through many destinations grew `hop_stats` linearly with
    /// total-distinct-dest-ids-ever-seen, not active dest count
    /// — unbounded memory growth. Post-fix `remove_route` also
    /// drops the matching `hop_stats` entry.
    #[tokio::test]
    async fn remove_route_also_drops_hop_stats() {
        let proxy = local_proxy().await;

        let next_hop: SocketAddr = "127.0.0.1:9001".parse().unwrap();
        proxy.add_route(0x5678, next_hop);

        // Force hop_stats to populate by recording activity.
        proxy.record_hop_forward(0x5678, 100, 1000);
        proxy.record_hop_drop(0x5678);
        assert!(
            proxy.hop_stats(0x5678).is_some(),
            "hop_stats must be present after recording activity"
        );

        // Remove the route — pre-fix would leave the hop_stats
        // entry behind. Post-fix it's dropped.
        proxy.remove_route(0x5678);

        assert!(
            proxy.hop_stats(0x5678).is_none(),
            "hop_stats entry must be dropped along with the route — \
             pre-fix this leaked memory linearly with churned destinations"
        );
    }

    #[test]
    fn test_hop_stats() {
        let stats = HopStats::new();

        stats.record_forward(100, 1000);
        stats.record_forward(200, 2000);
        stats.record_drop();

        assert_eq!(stats.forwarded(), 2);
        assert_eq!(stats.dropped(), 1);
        assert_eq!(stats.avg_latency_ns(), 1500);
    }

    /// A zero latency means "not measured": the packet is counted but the
    /// average is left alone.
    #[test]
    fn test_hop_stats_ignores_zero_latency() {
        let stats = HopStats::new();
        assert_eq!(stats.avg_latency_ns(), 0);

        stats.record_forward(100, 1000);
        stats.record_forward(50, 0);

        assert_eq!(stats.forwarded(), 2);
        assert_eq!(stats.avg_latency_ns(), 1000);
    }

    #[tokio::test]
    async fn test_forward_returns_packet_data() {
        // Regression: forward() built fwd_data with the updated routing header
        // but discarded it, returning only next_hop and latency. Callers
        // (including forward_and_send) had no packet to actually transmit.
        let proxy = local_proxy().await;

        let next_hop: SocketAddr = "127.0.0.1:9001".parse().unwrap();
        proxy.add_route(0x5678, next_hop);

        let builder = MultiHopPacketBuilder::new(0xABCD);
        let packet = builder.build(0x5678, 4, b"payload");

        match proxy.forward(packet) {
            ForwardResult::Forwarded {
                next_hop: addr,
                packet: fwd_packet,
                ..
            } => {
                assert_eq!(addr, next_hop);

                // The forwarded packet must contain the updated routing header
                let header = RoutingHeader::from_bytes(&fwd_packet[..ROUTING_HEADER_SIZE]).unwrap();
                assert_eq!(header.ttl, 3, "TTL should be decremented");
                assert_eq!(header.hop_count, 1, "hop_count should be incremented");

                // Payload must be preserved after the routing header
                assert_eq!(&fwd_packet[ROUTING_HEADER_SIZE..], b"payload");
            }
            _ => panic!("expected forwarded"),
        }
    }
}