use std::net::SocketAddr;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use super::state::{FoldEntry, FoldState, MergeAction, NoIndex, NodeId};
use super::{FoldKind, SignedAnnouncement};
/// Payload of a routing announcement: one advertised way to reach `destination`.
///
/// Field order is part of the derived serde representation; do not reorder
/// fields without considering every encoder that consumes this type.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct RouteAnnouncement {
/// Node this route leads to. `RoutingFold::key_for` uses it as the fold key,
/// so at most one announcement per destination is stored.
pub destination: NodeId,
/// Socket address recorded as the next hop; `RoutingQuery::ViaNextHop`
/// filters stored routes on this field.
pub next_hop: SocketAddr,
/// Route cost. `RoutingFold::merge` lets an incoming announcement replace
/// the stored one only when its metric is lower than or equal to the stored metric.
pub metric: u32,
/// NOTE(review): presumably the node the route was learned through; it is
/// not read by any logic in this file — confirm the intended meaning.
pub via: NodeId,
}
/// Queries understood by `RoutingFold::query`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutingQuery {
/// The stored route for one destination; yields zero or one row.
Lookup(NodeId),
/// Every stored route, one row per destination.
AllDestinations,
/// Every stored route whose `next_hop` equals the given address.
ViaNextHop(SocketAddr),
}
/// One query result row: the destination key paired with the stored announcement.
pub type RouteRow = (NodeId, RouteAnnouncement);
/// Marker type that carries the `FoldKind` implementation for routing
/// announcements; it holds no data of its own.
#[derive(Debug)]
pub struct RoutingFold;
/// Routing table expressed as a fold: one stored [`RouteAnnouncement`] per
/// destination, where a lower-or-equal metric displaces the stored route.
impl FoldKind for RoutingFold {
    /// Numeric identifier of this fold kind.
    const KIND_ID: u16 = 2;
    /// Channel-name prefix associated with this fold kind.
    const CHANNEL_PREFIX: &'static str = "fold:route:";
    /// Default time-to-live for entries (300 seconds).
    const DEFAULT_TTL: Duration = Duration::from_secs(300);

    type Key = NodeId;
    type Payload = RouteAnnouncement;
    type Query = RoutingQuery;
    type Result = Vec<RouteRow>;
    type Index = NoIndex;

    /// Entries are keyed by the advertised destination; the publisher does
    /// not take part in the key.
    fn key_for(_publisher: NodeId, payload: &Self::Payload) -> Self::Key {
        payload.destination
    }

    /// This fold keeps no secondary index.
    fn build_index() -> NoIndex {
        NoIndex
    }

    /// Decides what happens to `incoming` given the entry currently stored
    /// under the same destination.
    ///
    /// * No stored entry: insert.
    /// * Same publisher and a generation that is not strictly newer: reject.
    /// * Otherwise: replace when the incoming metric is lower than or equal
    ///   to the stored metric, reject when it is higher.
    ///
    /// NOTE(review): a same-publisher announcement with a *newer* generation
    /// but a *higher* metric is rejected by the last rule, so a publisher
    /// cannot worsen its own stored route through this path — confirm this is
    /// intended.
    fn merge(
        existing: Option<&FoldEntry<Self>>,
        incoming: &SignedAnnouncement<Self::Payload>,
    ) -> MergeAction {
        match existing {
            None => MergeAction::Insert,
            Some(current) => {
                let same_publisher = current.node_id == incoming.node_id;
                let not_newer = incoming.generation <= current.generation;
                let worse_metric = incoming.payload.metric > current.payload.metric;

                if (same_publisher && not_newer) || worse_metric {
                    MergeAction::Reject
                } else {
                    MergeAction::Replace
                }
            }
        }
    }

    /// Answers `query` by reading `state.entries`; every returned row holds a
    /// clone of the stored payload.
    fn query(state: &FoldState<Self>, _index: &NoIndex, query: RoutingQuery) -> Vec<RouteRow> {
        match query {
            // Zero or one row, depending on whether the destination is known.
            RoutingQuery::Lookup(dest) => match state.entries.get(&dest) {
                Some(entry) => vec![(dest, entry.payload.clone())],
                None => Vec::new(),
            },
            RoutingQuery::AllDestinations => {
                let mut rows = Vec::new();
                for (id, entry) in state.entries.iter() {
                    rows.push((*id, entry.payload.clone()));
                }
                rows
            }
            // Linear scan: there is no index by next hop.
            RoutingQuery::ViaNextHop(addr) => state
                .entries
                .iter()
                .filter_map(|(id, entry)| {
                    (entry.payload.next_hop == addr).then(|| (*id, entry.payload.clone()))
                })
                .collect(),
        }
    }
}
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    use std::sync::Arc;
    use std::time::Duration;

    use super::*;
    use crate::adapter::net::behavior::fold::{
        ApplyOutcome, EnvelopeMeta, Fold, FoldRegistry, SignedAnnouncement,
    };
    use crate::adapter::net::identity::EntityKeypair;

    /// Loopback (127.0.0.1) socket address on `port`.
    fn addr(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
    }

    /// Builds and signs a route announcement with default envelope metadata.
    fn sign_route(
        keypair: &EntityKeypair,
        publisher_node: NodeId,
        generation: u64,
        dest: NodeId,
        next_hop: SocketAddr,
        metric: u32,
        via: NodeId,
    ) -> SignedAnnouncement<RouteAnnouncement> {
        let route = RouteAnnouncement {
            destination: dest,
            next_hop,
            metric,
            via,
        };
        let signed = SignedAnnouncement::sign(
            keypair,
            RoutingFold::KIND_ID,
            0,
            publisher_node,
            generation,
            EnvelopeMeta::default(),
            route,
        );
        signed.expect("sign succeeds")
    }

    /// Fresh routing fold constructed with a zero sweep interval.
    fn new_fold() -> Fold<RoutingFold> {
        Fold::with_sweep_interval(Duration::ZERO)
    }

    #[test]
    fn first_announcement_installs_the_route() {
        let fold = new_fold();
        let keypair = EntityKeypair::generate();
        let announcement = sign_route(&keypair, 0xAA, 1, 0x42, addr(7000), 1, 0xAA);

        let outcome = fold.apply(announcement).expect("apply");
        assert_eq!(outcome, ApplyOutcome::Inserted);

        let rows = fold.query(RoutingQuery::Lookup(0x42));
        assert_eq!(rows.len(), 1);
        let (_, route) = &rows[0];
        assert_eq!(route.next_hop, addr(7000));
        assert_eq!(route.metric, 1);
    }

    #[test]
    fn lower_metric_replaces_existing_route_regardless_of_publisher() {
        let fold = new_fold();
        let publisher_a = EntityKeypair::generate();
        let publisher_b = EntityKeypair::generate();

        let original = sign_route(&publisher_a, 0xAA, 1, 0x42, addr(7000), 5, 0xAA);
        fold.apply(original).expect("first");

        let cheaper = sign_route(&publisher_b, 0xBB, 1, 0x42, addr(8000), 2, 0xBB);
        let outcome = fold.apply(cheaper).expect("better");
        assert_eq!(outcome, ApplyOutcome::Replaced);

        let rows = fold.query(RoutingQuery::Lookup(0x42));
        let (_, route) = &rows[0];
        assert_eq!(route.metric, 2);
        assert_eq!(route.next_hop, addr(8000));
    }

    #[test]
    fn equal_metric_replaces_to_refresh_expires_at() {
        let fold = new_fold();
        let publisher_a = EntityKeypair::generate();
        let publisher_b = EntityKeypair::generate();

        let original = sign_route(&publisher_a, 0xAA, 1, 0x42, addr(7000), 3, 0xAA);
        fold.apply(original).expect("first");

        let tied = sign_route(&publisher_b, 0xBB, 1, 0x42, addr(8000), 3, 0xBB);
        let outcome = fold.apply(tied).expect("equal");
        assert_eq!(outcome, ApplyOutcome::Replaced);

        let rows = fold.query(RoutingQuery::Lookup(0x42));
        let (_, route) = &rows[0];
        assert_eq!(route.next_hop, addr(8000));
    }

    #[test]
    fn higher_metric_does_not_overwrite_existing_route() {
        let fold = new_fold();
        let publisher_a = EntityKeypair::generate();
        let publisher_b = EntityKeypair::generate();

        let direct = sign_route(&publisher_a, 0xAA, 1, 0x42, addr(7000), 1, 0xAA);
        fold.apply(direct).expect("direct");

        let indirect = sign_route(&publisher_b, 0xBB, 1, 0x42, addr(8000), 5, 0xBB);
        let outcome = fold.apply(indirect).expect("indirect");
        assert_eq!(outcome, ApplyOutcome::Rejected);

        // The stored route must still be the original direct one.
        let rows = fold.query(RoutingQuery::Lookup(0x42));
        let (_, route) = &rows[0];
        assert_eq!(route.metric, 1);
        assert_eq!(route.next_hop, addr(7000));
    }

    #[test]
    fn same_publisher_stale_generation_is_rejected_even_with_lower_metric() {
        let fold = new_fold();
        let keypair = EntityKeypair::generate();

        let current = sign_route(&keypair, 0xAA, 5, 0x42, addr(7000), 3, 0xAA);
        fold.apply(current).expect("gen=5");

        let stale = sign_route(&keypair, 0xAA, 3, 0x42, addr(7000), 1, 0xAA);
        let outcome = fold.apply(stale).expect("stale gen=3");
        assert_eq!(outcome, ApplyOutcome::Rejected);

        let rows = fold.query(RoutingQuery::Lookup(0x42));
        let (_, route) = &rows[0];
        assert_eq!(route.metric, 3);
    }

    #[test]
    fn same_publisher_higher_generation_lower_metric_replaces() {
        let fold = new_fold();
        let keypair = EntityKeypair::generate();

        let original = sign_route(&keypair, 0xAA, 1, 0x42, addr(7000), 5, 0xAA);
        fold.apply(original).expect("first");

        let improved = sign_route(&keypair, 0xAA, 2, 0x42, addr(7000), 1, 0xAA);
        let outcome = fold.apply(improved).expect("better");
        assert_eq!(outcome, ApplyOutcome::Replaced);

        let rows = fold.query(RoutingQuery::Lookup(0x42));
        assert_eq!(rows[0].1.metric, 1);
    }

    #[test]
    fn query_all_destinations_returns_every_installed_route() {
        let fold = new_fold();
        let keypair = EntityKeypair::generate();

        let installed = [(0x10, 7001), (0x11, 7002), (0x12, 7003)];
        for (destination, port) in installed {
            let announcement = sign_route(&keypair, 0xAA, 1, destination, addr(port), 1, 0xAA);
            fold.apply(announcement).unwrap();
        }

        // Sort before comparing: the query's row order is not asserted.
        let mut seen = Vec::new();
        for (destination, _) in fold.query(RoutingQuery::AllDestinations) {
            seen.push(destination);
        }
        seen.sort();
        assert_eq!(seen, vec![0x10, 0x11, 0x12]);
    }

    #[test]
    fn query_via_next_hop_finds_routes_through_a_specific_address() {
        let fold = new_fold();
        let keypair = EntityKeypair::generate();

        // Three destinations share next hop :7000, one goes through :8000.
        for destination in [0x10, 0x11, 0x12] {
            let announcement = sign_route(&keypair, 0xAA, 1, destination, addr(7000), 1, 0xAA);
            fold.apply(announcement).unwrap();
        }
        let lone = sign_route(&keypair, 0xAA, 1, 0x13, addr(8000), 1, 0xAA);
        fold.apply(lone).unwrap();

        let through_7000: std::collections::HashSet<_> = fold
            .query(RoutingQuery::ViaNextHop(addr(7000)))
            .into_iter()
            .map(|(destination, _)| destination)
            .collect();
        let expected_7000: std::collections::HashSet<_> = [0x10, 0x11, 0x12].into_iter().collect();
        assert_eq!(through_7000, expected_7000);

        let mut through_8000 = Vec::new();
        for (destination, _) in fold.query(RoutingQuery::ViaNextHop(addr(8000))) {
            through_8000.push(destination);
        }
        assert_eq!(through_8000, vec![0x13]);

        let through_unknown = fold.query(RoutingQuery::ViaNextHop(addr(9999)));
        assert!(through_unknown.is_empty());
    }

    #[test]
    fn runtime_ttl_sweeps_stale_routes() {
        let fold = new_fold();
        let keypair = EntityKeypair::generate();

        // A zero-second TTL in the envelope metadata, instead of the default.
        let meta = EnvelopeMeta {
            ttl_secs: Some(0),
            ..Default::default()
        };
        let route = RouteAnnouncement {
            destination: 0x42,
            next_hop: addr(7000),
            metric: 1,
            via: 0xAA,
        };
        let announcement =
            SignedAnnouncement::sign(&keypair, RoutingFold::KIND_ID, 0, 0xAA, 1, meta, route)
                .unwrap();

        fold.apply(announcement).unwrap();
        assert_eq!(fold.metrics().entries(), 1);

        std::thread::sleep(Duration::from_millis(10));

        let swept = fold.sweep_expired_now();
        assert_eq!(swept, 1);
        assert_eq!(fold.metrics().expiries(), 1);
        assert!(fold.query(RoutingQuery::Lookup(0x42)).is_empty());
    }

    #[test]
    fn routing_fold_plugs_into_registry_and_dispatches_signed_envelopes() {
        let registry = FoldRegistry::new();
        let fold: Arc<Fold<RoutingFold>> = Arc::new(new_fold());
        registry.register(fold.clone());

        let keypair = EntityKeypair::generate();
        let announcement = sign_route(&keypair, 0xAA, 1, 0x42, addr(7000), 1, 0xAA);
        let encoded = announcement.encode().expect("encode");

        let outcome = registry
            .dispatch(&encoded, keypair.entity_id())
            .expect("dispatch");
        assert_eq!(outcome, ApplyOutcome::Inserted);
    }
}