use std::collections::HashMap;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::time::{Duration, Instant};
use epics_base_rs::net::AsyncUdpV4;
use epics_base_rs::runtime::sync::mpsc;
use crate::protocol::*;
use super::CoordRequest;
pub(crate) enum BeaconControl {
ResetServer { server_addr: SocketAddr },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BeaconAnomalyKind {
FirstSighting,
IdMismatch,
#[allow(dead_code)]
PeriodCollapse,
}
struct BeaconState {
last_id: u32,
last_seen: Instant,
period_estimate: Option<Duration>,
count: u64,
}
const BEACON_STALE_THRESHOLD: Duration = Duration::from_secs(180);
const REREGISTER_INTERVAL: Duration = Duration::from_secs(300);
pub(crate) async fn run_beacon_monitor(
coord_tx: mpsc::UnboundedSender<CoordRequest>,
control_rx: mpsc::UnboundedReceiver<BeaconControl>,
) {
run_beacon_monitor_inner(
coord_tx,
control_rx,
#[cfg(feature = "cap-tokens")]
None,
)
.await;
}
#[cfg(feature = "cap-tokens")]
#[allow(dead_code)]
pub(crate) async fn run_beacon_monitor_with_verifier(
coord_tx: mpsc::UnboundedSender<CoordRequest>,
control_rx: mpsc::UnboundedReceiver<BeaconControl>,
verifier: std::sync::Arc<crate::server::signed_beacon::SignedBeaconVerifier>,
) {
run_beacon_monitor_inner(coord_tx, control_rx, Some(verifier)).await;
}
async fn run_beacon_monitor_inner(
coord_tx: mpsc::UnboundedSender<CoordRequest>,
mut control_rx: mpsc::UnboundedReceiver<BeaconControl>,
#[cfg(feature = "cap-tokens")] verifier: Option<
std::sync::Arc<crate::server::signed_beacon::SignedBeaconVerifier>,
>,
) {
let socket = match AsyncUdpV4::bind_single(Ipv4Addr::LOCALHOST, 0, false) {
Ok(s) => s,
Err(_) => return,
};
if let Err(e) = socket.enable_so_rxq_ovfl() {
tracing::trace!(
target: "epics_ca_rs::client::beacon_monitor",
error = %e,
"SO_RXQ_OVFL enable failed (non-fatal)"
);
}
let mut prev_drops_beacon: u32 = 0;
for attempt in 0..3u32 {
if register_with_repeater(&socket).await.is_ok() {
break;
}
if attempt < 2 {
tokio::time::sleep(Duration::from_millis(200 * (1 << attempt))).await;
}
}
#[cfg(feature = "cap-tokens")]
let mut verified_tuples: HashMap<(u32, u16, u32), std::time::Instant> = HashMap::new();
#[cfg(feature = "cap-tokens")]
let require_signed = !matches!(
epics_base_rs::runtime::env::get("EPICS_CA_BEACON_REQUIRE_SIGNED").as_deref(),
Some("NO" | "no" | "0" | "false" | "FALSE")
);
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let ignored_servers: std::collections::HashSet<Ipv4Addr> =
super::epics_rs_client_ignore().into_iter().collect();
let mut buf = [0u8; 4096];
let mut control_rx_open = true;
loop {
let recv_fut = tokio::time::timeout(
REREGISTER_INTERVAL,
socket.recv_with_meta_with_drops(&mut buf),
);
let (meta, drops) = tokio::select! {
ctrl = control_rx.recv(), if control_rx_open => {
match ctrl {
Some(BeaconControl::ResetServer { server_addr }) => {
apply_reset_server(&mut servers, server_addr);
}
None => {
control_rx_open = false;
}
}
continue;
}
recv = recv_fut => {
match recv {
Ok(Ok(v)) => v,
Ok(Err(_)) => continue,
Err(_) => {
let _ = register_with_repeater(&socket).await;
continue;
}
}
}
};
if drops != 0 && drops != prev_drops_beacon {
tracing::debug!(
target: "epics_ca_rs::client::beacon_monitor",
prev = prev_drops_beacon,
drops,
"CA beacon RX socket buffer overflow"
);
}
prev_drops_beacon = drops;
let len = meta.n;
if len < CaHeader::SIZE {
continue;
}
let mut offset = 0;
while offset + CaHeader::SIZE <= len {
let Ok(hdr) = CaHeader::from_bytes(&buf[offset..len]) else {
break;
};
if (hdr.postsize as usize) & 0x7 != 0 {
break;
}
let payload_padded = hdr.postsize as usize;
let frame_len = (CaHeader::SIZE + payload_padded).max(CaHeader::SIZE);
if offset.saturating_add(frame_len) > len {
break;
}
#[cfg_attr(not(feature = "cap-tokens"), allow(unused_variables))]
let frame_start = offset;
offset += frame_len;
#[cfg(feature = "cap-tokens")]
if hdr.cmmd == crate::server::signed_beacon::CA_PROTO_RSRV_BEACON_SIG {
if let Some(ref v) = verifier {
let frame = &buf[frame_start..frame_start + frame_len];
let src_ip = match meta.src.ip() {
std::net::IpAddr::V4(v) => v,
std::net::IpAddr::V6(_) => {
metrics::counter!("ca_client_signed_beacon_failures_total")
.increment(1);
continue;
}
};
let via_repeater = src_ip.is_loopback();
match v.verify(frame) {
Ok((ip, port, beacon_id))
if !via_repeater && Ipv4Addr::from(ip) != src_ip =>
{
tracing::debug!(
announced = %Ipv4Addr::from(ip),
actual = %src_ip,
port, beacon_id,
"signed beacon source-IP mismatch (G3)"
);
metrics::counter!("ca_client_signed_beacon_source_ip_mismatch_total")
.increment(1);
}
Ok((ip, port, beacon_id)) => {
const MAX_VERIFIED_TUPLES: usize = 8192;
if verified_tuples.len() >= MAX_VERIFIED_TUPLES {
let max_age = std::time::Duration::from_secs(v.max_age_secs.max(1));
let now = std::time::Instant::now();
verified_tuples.retain(|_, t| now.duration_since(*t) <= max_age);
}
verified_tuples
.insert((ip, port, beacon_id), std::time::Instant::now());
metrics::counter!("ca_client_signed_beacon_verified_total")
.increment(1);
}
Err(e) => {
tracing::debug!(error = ?e,
"signed beacon companion failed verification");
metrics::counter!("ca_client_signed_beacon_failures_total")
.increment(1);
}
}
}
continue;
}
if hdr.cmmd != CA_PROTO_RSRV_IS_UP {
continue;
}
#[cfg(feature = "cap-tokens")]
if let Some(ref v) = verifier {
let max_age = std::time::Duration::from_secs(v.max_age_secs.max(1));
let now = std::time::Instant::now();
verified_tuples.retain(|_, t| now.duration_since(*t) <= max_age);
let lookup_ip_u32 = if hdr.available != 0 {
hdr.available
} else {
match meta.src.ip() {
std::net::IpAddr::V4(v) => u32::from_be_bytes(v.octets()),
std::net::IpAddr::V6(_) => 0,
}
};
let key = (lookup_ip_u32, hdr.count, hdr.cid);
if !verified_tuples.contains_key(&key) {
metrics::counter!("ca_client_unsigned_beacon_drops_total").increment(1);
if require_signed {
continue;
}
}
}
handle_beacon(hdr, &mut servers, &coord_tx, &ignored_servers);
}
}
}
fn apply_reset_server(servers: &mut HashMap<SocketAddr, BeaconState>, circuit_addr: SocketAddr) {
let port = circuit_addr.port();
let inaddr_any = SocketAddr::V4(SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, port));
if let Some(s) = servers.get_mut(&circuit_addr) {
s.period_estimate = None;
s.count = 0;
return;
}
if let Some(s) = servers.get_mut(&inaddr_any) {
s.period_estimate = None;
s.count = 0;
return;
}
let port_keys: Vec<SocketAddr> = servers
.keys()
.filter(|k| k.port() == port)
.copied()
.collect();
if port_keys.len() == 1 {
if let Some(s) = servers.get_mut(&port_keys[0]) {
s.period_estimate = None;
s.count = 0;
}
}
}
fn handle_beacon(
hdr: CaHeader,
servers: &mut HashMap<SocketAddr, BeaconState>,
coord_tx: &mpsc::UnboundedSender<CoordRequest>,
ignored_servers: &std::collections::HashSet<Ipv4Addr>,
) {
let server_port = if hdr.count != 0 {
hdr.count
} else {
epics_base_rs::runtime::env::get("EPICS_CA_SERVER_PORT")
.and_then(|s| s.parse::<u16>().ok())
.unwrap_or(CA_SERVER_PORT)
};
let beacon_id = hdr.cid;
let server_ip = Ipv4Addr::from(hdr.available.to_be_bytes());
if !server_ip.is_unspecified() && ignored_servers.contains(&server_ip) {
return;
}
let server_addr = SocketAddr::V4(SocketAddrV4::new(server_ip, server_port));
let now = Instant::now();
servers.retain(|_, s| now.duration_since(s.last_seen) < BEACON_STALE_THRESHOLD);
const MAX_BEACON_SERVERS: usize = 4096;
let first_sighting = !servers.contains_key(&server_addr);
if first_sighting && servers.len() >= MAX_BEACON_SERVERS {
let cutoff_threshold = Duration::from_secs(15 * 5);
servers.retain(|_, s| now.duration_since(s.last_seen) < cutoff_threshold);
}
let entry = servers.entry(server_addr).or_insert_with(|| BeaconState {
last_id: beacon_id.wrapping_sub(1),
last_seen: now,
period_estimate: None,
count: 0,
});
let actual_interval = now.duration_since(entry.last_seen);
let expected_next_id = entry.last_id.wrapping_add(1);
if !first_sighting && beacon_id == entry.last_id {
return;
}
const BACKWARDS_DUP_WINDOW: u32 = 4;
if !first_sighting {
let advance = beacon_id.wrapping_sub(entry.last_id);
let backwards_dup = advance > u32::MAX - BACKWARDS_DUP_WINDOW;
let small_forward_dup = advance == 2 || advance == 3;
if backwards_dup || small_forward_dup {
entry.last_id = beacon_id;
return;
}
}
const MIN_PERIOD_COLLAPSE_INTERVAL: Duration = Duration::from_millis(50);
let anomaly_kind = if first_sighting {
Some(BeaconAnomalyKind::FirstSighting)
} else if beacon_id != expected_next_id {
Some(BeaconAnomalyKind::IdMismatch)
} else if entry.count > 3
&& actual_interval > MIN_PERIOD_COLLAPSE_INTERVAL
&& entry
.period_estimate
.is_some_and(|est| actual_interval < est / 3)
{
entry.period_estimate = None;
entry.count = 0;
None
} else {
None
};
entry.last_id = beacon_id;
entry.last_seen = now;
entry.count += 1;
if entry.count > 1 {
match entry.period_estimate {
None => {
entry.period_estimate = Some(actual_interval);
}
Some(prev) => {
let alpha = 0.25;
let new_estimate = Duration::from_secs_f64(
prev.as_secs_f64() * (1.0 - alpha) + actual_interval.as_secs_f64() * alpha,
);
entry.period_estimate = Some(new_estimate);
}
}
}
if let Some(kind) = anomaly_kind {
let _ = coord_tx.send(CoordRequest::ForceRescanServer { server_addr, kind });
}
let arrival_anomaly = match anomaly_kind {
None => Some(false),
Some(BeaconAnomalyKind::IdMismatch | BeaconAnomalyKind::PeriodCollapse) => Some(true),
Some(BeaconAnomalyKind::FirstSighting) => None,
};
if let Some(anomaly) = arrival_anomaly {
let _ = coord_tx.send(CoordRequest::BeaconArrival {
server_addr,
anomaly,
});
}
}
async fn register_with_repeater(socket: &AsyncUdpV4) -> Result<(), ()> {
let local_ip = socket
.local_addrs()
.into_iter()
.find_map(|sa| match sa {
SocketAddr::V4(v4) => Some(*v4.ip()),
_ => None,
})
.unwrap_or(Ipv4Addr::LOCALHOST);
let mut hdr = CaHeader::new(CA_PROTO_REPEATER_REGISTER);
hdr.available = u32::from_be_bytes(local_ip.octets());
let repeater_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, repeater_port()));
socket
.send_to(&hdr.to_bytes(), repeater_addr)
.await
.map_err(|_| ())?;
let mut buf = [0u8; 64];
let result = tokio::time::timeout(Duration::from_millis(500), async {
loop {
let (meta, _drops) = socket
.recv_with_meta_with_drops(&mut buf)
.await
.map_err(|_| ())?;
let len = meta.n;
if len >= CaHeader::SIZE {
if let Ok(resp) = CaHeader::from_bytes(&buf[..len]) {
if resp.cmmd == CA_PROTO_REPEATER_CONFIRM {
return Ok::<(), ()>(());
}
}
}
}
})
.await;
match result {
Ok(Ok(())) => Ok(()),
_ => Err(()),
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Instant;
#[test]
fn beacon_stale_threshold_is_180s() {
assert_eq!(BEACON_STALE_THRESHOLD, Duration::from_secs(180));
}
#[test]
fn duplicate_beacon_does_not_double_fire_anomaly() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.cid = 100;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
assert!(matches!(
rx.try_recv(),
Ok(CoordRequest::ForceRescanServer {
kind: BeaconAnomalyKind::FirstSighting,
..
})
));
assert!(rx.try_recv().is_err());
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
assert!(
rx.try_recv().is_err(),
"duplicate same-cid beacon must not fire ForceRescanServer"
);
}
#[test]
fn sub_50ms_restart_via_id_mismatch_still_fires_anomaly() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.cid = 100;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
assert!(rx.try_recv().is_ok());
hdr.cid = 1;
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
assert!(
matches!(
rx.try_recv(),
Ok(CoordRequest::ForceRescanServer {
kind: BeaconAnomalyKind::IdMismatch,
..
})
),
"id-mismatch restart must fire IdMismatch anomaly even when interval < 50ms"
);
}
#[test]
fn monotonic_id_sub_period_clears_ema_no_anomaly() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let server: SocketAddr = "127.0.0.1:5064".parse().unwrap();
servers.insert(
server,
BeaconState {
last_id: 99,
last_seen: Instant::now() - Duration::from_millis(200),
period_estimate: Some(Duration::from_secs(15)),
count: 10,
},
);
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
hdr.cid = 100; handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
while let Ok(msg) = rx.try_recv() {
if let CoordRequest::ForceRescanServer { kind, .. } = msg {
panic!(
"monotonic-id, sub-period interval must NOT fire \
ForceRescanServer ({kind:?}) — it is the IOC's \
`beacon_reset` ramp-up cascade triggered by some \
peer's TCP accept, not a real restart"
);
}
}
let s = servers.get(&server).expect("entry");
assert!(
s.period_estimate.is_none(),
"self-reset must clear period_estimate"
);
assert_eq!(
s.count, 1,
"self-reset zeros count, then +1 for this beacon"
);
assert_eq!(
s.last_id, 100,
"last_id advanced normally — the beacon was accepted"
);
}
#[test]
fn rsrv_rampup_beacons_do_not_fire_period_collapse() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let server: SocketAddr = "127.0.0.1:5064".parse().unwrap();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
hdr.cid = 0;
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
let mut first_sighting_seen = false;
while let Ok(msg) = rx.try_recv() {
if matches!(
msg,
CoordRequest::ForceRescanServer {
kind: BeaconAnomalyKind::FirstSighting,
..
}
) {
first_sighting_seen = true;
}
}
assert!(first_sighting_seen, "first beacon must fire FirstSighting");
let intervals_ms = [20u64, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240];
for (i, &ms) in intervals_ms.iter().enumerate() {
let s = servers.get_mut(&server).expect("entry");
s.last_seen = std::time::Instant::now() - Duration::from_millis(ms);
hdr.cid = (i as u32) + 1;
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
while let Ok(msg) = rx.try_recv() {
if let CoordRequest::ForceRescanServer { kind, .. } = msg {
assert_ne!(
kind,
BeaconAnomalyKind::PeriodCollapse,
"ramp-up beacon #{} (interval={} ms) must not classify \
as PeriodCollapse — see BeaconState::period_estimate doc",
i + 2,
ms
);
}
}
}
}
#[test]
fn fast_cadence_monotonic_ids_does_not_fire_spurious_anomaly() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
for id in 100..105 {
hdr.cid = id;
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
}
let mut search_wakes = 0;
let mut healthy_arrivals = 0;
let mut anomaly_arrivals = 0;
while let Ok(msg) = rx.try_recv() {
match msg {
CoordRequest::ForceRescanServer { .. } => search_wakes += 1,
CoordRequest::BeaconArrival { anomaly: false, .. } => healthy_arrivals += 1,
CoordRequest::BeaconArrival { anomaly: true, .. } => anomaly_arrivals += 1,
_ => {}
}
}
assert_eq!(
search_wakes, 1,
"monotonic fast-cadence beacons must wake searches only on first sighting"
);
assert_eq!(
anomaly_arrivals, 0,
"monotonic fast-cadence beacons must not flag the watchdog"
);
assert_eq!(
healthy_arrivals, 4,
"each post-first-sighting healthy beacon must refresh the transport watchdog"
);
}
#[test]
fn id_mismatch_emits_anomaly_beacon_arrival() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.cid = 100;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
while rx.try_recv().is_ok() {}
hdr.cid = 1;
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
let mut saw_search_wake = false;
let mut saw_anomaly_arrival = false;
while let Ok(msg) = rx.try_recv() {
match msg {
CoordRequest::ForceRescanServer {
kind: BeaconAnomalyKind::IdMismatch,
..
} => saw_search_wake = true,
CoordRequest::BeaconArrival { anomaly: true, .. } => saw_anomaly_arrival = true,
_ => {}
}
}
assert!(saw_search_wake, "IdMismatch must wake searches");
assert!(
saw_anomaly_arrival,
"IdMismatch must flag the transport watchdog"
);
}
#[test]
fn first_sighting_does_not_emit_beacon_arrival() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.cid = 100;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
let mut saw_first_sighting = false;
let mut saw_arrival = false;
while let Ok(msg) = rx.try_recv() {
match msg {
CoordRequest::ForceRescanServer {
kind: BeaconAnomalyKind::FirstSighting,
..
} => saw_first_sighting = true,
CoordRequest::BeaconArrival { .. } => saw_arrival = true,
_ => {}
}
}
assert!(saw_first_sighting, "first sighting must wake searches");
assert!(
!saw_arrival,
"first sighting must not touch the transport watchdog"
);
}
#[test]
fn small_forward_advance_is_dropped_not_classified() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
for id in 100..103 {
hdr.cid = id;
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
}
while rx.try_recv().is_ok() {}
hdr.cid = 104;
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
assert!(
rx.try_recv().is_err(),
"advance=2 must be silently dropped, not classified as anomaly"
);
hdr.cid = 107;
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
assert!(rx.try_recv().is_err(), "advance=3 must be silently dropped");
hdr.cid = 108;
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
let mut saw_arrival_healthy = false;
let mut saw_anomaly = false;
while let Ok(msg) = rx.try_recv() {
match msg {
CoordRequest::BeaconArrival { anomaly: false, .. } => saw_arrival_healthy = true,
CoordRequest::BeaconArrival { anomaly: true, .. }
| CoordRequest::ForceRescanServer { .. } => saw_anomaly = true,
_ => {}
}
}
assert!(
saw_arrival_healthy,
"after dropped dups, advance=1 must classify as healthy"
);
assert!(
!saw_anomaly,
"monotonic recovery from drop sequence must not fire anomaly"
);
}
#[test]
fn small_backwards_advance_is_dropped() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
for id in 100..103 {
hdr.cid = id;
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
}
while rx.try_recv().is_ok() {}
hdr.cid = 101;
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
assert!(
rx.try_recv().is_err(),
"backwards-by-1 (within 256) must drop"
);
}
#[test]
fn stale_prune_drops_idle_entries_only() {
let base = Instant::now();
let now = base + Duration::from_secs(300);
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let fresh: SocketAddr = "127.0.0.1:5064".parse().unwrap();
let stale: SocketAddr = "127.0.0.1:5065".parse().unwrap();
servers.insert(
fresh,
BeaconState {
last_id: 0,
last_seen: now - Duration::from_secs(10),
period_estimate: Some(Duration::from_secs(15)),
count: 5,
},
);
servers.insert(
stale,
BeaconState {
last_id: 0,
last_seen: now - Duration::from_secs(300),
period_estimate: Some(Duration::from_secs(15)),
count: 5,
},
);
servers.retain(|_, s| now.duration_since(s.last_seen) < BEACON_STALE_THRESHOLD);
assert!(
servers.contains_key(&fresh),
"fresh entry must survive prune"
);
assert!(
!servers.contains_key(&stale),
"180-s-idle entry must be pruned"
);
}
#[test]
fn reset_on_connect_breaks_period_collapse_cascade_after_reconnect() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let server: SocketAddr = "127.0.0.1:5064".parse().unwrap();
servers.insert(
server,
BeaconState {
last_id: 999,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 1000,
},
);
apply_reset_server(&mut servers, server);
let s = servers.get(&server).expect("entry survives reset");
assert!(
s.period_estimate.is_none(),
"ResetServer must clear period_estimate"
);
assert_eq!(s.count, 0, "ResetServer must zero count");
assert_eq!(
s.last_id, 999,
"ResetServer must preserve last_id (dedup still works)"
);
let intervals_ms = [20u64, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240];
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
for (i, &ms) in intervals_ms.iter().enumerate() {
let s = servers.get_mut(&server).expect("entry");
s.last_seen = Instant::now() - Duration::from_millis(ms);
hdr.cid = 1000 + (i as u32);
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
while let Ok(msg) = rx.try_recv() {
if let CoordRequest::ForceRescanServer { kind, .. } = msg {
assert_ne!(
kind,
BeaconAnomalyKind::PeriodCollapse,
"ramp-up beacon #{} (interval={} ms) after \
ResetServer must not classify as PeriodCollapse \
— the cascade is the archiver-rs reconnect noise \
this fix targets",
i + 1,
ms
);
}
}
}
}
#[test]
fn peer_connect_ramp_up_does_not_fire_period_collapse() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let server: SocketAddr = "127.0.0.1:5064".parse().unwrap();
servers.insert(
server,
BeaconState {
last_id: 999,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 1000,
},
);
let intervals_ms = [20u64, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240];
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
for (i, &ms) in intervals_ms.iter().enumerate() {
let s = servers.get_mut(&server).expect("entry");
s.last_seen = Instant::now() - Duration::from_millis(ms);
hdr.cid = 1000 + (i as u32);
handle_beacon(hdr, &mut servers, &tx, &std::collections::HashSet::new());
while let Ok(msg) = rx.try_recv() {
if let CoordRequest::ForceRescanServer { kind, .. } = msg {
assert_ne!(
kind,
BeaconAnomalyKind::PeriodCollapse,
"peer-connect ramp-up beacon #{} (interval={} ms) \
must NOT classify as PeriodCollapse — \
the self-reset path in handle_beacon absorbs it",
i + 1,
ms
);
}
}
}
let s = servers.get(&server).expect("entry");
assert_eq!(s.last_id, 1009, "last_id must track ramp-up ids");
assert!(
s.period_estimate.is_some(),
"EMA must be reseeded after the cascade"
);
}
#[test]
fn reset_unknown_server_is_noop() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let server: SocketAddr = "127.0.0.1:5064".parse().unwrap();
apply_reset_server(&mut servers, server);
assert!(servers.is_empty());
}
#[test]
fn reset_matches_inaddr_any_announced_entry() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let inaddr_any: SocketAddr = "0.0.0.0:5064".parse().unwrap();
servers.insert(
inaddr_any,
BeaconState {
last_id: 999,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 1000,
},
);
let circuit: SocketAddr = "10.0.0.5:5064".parse().unwrap();
apply_reset_server(&mut servers, circuit);
let s = servers.get(&inaddr_any).expect("entry preserved");
assert!(
s.period_estimate.is_none(),
"INADDR_ANY-keyed entry must be reset by port-match"
);
assert_eq!(s.count, 0);
assert_eq!(s.last_id, 999, "last_id preserved across reset");
}
#[test]
fn reset_matches_multihomed_announced_entry() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let nic_a: SocketAddr = "10.0.0.1:5064".parse().unwrap();
servers.insert(
nic_a,
BeaconState {
last_id: 42,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 100,
},
);
let nic_b: SocketAddr = "10.0.0.2:5064".parse().unwrap();
apply_reset_server(&mut servers, nic_b);
let s = servers.get(&nic_a).expect("entry preserved");
assert!(s.period_estimate.is_none());
assert_eq!(s.count, 0);
}
#[test]
fn reset_exact_does_not_cascade_to_inaddr_any_other_ioc() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let target: SocketAddr = "10.0.0.5:5064".parse().unwrap();
let inaddr_any: SocketAddr = "0.0.0.0:5064".parse().unwrap();
servers.insert(
target,
BeaconState {
last_id: 1,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 1000,
},
);
servers.insert(
inaddr_any,
BeaconState {
last_id: 2,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 500,
},
);
apply_reset_server(&mut servers, target);
let t = servers.get(&target).expect("target preserved");
assert!(t.period_estimate.is_none(), "exact-match target reset");
assert_eq!(t.count, 0);
let i = servers.get(&inaddr_any).expect("inaddr-any preserved");
assert_eq!(
i.period_estimate,
Some(Duration::from_secs(15)),
"INADDR_ANY entry from a different IOC must not be touched \
after an exact-match hit"
);
assert_eq!(i.count, 500);
}
#[test]
fn reset_does_not_blind_other_same_port_ioc() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let target: SocketAddr = "10.0.0.5:5064".parse().unwrap();
let neighbour: SocketAddr = "10.0.0.7:5064".parse().unwrap();
servers.insert(
target,
BeaconState {
last_id: 1000,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 1000,
},
);
servers.insert(
neighbour,
BeaconState {
last_id: 2000,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 5000,
},
);
apply_reset_server(&mut servers, target);
let t = servers.get(&target).expect("target preserved");
assert!(t.period_estimate.is_none(), "exact-match target reset");
assert_eq!(t.count, 0);
let n = servers.get(&neighbour).expect("neighbour preserved");
assert_eq!(
n.period_estimate,
Some(Duration::from_secs(15)),
"unrelated same-port IOC must NOT have its EMA cleared — \
that would disable PeriodCollapse on its next restart"
);
assert_eq!(n.count, 5000, "neighbour count untouched");
}
#[test]
fn reset_skips_when_ambiguous_no_exact_no_inaddr_any() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let a: SocketAddr = "10.0.0.7:5064".parse().unwrap();
let b: SocketAddr = "10.0.0.9:5064".parse().unwrap();
servers.insert(
a,
BeaconState {
last_id: 1,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 100,
},
);
servers.insert(
b,
BeaconState {
last_id: 2,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 200,
},
);
let circuit: SocketAddr = "10.0.0.5:5064".parse().unwrap();
apply_reset_server(&mut servers, circuit);
for key in [a, b] {
let s = servers.get(&key).expect("entry preserved");
assert_eq!(
s.period_estimate,
Some(Duration::from_secs(15)),
"ambiguous fallback must not blind {key}"
);
}
}
#[test]
fn reset_via_inaddr_any_does_not_touch_unrelated_real_ip_entries() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let inaddr_any: SocketAddr = "0.0.0.0:5064".parse().unwrap();
let unrelated: SocketAddr = "10.0.0.7:5064".parse().unwrap();
servers.insert(
inaddr_any,
BeaconState {
last_id: 1,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 100,
},
);
servers.insert(
unrelated,
BeaconState {
last_id: 2,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 200,
},
);
let circuit: SocketAddr = "10.0.0.5:5064".parse().unwrap();
apply_reset_server(&mut servers, circuit);
let i = servers.get(&inaddr_any).expect("inaddr-any preserved");
assert!(i.period_estimate.is_none(), "INADDR_ANY entry reset");
let u = servers.get(&unrelated).expect("unrelated preserved");
assert_eq!(
u.period_estimate,
Some(Duration::from_secs(15)),
"unrelated same-port real-IP entry must not be touched"
);
assert_eq!(u.count, 200);
}
#[test]
fn reset_leaves_other_port_entries_alone() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let other: SocketAddr = "0.0.0.0:5065".parse().unwrap();
servers.insert(
other,
BeaconState {
last_id: 7,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 50,
},
);
let circuit: SocketAddr = "10.0.0.5:5064".parse().unwrap();
apply_reset_server(&mut servers, circuit);
let s = servers.get(&other).expect("entry preserved");
assert_eq!(
s.period_estimate,
Some(Duration::from_secs(15)),
"different-port entry must not be touched"
);
assert_eq!(s.count, 50);
}
#[cfg(feature = "cap-tokens")]
#[test]
fn verified_tuple_key_matches_via_repeater() {
use crate::server::signed_beacon::{SignedBeaconEmitter, SignedBeaconVerifier};
use ed25519_dalek::SigningKey;
use rand_core::OsRng;
use std::net::Ipv4Addr;
use std::time::SystemTime;
let mut csprng = OsRng;
let signing_key = SigningKey::generate(&mut csprng);
let socket = std::sync::Arc::new(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { tokio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap() }),
);
let server_ip_u32 = u32::from_be_bytes([10, 0, 0, 5]);
let server_port: u16 = 5064;
let beacon_id: u32 = 42;
let ts = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
let emitter = SignedBeaconEmitter::new(signing_key.clone(), socket, vec![]);
let packet = emitter.build_packet(server_ip_u32, server_port, beacon_id, ts);
let mut verifier = SignedBeaconVerifier::new();
verifier.trust(signing_key.verifying_key());
let (verified_ip, verified_port, verified_bid) =
verifier.verify(&packet).expect("signature verifies");
let meta_src_via_repeater = Ipv4Addr::LOCALHOST;
assert!(
meta_src_via_repeater.is_loopback(),
"topology precondition: client beacon socket binds to LOCALHOST"
);
let mut verified_tuples: HashMap<(u32, u16, u32), Instant> = HashMap::new();
verified_tuples.insert((verified_ip, verified_port, verified_bid), Instant::now());
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.data_type = 0;
hdr.count = server_port;
hdr.cid = beacon_id;
hdr.available = server_ip_u32;
let lookup_ip_u32 = if hdr.available != 0 {
hdr.available
} else {
u32::from_be_bytes(meta_src_via_repeater.octets())
};
let key = (lookup_ip_u32, hdr.count, hdr.cid);
assert!(
verified_tuples.contains_key(&key),
"regression: regular beacon with hdr.available rewritten by \
the repeater must hit the companion-inserted tuple"
);
let r7_key = (
u32::from_be_bytes(meta_src_via_repeater.octets()),
hdr.count,
hdr.cid,
);
assert!(
!verified_tuples.contains_key(&r7_key),
"documents the earlier failure mode: meta.src=127.0.0.1 key never matches"
);
}
#[cfg(feature = "cap-tokens")]
#[test]
fn verified_tuple_key_falls_back_to_src_for_direct_lan() {
use crate::server::signed_beacon::{SignedBeaconEmitter, SignedBeaconVerifier};
use ed25519_dalek::SigningKey;
use rand_core::OsRng;
use std::net::Ipv4Addr;
use std::time::SystemTime;
let mut csprng = OsRng;
let signing_key = SigningKey::generate(&mut csprng);
let socket = std::sync::Arc::new(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { tokio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap() }),
);
let server_ip = Ipv4Addr::new(10, 0, 0, 5);
let server_ip_u32 = u32::from_be_bytes(server_ip.octets());
let server_port: u16 = 5064;
let beacon_id: u32 = 99;
let ts = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
let emitter = SignedBeaconEmitter::new(signing_key.clone(), socket, vec![]);
let packet = emitter.build_packet(server_ip_u32, server_port, beacon_id, ts);
let mut verifier = SignedBeaconVerifier::new();
verifier.trust(signing_key.verifying_key());
let (verified_ip, verified_port, verified_bid) =
verifier.verify(&packet).expect("signature verifies");
let mut verified_tuples: HashMap<(u32, u16, u32), Instant> = HashMap::new();
verified_tuples.insert((verified_ip, verified_port, verified_bid), Instant::now());
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.data_type = 0;
hdr.count = server_port;
hdr.cid = beacon_id;
hdr.available = 0;
let meta_src = server_ip;
let lookup_ip_u32 = if hdr.available != 0 {
hdr.available
} else {
u32::from_be_bytes(meta_src.octets())
};
let key = (lookup_ip_u32, hdr.count, hdr.cid);
assert!(
verified_tuples.contains_key(&key),
"direct-LAN fallback: meta.src.ip() lookup must hit when \
hdr.available is zero"
);
}
}