use std::collections::HashMap;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::time::{Duration, Instant};
use epics_base_rs::net::AsyncUdpV4;
use epics_base_rs::runtime::sync::mpsc;
use crate::protocol::*;
use super::CoordRequest;
pub(crate) enum BeaconControl {
ResetServer { server_addr: SocketAddr },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BeaconAnomalyKind {
FirstSighting,
IdMismatch,
#[allow(dead_code)]
PeriodCollapse,
}
struct BeaconState {
last_id: u32,
last_seen: Instant,
period_estimate: Option<Duration>,
count: u64,
}
const BEACON_STALE_THRESHOLD: Duration = Duration::from_secs(180);
const REREGISTER_INTERVAL: Duration = Duration::from_secs(300);
pub(crate) async fn run_beacon_monitor(
coord_tx: mpsc::UnboundedSender<CoordRequest>,
control_rx: mpsc::UnboundedReceiver<BeaconControl>,
) {
run_beacon_monitor_inner(
coord_tx,
control_rx,
#[cfg(feature = "cap-tokens")]
None,
)
.await;
}
#[cfg(feature = "cap-tokens")]
#[allow(dead_code)]
pub(crate) async fn run_beacon_monitor_with_verifier(
coord_tx: mpsc::UnboundedSender<CoordRequest>,
control_rx: mpsc::UnboundedReceiver<BeaconControl>,
verifier: std::sync::Arc<crate::server::signed_beacon::SignedBeaconVerifier>,
) {
run_beacon_monitor_inner(coord_tx, control_rx, Some(verifier)).await;
}
async fn run_beacon_monitor_inner(
coord_tx: mpsc::UnboundedSender<CoordRequest>,
mut control_rx: mpsc::UnboundedReceiver<BeaconControl>,
#[cfg(feature = "cap-tokens")] verifier: Option<
std::sync::Arc<crate::server::signed_beacon::SignedBeaconVerifier>,
>,
) {
let socket = match AsyncUdpV4::bind_single(Ipv4Addr::LOCALHOST, 0, false) {
Ok(s) => s,
Err(_) => return,
};
for attempt in 0..3u32 {
if register_with_repeater(&socket).await.is_ok() {
break;
}
if attempt < 2 {
tokio::time::sleep(Duration::from_millis(200 * (1 << attempt))).await;
}
}
#[cfg(feature = "cap-tokens")]
let mut verified_tuples: HashMap<(u32, u16, u32), std::time::Instant> = HashMap::new();
#[cfg(feature = "cap-tokens")]
let require_signed = !matches!(
epics_base_rs::runtime::env::get("EPICS_CA_BEACON_REQUIRE_SIGNED").as_deref(),
Some("NO" | "no" | "0" | "false" | "FALSE")
);
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let mut buf = [0u8; 4096];
let mut control_rx_open = true;
loop {
let recv_fut = tokio::time::timeout(REREGISTER_INTERVAL, socket.recv_from(&mut buf));
let (len, _src) = tokio::select! {
ctrl = control_rx.recv(), if control_rx_open => {
match ctrl {
Some(BeaconControl::ResetServer { server_addr }) => {
apply_reset_server(&mut servers, server_addr);
}
None => {
control_rx_open = false;
}
}
continue;
}
recv = recv_fut => {
match recv {
Ok(Ok(v)) => v,
Ok(Err(_)) => continue,
Err(_) => {
let _ = register_with_repeater(&socket).await;
continue;
}
}
}
};
if len < CaHeader::SIZE {
continue;
}
let mut offset = 0;
while offset + CaHeader::SIZE <= len {
let Ok(hdr) = CaHeader::from_bytes(&buf[offset..len]) else {
break;
};
let payload_padded = ((hdr.postsize as usize) + 7) & !7;
let frame_len = (CaHeader::SIZE + payload_padded).max(CaHeader::SIZE);
if offset.saturating_add(frame_len) > len {
break;
}
#[cfg_attr(not(feature = "cap-tokens"), allow(unused_variables))]
let frame_start = offset;
offset += frame_len;
#[cfg(feature = "cap-tokens")]
if hdr.cmmd == crate::server::signed_beacon::CA_PROTO_RSRV_BEACON_SIG {
if let Some(ref v) = verifier {
let frame = &buf[frame_start..frame_start + frame_len];
let src_ip = match _src.ip() {
std::net::IpAddr::V4(v) => v,
std::net::IpAddr::V6(_) => {
metrics::counter!("ca_client_signed_beacon_failures_total")
.increment(1);
continue;
}
};
match v.verify(frame) {
Ok((ip, port, beacon_id)) if Ipv4Addr::from(ip) != src_ip => {
tracing::debug!(
announced = %Ipv4Addr::from(ip),
actual = %src_ip,
port, beacon_id,
"signed beacon source-IP mismatch (G3)"
);
metrics::counter!("ca_client_signed_beacon_source_ip_mismatch_total")
.increment(1);
}
Ok((ip, port, beacon_id)) => {
const MAX_VERIFIED_TUPLES: usize = 8192;
if verified_tuples.len() >= MAX_VERIFIED_TUPLES {
let max_age = std::time::Duration::from_secs(v.max_age_secs.max(1));
let now = std::time::Instant::now();
verified_tuples.retain(|_, t| now.duration_since(*t) <= max_age);
}
verified_tuples
.insert((ip, port, beacon_id), std::time::Instant::now());
metrics::counter!("ca_client_signed_beacon_verified_total")
.increment(1);
}
Err(e) => {
tracing::debug!(error = ?e,
"signed beacon companion failed verification");
metrics::counter!("ca_client_signed_beacon_failures_total")
.increment(1);
}
}
}
continue;
}
if hdr.cmmd != CA_PROTO_RSRV_IS_UP {
continue;
}
#[cfg(feature = "cap-tokens")]
if let Some(ref v) = verifier {
let max_age = std::time::Duration::from_secs(v.max_age_secs.max(1));
let now = std::time::Instant::now();
verified_tuples.retain(|_, t| now.duration_since(*t) <= max_age);
let key = (hdr.available, hdr.count, hdr.cid);
if !verified_tuples.contains_key(&key) {
metrics::counter!("ca_client_unsigned_beacon_drops_total").increment(1);
if require_signed {
continue;
}
}
}
handle_beacon(hdr, &mut servers, &coord_tx);
}
}
}
fn apply_reset_server(servers: &mut HashMap<SocketAddr, BeaconState>, circuit_addr: SocketAddr) {
let port = circuit_addr.port();
let inaddr_any = SocketAddr::V4(SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, port));
if let Some(s) = servers.get_mut(&circuit_addr) {
s.period_estimate = None;
s.count = 0;
return;
}
if let Some(s) = servers.get_mut(&inaddr_any) {
s.period_estimate = None;
s.count = 0;
return;
}
let port_keys: Vec<SocketAddr> = servers
.keys()
.filter(|k| k.port() == port)
.copied()
.collect();
if port_keys.len() == 1 {
if let Some(s) = servers.get_mut(&port_keys[0]) {
s.period_estimate = None;
s.count = 0;
}
}
}
fn handle_beacon(
hdr: CaHeader,
servers: &mut HashMap<SocketAddr, BeaconState>,
coord_tx: &mpsc::UnboundedSender<CoordRequest>,
) {
let server_port = if hdr.count != 0 {
hdr.count
} else {
CA_SERVER_PORT
};
let beacon_id = hdr.cid;
let server_ip = Ipv4Addr::from(hdr.available.to_be_bytes());
let server_addr = SocketAddr::V4(SocketAddrV4::new(server_ip, server_port));
let now = Instant::now();
servers.retain(|_, s| now.duration_since(s.last_seen) < BEACON_STALE_THRESHOLD);
const MAX_BEACON_SERVERS: usize = 4096;
let first_sighting = !servers.contains_key(&server_addr);
if first_sighting && servers.len() >= MAX_BEACON_SERVERS {
let cutoff_threshold = Duration::from_secs(15 * 5);
servers.retain(|_, s| now.duration_since(s.last_seen) < cutoff_threshold);
}
let entry = servers.entry(server_addr).or_insert_with(|| BeaconState {
last_id: beacon_id.wrapping_sub(1),
last_seen: now,
period_estimate: None,
count: 0,
});
let actual_interval = now.duration_since(entry.last_seen);
let expected_next_id = entry.last_id.wrapping_add(1);
if !first_sighting && beacon_id == entry.last_id {
return;
}
const BACKWARDS_DUP_WINDOW: u32 = 4;
if !first_sighting {
let advance = beacon_id.wrapping_sub(entry.last_id);
let backwards_dup = advance > u32::MAX - BACKWARDS_DUP_WINDOW;
let small_forward_dup = advance == 2 || advance == 3;
if backwards_dup || small_forward_dup {
entry.last_id = beacon_id;
return;
}
}
const MIN_PERIOD_COLLAPSE_INTERVAL: Duration = Duration::from_millis(50);
let anomaly_kind = if first_sighting {
Some(BeaconAnomalyKind::FirstSighting)
} else if beacon_id != expected_next_id {
Some(BeaconAnomalyKind::IdMismatch)
} else if entry.count > 3
&& actual_interval > MIN_PERIOD_COLLAPSE_INTERVAL
&& entry
.period_estimate
.is_some_and(|est| actual_interval < est / 3)
{
entry.period_estimate = None;
entry.count = 0;
None
} else {
None
};
entry.last_id = beacon_id;
entry.last_seen = now;
entry.count += 1;
if entry.count > 1 {
match entry.period_estimate {
None => {
entry.period_estimate = Some(actual_interval);
}
Some(prev) => {
let alpha = 0.25;
let new_estimate = Duration::from_secs_f64(
prev.as_secs_f64() * (1.0 - alpha) + actual_interval.as_secs_f64() * alpha,
);
entry.period_estimate = Some(new_estimate);
}
}
}
if let Some(kind) = anomaly_kind {
let _ = coord_tx.send(CoordRequest::ForceRescanServer { server_addr, kind });
}
let arrival_anomaly = match anomaly_kind {
None => Some(false),
Some(BeaconAnomalyKind::IdMismatch | BeaconAnomalyKind::PeriodCollapse) => Some(true),
Some(BeaconAnomalyKind::FirstSighting) => None,
};
if let Some(anomaly) = arrival_anomaly {
let _ = coord_tx.send(CoordRequest::BeaconArrival {
server_addr,
anomaly,
});
}
}
async fn register_with_repeater(socket: &AsyncUdpV4) -> Result<(), ()> {
let local_ip = socket
.local_addrs()
.into_iter()
.find_map(|sa| match sa {
SocketAddr::V4(v4) => Some(*v4.ip()),
_ => None,
})
.unwrap_or(Ipv4Addr::LOCALHOST);
let mut hdr = CaHeader::new(CA_PROTO_REPEATER_REGISTER);
hdr.available = u32::from_be_bytes(local_ip.octets());
let repeater_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, CA_REPEATER_PORT));
socket
.send_to(&hdr.to_bytes(), repeater_addr)
.await
.map_err(|_| ())?;
let mut buf = [0u8; 64];
let result = tokio::time::timeout(Duration::from_millis(500), async {
loop {
let (len, _) = socket.recv_from(&mut buf).await.map_err(|_| ())?;
if len >= CaHeader::SIZE {
if let Ok(resp) = CaHeader::from_bytes(&buf[..len]) {
if resp.cmmd == CA_PROTO_REPEATER_CONFIRM {
return Ok::<(), ()>(());
}
}
}
}
})
.await;
match result {
Ok(Ok(())) => Ok(()),
_ => Err(()),
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Instant;
#[test]
fn beacon_stale_threshold_is_180s() {
assert_eq!(BEACON_STALE_THRESHOLD, Duration::from_secs(180));
}
#[test]
fn duplicate_beacon_does_not_double_fire_anomaly() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.cid = 100;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
handle_beacon(hdr, &mut servers, &tx);
assert!(matches!(
rx.try_recv(),
Ok(CoordRequest::ForceRescanServer {
kind: BeaconAnomalyKind::FirstSighting,
..
})
));
assert!(rx.try_recv().is_err());
handle_beacon(hdr, &mut servers, &tx);
assert!(
rx.try_recv().is_err(),
"duplicate same-cid beacon must not fire ForceRescanServer"
);
}
#[test]
fn sub_50ms_restart_via_id_mismatch_still_fires_anomaly() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.cid = 100;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
handle_beacon(hdr, &mut servers, &tx);
assert!(rx.try_recv().is_ok());
hdr.cid = 1;
handle_beacon(hdr, &mut servers, &tx);
assert!(
matches!(
rx.try_recv(),
Ok(CoordRequest::ForceRescanServer {
kind: BeaconAnomalyKind::IdMismatch,
..
})
),
"id-mismatch restart must fire IdMismatch anomaly even when interval < 50ms"
);
}
#[test]
fn monotonic_id_sub_period_clears_ema_no_anomaly() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let server: SocketAddr = "127.0.0.1:5064".parse().unwrap();
servers.insert(
server,
BeaconState {
last_id: 99,
last_seen: Instant::now() - Duration::from_millis(200),
period_estimate: Some(Duration::from_secs(15)),
count: 10,
},
);
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
hdr.cid = 100; handle_beacon(hdr, &mut servers, &tx);
while let Ok(msg) = rx.try_recv() {
if let CoordRequest::ForceRescanServer { kind, .. } = msg {
panic!(
"monotonic-id, sub-period interval must NOT fire \
ForceRescanServer ({kind:?}) — it is the IOC's \
`beacon_reset` ramp-up cascade triggered by some \
peer's TCP accept, not a real restart"
);
}
}
let s = servers.get(&server).expect("entry");
assert!(
s.period_estimate.is_none(),
"self-reset must clear period_estimate"
);
assert_eq!(
s.count, 1,
"self-reset zeros count, then +1 for this beacon"
);
assert_eq!(
s.last_id, 100,
"last_id advanced normally — the beacon was accepted"
);
}
#[test]
fn rsrv_rampup_beacons_do_not_fire_period_collapse() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let server: SocketAddr = "127.0.0.1:5064".parse().unwrap();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
hdr.cid = 0;
handle_beacon(hdr, &mut servers, &tx);
let mut first_sighting_seen = false;
while let Ok(msg) = rx.try_recv() {
if matches!(
msg,
CoordRequest::ForceRescanServer {
kind: BeaconAnomalyKind::FirstSighting,
..
}
) {
first_sighting_seen = true;
}
}
assert!(first_sighting_seen, "first beacon must fire FirstSighting");
let intervals_ms = [20u64, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240];
for (i, &ms) in intervals_ms.iter().enumerate() {
let s = servers.get_mut(&server).expect("entry");
s.last_seen = std::time::Instant::now() - Duration::from_millis(ms);
hdr.cid = (i as u32) + 1;
handle_beacon(hdr, &mut servers, &tx);
while let Ok(msg) = rx.try_recv() {
if let CoordRequest::ForceRescanServer { kind, .. } = msg {
assert_ne!(
kind,
BeaconAnomalyKind::PeriodCollapse,
"ramp-up beacon #{} (interval={} ms) must not classify \
as PeriodCollapse — see BeaconState::period_estimate doc",
i + 2,
ms
);
}
}
}
}
#[test]
fn fast_cadence_monotonic_ids_does_not_fire_spurious_anomaly() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
for id in 100..105 {
hdr.cid = id;
handle_beacon(hdr, &mut servers, &tx);
}
let mut search_wakes = 0;
let mut healthy_arrivals = 0;
let mut anomaly_arrivals = 0;
while let Ok(msg) = rx.try_recv() {
match msg {
CoordRequest::ForceRescanServer { .. } => search_wakes += 1,
CoordRequest::BeaconArrival { anomaly: false, .. } => healthy_arrivals += 1,
CoordRequest::BeaconArrival { anomaly: true, .. } => anomaly_arrivals += 1,
_ => {}
}
}
assert_eq!(
search_wakes, 1,
"monotonic fast-cadence beacons must wake searches only on first sighting"
);
assert_eq!(
anomaly_arrivals, 0,
"monotonic fast-cadence beacons must not flag the watchdog"
);
assert_eq!(
healthy_arrivals, 4,
"each post-first-sighting healthy beacon must refresh the transport watchdog"
);
}
#[test]
fn id_mismatch_emits_anomaly_beacon_arrival() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.cid = 100;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
handle_beacon(hdr, &mut servers, &tx);
while rx.try_recv().is_ok() {}
hdr.cid = 1;
handle_beacon(hdr, &mut servers, &tx);
let mut saw_search_wake = false;
let mut saw_anomaly_arrival = false;
while let Ok(msg) = rx.try_recv() {
match msg {
CoordRequest::ForceRescanServer {
kind: BeaconAnomalyKind::IdMismatch,
..
} => saw_search_wake = true,
CoordRequest::BeaconArrival { anomaly: true, .. } => saw_anomaly_arrival = true,
_ => {}
}
}
assert!(saw_search_wake, "IdMismatch must wake searches");
assert!(
saw_anomaly_arrival,
"IdMismatch must flag the transport watchdog"
);
}
#[test]
fn first_sighting_does_not_emit_beacon_arrival() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.cid = 100;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
handle_beacon(hdr, &mut servers, &tx);
let mut saw_first_sighting = false;
let mut saw_arrival = false;
while let Ok(msg) = rx.try_recv() {
match msg {
CoordRequest::ForceRescanServer {
kind: BeaconAnomalyKind::FirstSighting,
..
} => saw_first_sighting = true,
CoordRequest::BeaconArrival { .. } => saw_arrival = true,
_ => {}
}
}
assert!(saw_first_sighting, "first sighting must wake searches");
assert!(
!saw_arrival,
"first sighting must not touch the transport watchdog"
);
}
#[test]
fn small_forward_advance_is_dropped_not_classified() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
for id in 100..103 {
hdr.cid = id;
handle_beacon(hdr, &mut servers, &tx);
}
while rx.try_recv().is_ok() {}
hdr.cid = 104;
handle_beacon(hdr, &mut servers, &tx);
assert!(
rx.try_recv().is_err(),
"advance=2 must be silently dropped, not classified as anomaly"
);
hdr.cid = 107;
handle_beacon(hdr, &mut servers, &tx);
assert!(rx.try_recv().is_err(), "advance=3 must be silently dropped");
hdr.cid = 108;
handle_beacon(hdr, &mut servers, &tx);
let mut saw_arrival_healthy = false;
let mut saw_anomaly = false;
while let Ok(msg) = rx.try_recv() {
match msg {
CoordRequest::BeaconArrival { anomaly: false, .. } => saw_arrival_healthy = true,
CoordRequest::BeaconArrival { anomaly: true, .. }
| CoordRequest::ForceRescanServer { .. } => saw_anomaly = true,
_ => {}
}
}
assert!(
saw_arrival_healthy,
"after dropped dups, advance=1 must classify as healthy"
);
assert!(
!saw_anomaly,
"monotonic recovery from drop sequence must not fire anomaly"
);
}
#[test]
fn small_backwards_advance_is_dropped() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
for id in 100..103 {
hdr.cid = id;
handle_beacon(hdr, &mut servers, &tx);
}
while rx.try_recv().is_ok() {}
hdr.cid = 101;
handle_beacon(hdr, &mut servers, &tx);
assert!(
rx.try_recv().is_err(),
"backwards-by-1 (within 256) must drop"
);
}
#[test]
fn stale_prune_drops_idle_entries_only() {
let now = Instant::now();
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let fresh: SocketAddr = "127.0.0.1:5064".parse().unwrap();
let stale: SocketAddr = "127.0.0.1:5065".parse().unwrap();
servers.insert(
fresh,
BeaconState {
last_id: 0,
last_seen: now - Duration::from_secs(10),
period_estimate: Some(Duration::from_secs(15)),
count: 5,
},
);
servers.insert(
stale,
BeaconState {
last_id: 0,
last_seen: now - Duration::from_secs(300),
period_estimate: Some(Duration::from_secs(15)),
count: 5,
},
);
servers.retain(|_, s| now.duration_since(s.last_seen) < BEACON_STALE_THRESHOLD);
assert!(
servers.contains_key(&fresh),
"fresh entry must survive prune"
);
assert!(
!servers.contains_key(&stale),
"180-s-idle entry must be pruned"
);
}
#[test]
fn reset_on_connect_breaks_period_collapse_cascade_after_reconnect() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let server: SocketAddr = "127.0.0.1:5064".parse().unwrap();
servers.insert(
server,
BeaconState {
last_id: 999,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 1000,
},
);
apply_reset_server(&mut servers, server);
let s = servers.get(&server).expect("entry survives reset");
assert!(
s.period_estimate.is_none(),
"ResetServer must clear period_estimate"
);
assert_eq!(s.count, 0, "ResetServer must zero count");
assert_eq!(
s.last_id, 999,
"ResetServer must preserve last_id (dedup still works)"
);
let intervals_ms = [20u64, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240];
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
for (i, &ms) in intervals_ms.iter().enumerate() {
let s = servers.get_mut(&server).expect("entry");
s.last_seen = Instant::now() - Duration::from_millis(ms);
hdr.cid = 1000 + (i as u32);
handle_beacon(hdr, &mut servers, &tx);
while let Ok(msg) = rx.try_recv() {
if let CoordRequest::ForceRescanServer { kind, .. } = msg {
assert_ne!(
kind,
BeaconAnomalyKind::PeriodCollapse,
"ramp-up beacon #{} (interval={} ms) after \
ResetServer must not classify as PeriodCollapse \
— the cascade is the archiver-rs reconnect noise \
this fix targets",
i + 1,
ms
);
}
}
}
}
#[test]
fn peer_connect_ramp_up_does_not_fire_period_collapse() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let server: SocketAddr = "127.0.0.1:5064".parse().unwrap();
servers.insert(
server,
BeaconState {
last_id: 999,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 1000,
},
);
let intervals_ms = [20u64, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240];
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
for (i, &ms) in intervals_ms.iter().enumerate() {
let s = servers.get_mut(&server).expect("entry");
s.last_seen = Instant::now() - Duration::from_millis(ms);
hdr.cid = 1000 + (i as u32);
handle_beacon(hdr, &mut servers, &tx);
while let Ok(msg) = rx.try_recv() {
if let CoordRequest::ForceRescanServer { kind, .. } = msg {
assert_ne!(
kind,
BeaconAnomalyKind::PeriodCollapse,
"peer-connect ramp-up beacon #{} (interval={} ms) \
must NOT classify as PeriodCollapse — \
the self-reset path in handle_beacon absorbs it",
i + 1,
ms
);
}
}
}
let s = servers.get(&server).expect("entry");
assert_eq!(s.last_id, 1009, "last_id must track ramp-up ids");
assert!(
s.period_estimate.is_some(),
"EMA must be reseeded after the cascade"
);
}
#[test]
fn reset_unknown_server_is_noop() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let server: SocketAddr = "127.0.0.1:5064".parse().unwrap();
apply_reset_server(&mut servers, server);
assert!(servers.is_empty());
}
#[test]
fn reset_matches_inaddr_any_announced_entry() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let inaddr_any: SocketAddr = "0.0.0.0:5064".parse().unwrap();
servers.insert(
inaddr_any,
BeaconState {
last_id: 999,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 1000,
},
);
let circuit: SocketAddr = "10.0.0.5:5064".parse().unwrap();
apply_reset_server(&mut servers, circuit);
let s = servers.get(&inaddr_any).expect("entry preserved");
assert!(
s.period_estimate.is_none(),
"INADDR_ANY-keyed entry must be reset by port-match"
);
assert_eq!(s.count, 0);
assert_eq!(s.last_id, 999, "last_id preserved across reset");
}
#[test]
fn reset_matches_multihomed_announced_entry() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let nic_a: SocketAddr = "10.0.0.1:5064".parse().unwrap();
servers.insert(
nic_a,
BeaconState {
last_id: 42,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 100,
},
);
let nic_b: SocketAddr = "10.0.0.2:5064".parse().unwrap();
apply_reset_server(&mut servers, nic_b);
let s = servers.get(&nic_a).expect("entry preserved");
assert!(s.period_estimate.is_none());
assert_eq!(s.count, 0);
}
#[test]
fn reset_exact_does_not_cascade_to_inaddr_any_other_ioc() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let target: SocketAddr = "10.0.0.5:5064".parse().unwrap();
let inaddr_any: SocketAddr = "0.0.0.0:5064".parse().unwrap();
servers.insert(
target,
BeaconState {
last_id: 1,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 1000,
},
);
servers.insert(
inaddr_any,
BeaconState {
last_id: 2,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 500,
},
);
apply_reset_server(&mut servers, target);
let t = servers.get(&target).expect("target preserved");
assert!(t.period_estimate.is_none(), "exact-match target reset");
assert_eq!(t.count, 0);
let i = servers.get(&inaddr_any).expect("inaddr-any preserved");
assert_eq!(
i.period_estimate,
Some(Duration::from_secs(15)),
"INADDR_ANY entry from a different IOC must not be touched \
after an exact-match hit"
);
assert_eq!(i.count, 500);
}
#[test]
fn reset_does_not_blind_other_same_port_ioc() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let target: SocketAddr = "10.0.0.5:5064".parse().unwrap();
let neighbour: SocketAddr = "10.0.0.7:5064".parse().unwrap();
servers.insert(
target,
BeaconState {
last_id: 1000,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 1000,
},
);
servers.insert(
neighbour,
BeaconState {
last_id: 2000,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 5000,
},
);
apply_reset_server(&mut servers, target);
let t = servers.get(&target).expect("target preserved");
assert!(t.period_estimate.is_none(), "exact-match target reset");
assert_eq!(t.count, 0);
let n = servers.get(&neighbour).expect("neighbour preserved");
assert_eq!(
n.period_estimate,
Some(Duration::from_secs(15)),
"unrelated same-port IOC must NOT have its EMA cleared — \
that would disable PeriodCollapse on its next restart"
);
assert_eq!(n.count, 5000, "neighbour count untouched");
}
#[test]
fn reset_skips_when_ambiguous_no_exact_no_inaddr_any() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let a: SocketAddr = "10.0.0.7:5064".parse().unwrap();
let b: SocketAddr = "10.0.0.9:5064".parse().unwrap();
servers.insert(
a,
BeaconState {
last_id: 1,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 100,
},
);
servers.insert(
b,
BeaconState {
last_id: 2,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 200,
},
);
let circuit: SocketAddr = "10.0.0.5:5064".parse().unwrap();
apply_reset_server(&mut servers, circuit);
for key in [a, b] {
let s = servers.get(&key).expect("entry preserved");
assert_eq!(
s.period_estimate,
Some(Duration::from_secs(15)),
"ambiguous fallback must not blind {key}"
);
}
}
#[test]
fn reset_via_inaddr_any_does_not_touch_unrelated_real_ip_entries() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let inaddr_any: SocketAddr = "0.0.0.0:5064".parse().unwrap();
let unrelated: SocketAddr = "10.0.0.7:5064".parse().unwrap();
servers.insert(
inaddr_any,
BeaconState {
last_id: 1,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 100,
},
);
servers.insert(
unrelated,
BeaconState {
last_id: 2,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 200,
},
);
let circuit: SocketAddr = "10.0.0.5:5064".parse().unwrap();
apply_reset_server(&mut servers, circuit);
let i = servers.get(&inaddr_any).expect("inaddr-any preserved");
assert!(i.period_estimate.is_none(), "INADDR_ANY entry reset");
let u = servers.get(&unrelated).expect("unrelated preserved");
assert_eq!(
u.period_estimate,
Some(Duration::from_secs(15)),
"unrelated same-port real-IP entry must not be touched"
);
assert_eq!(u.count, 200);
}
#[test]
fn reset_leaves_other_port_entries_alone() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let other: SocketAddr = "0.0.0.0:5065".parse().unwrap();
servers.insert(
other,
BeaconState {
last_id: 7,
last_seen: Instant::now(),
period_estimate: Some(Duration::from_secs(15)),
count: 50,
},
);
let circuit: SocketAddr = "10.0.0.5:5064".parse().unwrap();
apply_reset_server(&mut servers, circuit);
let s = servers.get(&other).expect("entry preserved");
assert_eq!(
s.period_estimate,
Some(Duration::from_secs(15)),
"different-port entry must not be touched"
);
assert_eq!(s.count, 50);
}
}