net-mesh 0.21.0

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
//! Automatic rerouting policy.
//!
//! Watches the failure detector and updates the routing table when a peer
//! dies. When a peer recovers, restores the original route if it's better
//! than the current alternate.
//!
//! This module is wired into `MeshNode` via the `FailureDetector`'s
//! `on_failure` and `on_recovery` callbacks.

use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use dashmap::DashMap;

use super::behavior::proximity::ProximityGraph;
use super::route::RoutingTable;

/// Bookkeeping for one destination whose route was moved off a failed peer.
///
/// The failed peer is remembered by its stable `node_id`, not by the
/// `SocketAddr` the route pointed at when it failed. After a NAT rebind, a
/// reconnect on a different port, or a mobile-network change, the peer
/// reappears in `peer_addrs` under a NEW address. An address-keyed lookup
/// would then find nothing to restore, and the entry would never be
/// cleared. Recovery instead matches on the node id and re-resolves the
/// current address from `peer_addrs[failed_node_id]`.
struct SavedRoute {
    /// Id of the peer whose failure caused this reroute. Its address may
    /// differ by recovery time, so it is looked up again from `peer_addrs`
    /// rather than stored here.
    failed_node_id: u64,
    /// Next hop the route was most recently moved to.
    #[allow(dead_code, reason = "recorded on every reroute but not read yet")]
    alternate: SocketAddr,
}

/// Moves routes off failed peers and puts them back when the peer recovers.
///
/// On failure of a peer:
/// 1. Collect every route whose next hop is that peer's current address.
/// 2. Pick an alternate per destination: the routing table's own
///    suggestion, then the proximity graph (if attached), then another
///    connected peer.
/// 3. Point the route at the alternate and remember which peer's failure
///    caused the move.
///
/// On recovery of that peer:
/// 1. Point every route attributed to it back at its current address
///    (the direct path is usually shorter than the alternate).
pub struct ReroutePolicy {
    /// Table whose entries are rewritten on failure and recovery.
    routing_table: Arc<RoutingTable>,
    /// Directly connected peers, node_id → current address.
    peer_addrs: Arc<DashMap<u64, SocketAddr>>,
    /// Optional topology view used to find multi-hop alternates.
    proximity_graph: Option<Arc<ProximityGraph>>,
    /// Destinations currently on an alternate, dest_node_id → bookkeeping.
    saved_routes: DashMap<u64, SavedRoute>,
    /// Number of failure events that rerouted at least one route.
    pub reroute_count: AtomicU64,
    /// Number of recovery events that restored at least one route.
    pub recovery_count: AtomicU64,
}

/// Widen a `u64` node id into the 32-byte id space of the proximity graph.
///
/// Bytes `0..8` hold the id in little-endian order; the remaining 24 bytes
/// are zero.
fn node_id_to_graph_id(node_id: u64) -> [u8; 32] {
    let mut graph_id = [0u8; 32];
    for (slot, byte) in graph_id.iter_mut().zip(node_id.to_le_bytes()) {
        *slot = byte;
    }
    graph_id
}

/// Recover the `u64` node id from a 32-byte proximity-graph id.
///
/// Reads bytes `0..8` as a little-endian `u64` and ignores bytes `8..32`
/// (the layout produced by `node_id_to_graph_id`). Destructuring the
/// fixed-size array is irrefutable, so the conversion has no panic path and
/// needs no `unwrap` or lint suppression.
fn graph_id_to_node_id(id: &[u8; 32]) -> u64 {
    let [b0, b1, b2, b3, b4, b5, b6, b7, ..] = *id;
    u64::from_le_bytes([b0, b1, b2, b3, b4, b5, b6, b7])
}

impl ReroutePolicy {
    /// Create a reroute policy over `routing_table`, drawing alternates
    /// from the directly connected peers in `peer_addrs`.
    ///
    /// No proximity graph is attached; use `with_proximity_graph` for
    /// multi-hop alternate selection.
    pub fn new(
        routing_table: Arc<RoutingTable>,
        peer_addrs: Arc<DashMap<u64, SocketAddr>>,
    ) -> Self {
        Self {
            routing_table,
            peer_addrs,
            proximity_graph: None,
            saved_routes: DashMap::new(),
            reroute_count: AtomicU64::new(0),
            recovery_count: AtomicU64::new(0),
        }
    }

    /// Attach a proximity graph, consulted for multi-hop alternates before
    /// falling back to an arbitrary connected peer.
    pub fn with_proximity_graph(mut self, graph: Arc<ProximityGraph>) -> Self {
        self.proximity_graph = Some(graph);
        self
    }

    /// Called when the failure detector marks a peer as failed.
    ///
    /// Every route whose next hop equals the failed peer's current address
    /// is moved to an alternate chosen per destination (see
    /// `pick_alternate`). A heterogeneous topology therefore doesn't
    /// blackhole traffic through a peer that reaches some, but not all,
    /// affected destinations. Destinations with no alternate at all are
    /// left untouched. Unknown node ids are ignored.
    ///
    /// `saved_routes` keeps the peer whose failure FIRST moved a
    /// destination. If the alternate later fails too, only `alternate` is
    /// refreshed. Overwriting `failed_node_id` would make recovery of the
    /// true original peer find nothing to restore.
    pub fn on_failure(&self, failed_node_id: u64) {
        let Some(failed_addr) = self.peer_addrs.get(&failed_node_id).map(|addr| *addr) else {
            return; // unknown node, nothing to reroute
        };

        // Find all routes whose next-hop is the failed peer.
        let affected: Vec<u64> = self
            .routing_table
            .all_routes()
            .into_iter()
            .filter(|(_, entry)| entry.next_hop == failed_addr)
            .map(|(dest_id, _)| dest_id)
            .collect();

        if affected.is_empty() {
            return;
        }

        // Snapshot once per event: peers we must not pick as alternates.
        let down = self.down_peers(failed_node_id);

        let mut rerouted = 0usize;
        for &dest_id in &affected {
            let Some(alt_addr) = self.pick_alternate(failed_node_id, failed_addr, dest_id, &down)
            else {
                continue; // truly nothing to try
            };

            self.saved_routes
                .entry(dest_id)
                .and_modify(|existing| existing.alternate = alt_addr)
                .or_insert(SavedRoute {
                    failed_node_id,
                    alternate: alt_addr,
                });
            self.routing_table.add_route(dest_id, alt_addr);
            rerouted += 1;
        }

        if rerouted > 0 {
            self.reroute_count.fetch_add(1, Ordering::Relaxed);
            tracing::info!(
                failed_node = format!("{:#x}", failed_node_id),
                affected_routes = affected.len(),
                rerouted,
                "auto-rerouted routes away from failed peer"
            );
        }
    }

    /// Node ids that should not be chosen as alternates during the current
    /// failure event.
    ///
    /// Contains the peer failing now, plus every peer still recorded as the
    /// cause of an outstanding reroute (it failed earlier and `on_recovery`
    /// has not cleared it yet). Best-effort: a peer that failed without
    /// creating a new saved entry is not included. That covers a peer with
    /// no routes through it, and one whose routes were all already saved
    /// under an earlier failure.
    fn down_peers(&self, failed_node_id: u64) -> std::collections::HashSet<u64> {
        self.saved_routes
            .iter()
            .map(|entry| entry.value().failed_node_id)
            .chain(std::iter::once(failed_node_id))
            .collect()
    }

    /// Choose an alternate next hop for `dest_id` now that `failed_node_id`
    /// (last seen at `failed_addr`) has failed.
    ///
    /// Resolution order:
    ///   1. Routing table: `lookup_alternate(dest, failed_addr)`. Today's
    ///      table stores one entry per destination, so this returns `None`
    ///      when the affected entry *is* the failed-peer entry. It is
    ///      forward-compat scaffolding for a future multi-route table.
    ///   2. Proximity graph: `find_graph_alternate_for`, skipping peers in
    ///      `down`.
    ///   3. Any connected peer that is not in `down`.
    ///   4. Any connected peer other than the failing one, even one that
    ///      failed earlier. `down` is only a heuristic, so this keeps the
    ///      previous last-resort behaviour instead of giving up.
    ///
    /// Steps 3–4 are not topology-aware. If the chosen peer cannot actually
    /// reach `dest_id`, traffic to it is lost until routing changes again.
    /// Candidates at `failed_addr` are never returned, because rerouting
    /// to the same socket would change nothing.
    fn pick_alternate(
        &self,
        failed_node_id: u64,
        failed_addr: SocketAddr,
        dest_id: u64,
        down: &std::collections::HashSet<u64>,
    ) -> Option<SocketAddr> {
        self.routing_table
            .lookup_alternate(dest_id, failed_addr)
            .or_else(|| self.find_graph_alternate_for(dest_id, down))
            .or_else(|| {
                self.first_peer_where(|nid, addr| !down.contains(&nid) && addr != failed_addr)
            })
            .or_else(|| {
                self.first_peer_where(|nid, addr| nid != failed_node_id && addr != failed_addr)
            })
    }

    /// Address of the first entry in `peer_addrs` (DashMap iteration order,
    /// i.e. unspecified) for which `accept(node_id, addr)` holds.
    fn first_peer_where(&self, accept: impl Fn(u64, SocketAddr) -> bool) -> Option<SocketAddr> {
        self.peer_addrs
            .iter()
            .map(|entry| (*entry.key(), *entry.value()))
            .find(|&(nid, addr)| accept(nid, addr))
            .map(|(_, addr)| addr)
    }

    /// Pick an alternate for a single destination via the proximity graph.
    ///
    /// Queries `path_to(dest)` and returns the first hop after ourselves
    /// that is not in `down` and is a directly connected peer (present in
    /// `peer_addrs`). If no such hop exists, falls back to any non-`down`
    /// direct peer known to the graph. Returns `None` when no graph is
    /// attached or the graph has no usable suggestion.
    fn find_graph_alternate_for(
        &self,
        dest_id: u64,
        down: &std::collections::HashSet<u64>,
    ) -> Option<SocketAddr> {
        let graph = self.proximity_graph.as_ref()?;
        let dest_graph_id = node_id_to_graph_id(dest_id);

        if let Some(path) = graph.path_to(&dest_graph_id) {
            // path[0] is self; scan forward for the first hop that
            // (a) isn't known to be down, and (b) is a directly-connected
            // peer we can send UDP to.
            for hop in path.iter().skip(1) {
                let nid = graph_id_to_node_id(hop);
                if down.contains(&nid) {
                    continue;
                }
                if let Some(addr) = self.peer_addrs.get(&nid) {
                    return Some(*addr);
                }
            }
        }

        // Fallback: any direct peer from the graph that isn't down. Not
        // topology-aware for this specific destination, but better than
        // nothing. Node id 0 is skipped.
        for node in graph.all_nodes() {
            let nid = graph_id_to_node_id(&node.node_id);
            if nid != 0 && !down.contains(&nid) {
                if let Some(addr) = self.peer_addrs.get(&nid) {
                    return Some(*addr);
                }
            }
        }
        None
    }

    /// Called when the failure detector marks a peer as recovered.
    ///
    /// Restores every route whose saved entry attributes the reroute to
    /// `recovered_node_id`, pointing it back at the peer. The direct path is
    /// typically better (fewer hops, lower latency) than the alternate.
    /// Does nothing if the peer has no current address in `peer_addrs`.
    ///
    /// Entries are matched on `failed_node_id`, not on the old next-hop
    /// address. An address-based match would miss every reroute once the
    /// peer reconnected from a different `SocketAddr` (NAT rebind, mobile
    /// network change, different port). `saved_routes` would then grow
    /// without bound and routes would stay pinned to alternates. The
    /// restored route uses the address re-resolved from `peer_addrs` now.
    pub fn on_recovery(&self, recovered_node_id: u64) {
        let Some(recovered_addr) = self.peer_addrs.get(&recovered_node_id).map(|addr| *addr)
        else {
            return;
        };

        // Find routes that were rerouted away from this peer.
        let to_restore: Vec<u64> = self
            .saved_routes
            .iter()
            .filter(|e| e.value().failed_node_id == recovered_node_id)
            .map(|e| *e.key())
            .collect();

        if to_restore.is_empty() {
            return;
        }

        // Restore original routes using the CURRENT addr, which may differ
        // from the addr at on_failure time if the peer rebound.
        for dest_id in &to_restore {
            self.routing_table.add_route(*dest_id, recovered_addr);
            self.saved_routes.remove(dest_id);
        }

        self.recovery_count.fetch_add(1, Ordering::Relaxed);

        tracing::info!(
            recovered_node = format!("{:#x}", recovered_node_id),
            restored_routes = to_restore.len(),
            "restored {} routes to recovered peer",
            to_restore.len()
        );
    }

    /// Number of active reroutes (destinations currently on an alternate).
    pub fn active_reroutes(&self) -> usize {
        self.saved_routes.len()
    }
}

#[cfg(test)]
mod tests {
    //! Unit tests for `ReroutePolicy`.
    //!
    //! Peers are named by letter in comments: B = 0x2222, C = 0x3333 and,
    //! where present as a peer, D = 0x4444. Other ids (0x4444, 0x5555,
    //! 0x6666) act as destinations whose routes go through those peers.
    //! None of these tests attach a proximity graph.
    use super::*;
    use std::sync::Arc;

    /// Fresh routing table shared via `Arc` so the policy and the test
    /// observe the same entries. (`0x1111` is presumably the local node
    /// id — see `RoutingTable::new`.)
    fn make_routing_table() -> Arc<RoutingTable> {
        Arc::new(RoutingTable::new(0x1111))
    }

    /// B fails while carrying the only route; C is the only other peer,
    /// so the route must move to C and be recorded as one active reroute.
    #[test]
    fn test_reroute_on_failure() {
        let rt = make_routing_table();
        let peers = Arc::new(DashMap::new());

        let addr_b: SocketAddr = "127.0.0.1:2000".parse().unwrap();
        let addr_c: SocketAddr = "127.0.0.1:3000".parse().unwrap();

        peers.insert(0x2222u64, addr_b);
        peers.insert(0x3333u64, addr_c);

        // Route to 0x4444 goes through B
        rt.add_route(0x4444, addr_b);

        let policy = ReroutePolicy::new(rt.clone(), peers);

        // B fails
        policy.on_failure(0x2222);

        // Route should now go through C
        let next_hop = rt.lookup(0x4444).unwrap();
        assert_eq!(next_hop, addr_c, "should reroute to C");
        assert_eq!(policy.active_reroutes(), 1);
        assert_eq!(policy.reroute_count.load(Ordering::Relaxed), 1);
    }

    /// Fail-then-recover round trip: the route returns to B and the saved
    /// entry is cleared.
    #[test]
    fn test_recovery_restores_original() {
        let rt = make_routing_table();
        let peers = Arc::new(DashMap::new());

        let addr_b: SocketAddr = "127.0.0.1:2000".parse().unwrap();
        let addr_c: SocketAddr = "127.0.0.1:3000".parse().unwrap();

        peers.insert(0x2222u64, addr_b);
        peers.insert(0x3333u64, addr_c);

        rt.add_route(0x4444, addr_b);

        let policy = ReroutePolicy::new(rt.clone(), peers);

        // B fails → reroute to C
        policy.on_failure(0x2222);
        assert_eq!(rt.lookup(0x4444).unwrap(), addr_c);

        // B recovers → restore to B
        policy.on_recovery(0x2222);
        assert_eq!(rt.lookup(0x4444).unwrap(), addr_b);
        assert_eq!(policy.active_reroutes(), 0);
        assert_eq!(policy.recovery_count.load(Ordering::Relaxed), 1);
    }

    /// B is the only connected peer, so no alternate exists: the route
    /// must stay on B and nothing may be recorded in `saved_routes`.
    #[test]
    fn test_no_alternate_does_nothing() {
        let rt = make_routing_table();
        let peers = Arc::new(DashMap::new());

        let addr_b: SocketAddr = "127.0.0.1:2000".parse().unwrap();
        peers.insert(0x2222u64, addr_b);

        rt.add_route(0x4444, addr_b);

        let policy = ReroutePolicy::new(rt.clone(), peers);

        // B fails but there's no alternate
        policy.on_failure(0x2222);

        // Route unchanged (still points to B — no better option)
        assert_eq!(rt.lookup(0x4444).unwrap(), addr_b);
        assert_eq!(policy.active_reroutes(), 0);
    }

    /// Regression: `on_failure` used to `insert` into `saved_routes`, so a
    /// second failure (e.g. the alternate itself going down) overwrote the
    /// record of which peer the route originally used. When that original
    /// peer recovered, the route was not restored to it; it ended up on a
    /// previously chosen alternate instead.
    ///
    /// Fix: `entry().and_modify(..).or_insert(..)` keeps the original
    /// failed peer (`failed_node_id`) across repeated failures; only
    /// `alternate` is refreshed. (The test name predates the switch from a
    /// saved `next_hop` address to a saved node id.)
    #[test]
    fn test_regression_repeated_failures_preserve_original_next_hop() {
        let rt = make_routing_table();
        let peers = Arc::new(DashMap::new());

        let addr_b: SocketAddr = "127.0.0.1:2000".parse().unwrap();
        let addr_c: SocketAddr = "127.0.0.1:3000".parse().unwrap();
        let addr_d: SocketAddr = "127.0.0.1:4000".parse().unwrap();

        peers.insert(0x2222u64, addr_b);
        peers.insert(0x3333u64, addr_c);
        peers.insert(0x4444u64, addr_d);

        // Route to 0x5555 originally goes through B.
        rt.add_route(0x5555, addr_b);

        let policy = ReroutePolicy::new(rt.clone(), peers.clone());

        // B fails — route is rerouted to an alternate (C or D).
        policy.on_failure(0x2222);
        let first_alt = rt.lookup(0x5555).unwrap();
        assert_ne!(first_alt, addr_b);

        // The alternate also fails. Temporarily remove B from the
        // peer_addrs map before triggering its reroute, so the second
        // on_failure is *forced* to pick something other than B. Without
        // this, the reroute logic is free to land back on B — making a
        // passing test meaningless because the routing table would end
        // up pointing at B even with the buggy code.
        // Map the first alternate's address back to its node id so we can
        // fail that peer by id.
        let first_alt_node_id = *peers
            .iter()
            .find(|e| *e.value() == first_alt)
            .unwrap()
            .key();
        peers.remove(&0x2222u64);
        policy.on_failure(first_alt_node_id);
        let second_alt = rt.lookup(0x5555).unwrap();
        assert_ne!(
            second_alt, addr_b,
            "second reroute must pick a non-B alternate"
        );
        // Put B back so on_recovery can resolve its address.
        peers.insert(0x2222u64, addr_b);

        // B recovers. The original route must be restored to B, not to
        // an alternate chosen along the way (which is what happened
        // before the fix).
        policy.on_recovery(0x2222);
        let restored = rt.lookup(0x5555).unwrap();
        assert_eq!(
            restored, addr_b,
            "recovery must restore the true original next_hop (B), not a \
             previously chosen alternate"
        );
    }

    // ========================================================================
    // on_recovery must match by node_id, not next_hop addr,
    // so NAT rebinds / reconnects on different ports still restore.
    // ========================================================================

    /// A peer fails, then recovers from a DIFFERENT SocketAddr (NAT
    /// rebind, reconnect on different port, mobile network change).
    /// `on_recovery` must restore the saved routes to the new addr.
    /// Before the fix, the filter `entry.next_hop == recovered_addr`
    /// missed because the saved `next_hop` held the OLD addr while
    /// `recovered_addr` was the NEW one. No routes were restored, and
    /// the saved entry leaked.
    #[test]
    fn on_recovery_restores_routes_after_nat_rebind() {
        let rt = make_routing_table();
        let peers = Arc::new(DashMap::new());

        let addr_b_old: SocketAddr = "127.0.0.1:2000".parse().unwrap();
        let addr_b_new: SocketAddr = "127.0.0.1:2999".parse().unwrap(); // post-rebind
        let addr_c: SocketAddr = "127.0.0.1:3000".parse().unwrap();

        peers.insert(0x2222u64, addr_b_old);
        peers.insert(0x3333u64, addr_c);

        // Route to 0x5555 originally goes through B.
        rt.add_route(0x5555, addr_b_old);

        let policy = ReroutePolicy::new(rt.clone(), peers.clone());

        // B fails — route is rerouted to C.
        policy.on_failure(0x2222);
        assert_eq!(
            rt.lookup(0x5555).unwrap(),
            addr_c,
            "reroute must pick the alternate after failure"
        );
        assert_eq!(policy.active_reroutes(), 1);

        // B comes back from a different SocketAddr (NAT rebind):
        // peer_addrs now reflects the new addr.
        peers.insert(0x2222u64, addr_b_new);
        policy.on_recovery(0x2222);

        // Pre-fix: nothing happens (filter on next_hop addr fails).
        // Post-fix: the route is restored to the NEW addr because
        //  the filter is on node_id, and the addr is re-resolved
        //  from peer_addrs.
        assert_eq!(
            rt.lookup(0x5555).unwrap(),
            addr_b_new,
            "recovery after NAT rebind must restore the route to the NEW addr"
        );
        assert_eq!(
            policy.active_reroutes(),
            0,
            "saved_routes entry must be dropped after recovery (no leak)"
        );
    }

    /// Variant: a peer fails with several saved routes through it, then
    /// reconnects from a different addr, and ALL saved routes must be
    /// restored. Before the fix, the addr-based filter missed every one,
    /// and `saved_routes` leaked one entry per destination ever rerouted
    /// away from a NAT-changing peer.
    #[test]
    fn on_recovery_restores_multiple_routes_after_nat_rebind() {
        let rt = make_routing_table();
        let peers = Arc::new(DashMap::new());

        let addr_b_old: SocketAddr = "127.0.0.1:2000".parse().unwrap();
        let addr_b_new: SocketAddr = "127.0.0.1:9999".parse().unwrap();
        let addr_c: SocketAddr = "127.0.0.1:3000".parse().unwrap();

        peers.insert(0x2222u64, addr_b_old);
        peers.insert(0x3333u64, addr_c);

        rt.add_route(0x4444, addr_b_old);
        rt.add_route(0x5555, addr_b_old);

        let policy = ReroutePolicy::new(rt.clone(), peers.clone());
        policy.on_failure(0x2222);
        assert_eq!(policy.active_reroutes(), 2);

        // B reconnects from a new addr, then recovers.
        peers.insert(0x2222u64, addr_b_new);
        policy.on_recovery(0x2222);

        assert_eq!(rt.lookup(0x4444).unwrap(), addr_b_new);
        assert_eq!(rt.lookup(0x5555).unwrap(), addr_b_new);
        assert_eq!(
            policy.active_reroutes(),
            0,
            "all saved_routes entries must be dropped after recovery",
        );
    }

    /// Only routes whose next hop is the failed peer are touched: both
    /// B routes move to C, and the route already on C is left alone and
    /// not counted as a reroute.
    #[test]
    fn test_multiple_routes_through_failed_peer() {
        let rt = make_routing_table();
        let peers = Arc::new(DashMap::new());

        let addr_b: SocketAddr = "127.0.0.1:2000".parse().unwrap();
        let addr_c: SocketAddr = "127.0.0.1:3000".parse().unwrap();

        peers.insert(0x2222u64, addr_b);
        peers.insert(0x3333u64, addr_c);

        // Two routes through B
        rt.add_route(0x4444, addr_b);
        rt.add_route(0x5555, addr_b);
        // One route through C (unaffected)
        rt.add_route(0x6666, addr_c);

        let policy = ReroutePolicy::new(rt.clone(), peers);

        policy.on_failure(0x2222);

        // Both B routes should be rerouted to C
        assert_eq!(rt.lookup(0x4444).unwrap(), addr_c);
        assert_eq!(rt.lookup(0x5555).unwrap(), addr_c);
        // C route unchanged
        assert_eq!(rt.lookup(0x6666).unwrap(), addr_c);
        assert_eq!(policy.active_reroutes(), 2);
    }
}