use std::collections::HashMap;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::time::{Duration, Instant};
use epics_base_rs::net::AsyncUdpV4;
use epics_base_rs::runtime::sync::mpsc;
use crate::protocol::*;
use super::CoordRequest;
/// Why a beacon made `handle_beacon` request a rescan of a server.
///
/// The value is carried in `CoordRequest::ForceRescanServer`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BeaconAnomalyKind {
/// The server address had no entry in the beacon table when the beacon arrived.
FirstSighting,
/// The beacon id was not the previously recorded id plus one (wrapping), and the
/// beacon was not filtered out as a duplicate.
IdMismatch,
/// The id advanced by exactly one, but after more than three accepted beacons the
/// interval since the last accepted beacon was above 50 ms and below one third of
/// the smoothed period estimate.
PeriodCollapse,
}
/// Per-server bookkeeping kept by `handle_beacon`, keyed by the announced socket address.
struct BeaconState {
// Id of the most recent beacon recorded for this server. Near-duplicate beacons
// that are otherwise dropped still overwrite this field.
last_id: u32,
// Arrival time of the most recent accepted (non-dropped) beacon.
last_seen: Instant,
// Exponentially smoothed beacon interval; starts at 15 s and is blended with each
// measured interval using a weight of 0.25.
period_estimate: Duration,
// Number of accepted beacons, including the first sighting.
count: u64,
}
/// Idle time after which `handle_beacon` prunes a server's entry from the beacon table.
const BEACON_STALE_THRESHOLD: Duration = Duration::from_secs(180);
/// Receive timeout used by the monitor loop; when no datagram arrives for this long
/// the monitor sends its repeater registration again.
const REREGISTER_INTERVAL: Duration = Duration::from_secs(300);
/// Runs the beacon monitor without a signed-beacon verifier.
///
/// Delegates to `run_beacon_monitor_inner`, forwarding `coord_tx` unchanged. When
/// the `cap-tokens` feature is enabled the verifier argument is supplied as `None`.
pub(crate) async fn run_beacon_monitor(coord_tx: mpsc::UnboundedSender<CoordRequest>) {
    let monitor = run_beacon_monitor_inner(
        coord_tx,
        #[cfg(feature = "cap-tokens")]
        None,
    );
    monitor.await
}
/// Runs the beacon monitor with a signed-beacon verifier attached.
///
/// Only compiled with the `cap-tokens` feature. Wraps `verifier` in `Some` and
/// delegates to `run_beacon_monitor_inner`.
// NOTE(review): `dead_code` is allowed presumably because no in-crate caller exists
// yet — confirm.
#[cfg(feature = "cap-tokens")]
#[allow(dead_code)]
pub(crate) async fn run_beacon_monitor_with_verifier(
    coord_tx: mpsc::UnboundedSender<CoordRequest>,
    verifier: std::sync::Arc<crate::server::signed_beacon::SignedBeaconVerifier>,
) {
    let verifier = Some(verifier);
    run_beacon_monitor_inner(coord_tx, verifier).await
}
/// Core beacon-monitor loop shared by the `run_beacon_monitor*` entry points.
///
/// Steps:
/// 1. Bind a UDP socket on `Ipv4Addr::LOCALHOST` (port argument `0`); return silently
///    if the bind fails.
/// 2. Try to register with the CA repeater up to three times, sleeping 200 ms and
///    then 400 ms between attempts. The loop below runs even if all attempts fail.
/// 3. Receive datagrams forever. Each datagram is walked frame by frame (header plus
///    payload padded to a multiple of 8 bytes). `CA_PROTO_RSRV_IS_UP` frames are
///    passed to `handle_beacon`; other commands are skipped.
/// 4. If no datagram arrives within `REREGISTER_INTERVAL`, the registration is re-sent.
///
/// With the `cap-tokens` feature and a verifier present, signed companion frames
/// (`CA_PROTO_RSRV_BEACON_SIG`) are verified and their `(ip, port, beacon_id)` tuples
/// remembered. A plain beacon whose tuple is not remembered is counted and, unless
/// `EPICS_CA_BEACON_REQUIRE_SIGNED` is one of `NO`/`no`/`0`/`false`/`FALSE`, dropped.
///
/// # Parameters
/// * `coord_tx` – channel on which `handle_beacon` emits `CoordRequest` messages.
/// * `verifier` (feature `cap-tokens` only) – optional signed-beacon verifier.
async fn run_beacon_monitor_inner(
    coord_tx: mpsc::UnboundedSender<CoordRequest>,
    #[cfg(feature = "cap-tokens")] verifier: Option<
        std::sync::Arc<crate::server::signed_beacon::SignedBeaconVerifier>,
    >,
) {
    // Pause after a failed `recv_from`. Without it a socket stuck in a persistent
    // error state would make this loop retry immediately and spin at full CPU.
    const RECV_ERROR_BACKOFF: Duration = Duration::from_millis(100);

    let socket = match AsyncUdpV4::bind_single(Ipv4Addr::LOCALHOST, 0, false) {
        Ok(s) => s,
        // Best effort: without a socket there is nothing to monitor.
        Err(_) => return,
    };

    // Initial registration with exponential backoff (200 ms, 400 ms).
    for attempt in 0..3u32 {
        if register_with_repeater(&socket).await.is_ok() {
            break;
        }
        if attempt < 2 {
            tokio::time::sleep(Duration::from_millis(200 * (1 << attempt))).await;
        }
    }

    // (ip, port, beacon_id) tuples announced by verified signed companions, with the
    // time each was verified.
    #[cfg(feature = "cap-tokens")]
    let mut verified_tuples: HashMap<(u32, u16, u32), std::time::Instant> = HashMap::new();
    // Signed beacons are required unless the environment variable explicitly opts out.
    #[cfg(feature = "cap-tokens")]
    let require_signed = !matches!(
        epics_base_rs::runtime::env::get("EPICS_CA_BEACON_REQUIRE_SIGNED").as_deref(),
        Some("NO" | "no" | "0" | "false" | "FALSE")
    );

    let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
    let mut buf = [0u8; 4096];
    loop {
        let recv = tokio::time::timeout(REREGISTER_INTERVAL, socket.recv_from(&mut buf)).await;
        let (len, _src) = match recv {
            Ok(Ok(v)) => v,
            Ok(Err(_)) => {
                // Receive failed: back off briefly instead of retrying in a tight loop.
                tokio::time::sleep(RECV_ERROR_BACKOFF).await;
                continue;
            }
            Err(_) => {
                // Nothing received for a whole interval: refresh the registration.
                let _ = register_with_repeater(&socket).await;
                continue;
            }
        };
        if len < CaHeader::SIZE {
            continue;
        }

        // A datagram may carry several frames back to back.
        let mut offset = 0;
        while offset + CaHeader::SIZE <= len {
            let Ok(hdr) = CaHeader::from_bytes(&buf[offset..len]) else {
                break;
            };
            // Payload size rounded up to the next multiple of 8.
            let payload_padded = ((hdr.postsize as usize) + 7) & !7;
            let frame_len = (CaHeader::SIZE + payload_padded).max(CaHeader::SIZE);
            if offset.saturating_add(frame_len) > len {
                // Truncated frame: abandon the rest of the datagram.
                break;
            }
            #[cfg_attr(not(feature = "cap-tokens"), allow(unused_variables))]
            let frame_start = offset;
            // Advance first so every `continue` below moves on to the next frame.
            offset += frame_len;

            #[cfg(feature = "cap-tokens")]
            if hdr.cmmd == crate::server::signed_beacon::CA_PROTO_RSRV_BEACON_SIG {
                if let Some(ref v) = verifier {
                    let frame = &buf[frame_start..frame_start + frame_len];
                    let src_ip = match _src.ip() {
                        std::net::IpAddr::V4(v) => v,
                        std::net::IpAddr::V6(_) => {
                            metrics::counter!("ca_client_signed_beacon_failures_total")
                                .increment(1);
                            continue;
                        }
                    };
                    match v.verify(frame) {
                        // Valid signature, but the announced IP differs from the
                        // datagram's source: do not remember the tuple.
                        Ok((ip, port, beacon_id)) if Ipv4Addr::from(ip) != src_ip => {
                            tracing::debug!(
                                announced = %Ipv4Addr::from(ip),
                                actual = %src_ip,
                                port, beacon_id,
                                "signed beacon source-IP mismatch (G3)"
                            );
                            metrics::counter!("ca_client_signed_beacon_source_ip_mismatch_total")
                                .increment(1);
                        }
                        Ok((ip, port, beacon_id)) => {
                            const MAX_VERIFIED_TUPLES: usize = 8192;
                            // At capacity, first expire tuples older than the
                            // verifier's maximum age.
                            if verified_tuples.len() >= MAX_VERIFIED_TUPLES {
                                let max_age = std::time::Duration::from_secs(v.max_age_secs.max(1));
                                let now = std::time::Instant::now();
                                verified_tuples.retain(|_, t| now.duration_since(*t) <= max_age);
                            }
                            verified_tuples
                                .insert((ip, port, beacon_id), std::time::Instant::now());
                            metrics::counter!("ca_client_signed_beacon_verified_total")
                                .increment(1);
                        }
                        Err(e) => {
                            tracing::debug!(error = ?e,
                                "signed beacon companion failed verification");
                            metrics::counter!("ca_client_signed_beacon_failures_total")
                                .increment(1);
                        }
                    }
                }
                // Signed companions are never forwarded to `handle_beacon`.
                continue;
            }

            if hdr.cmmd != CA_PROTO_RSRV_IS_UP {
                continue;
            }

            #[cfg(feature = "cap-tokens")]
            if let Some(ref v) = verifier {
                let max_age = std::time::Duration::from_secs(v.max_age_secs.max(1));
                let now = std::time::Instant::now();
                verified_tuples.retain(|_, t| now.duration_since(*t) <= max_age);
                let key = (hdr.available, hdr.count, hdr.cid);
                if !verified_tuples.contains_key(&key) {
                    // The counter is incremented even when the beacon is then
                    // accepted because signing is not required.
                    metrics::counter!("ca_client_unsigned_beacon_drops_total").increment(1);
                    if require_signed {
                        continue;
                    }
                }
            }

            handle_beacon(hdr, &mut servers, &coord_tx);
        }
    }
}
/// Updates the per-server beacon table from one beacon header and notifies the
/// coordinator.
///
/// The server address is taken from the header: `available` is the IPv4 address and
/// `count` is the port, with `CA_SERVER_PORT` used when `count` is zero.
///
/// Behaviour, in order:
/// * Entries idle for `BEACON_STALE_THRESHOLD` or longer are pruned. If the table
///   holds `MAX_BEACON_SERVERS` or more entries and this server is new, entries idle
///   for 75 s or longer are pruned as well.
/// * For a known server, an unchanged id is ignored. An id that advanced by 2 or 3,
///   or moved backwards by up to `BACKWARDS_DUP_WINDOW`, only updates `last_id`.
/// * Otherwise the beacon is accepted, the state is updated, and messages are sent:
///   `ForceRescanServer` for any anomaly, plus `BeaconArrival` for every accepted
///   beacon except a first sighting.
///
/// Send errors on `coord_tx` are ignored.
fn handle_beacon(
    hdr: CaHeader,
    servers: &mut HashMap<SocketAddr, BeaconState>,
    coord_tx: &mpsc::UnboundedSender<CoordRequest>,
) {
    const MAX_BEACON_SERVERS: usize = 4096;
    const BACKWARDS_DUP_WINDOW: u32 = 4;
    const MIN_PERIOD_COLLAPSE_INTERVAL: Duration = Duration::from_millis(50);
    const SMOOTHING: f64 = 0.25;

    let port = match hdr.count {
        0 => CA_SERVER_PORT,
        announced => announced,
    };
    let ip = Ipv4Addr::from(hdr.available.to_be_bytes());
    let server_addr = SocketAddr::V4(SocketAddrV4::new(ip, port));
    let beacon_id = hdr.cid;
    let now = Instant::now();

    // Regular prune of servers that have gone quiet.
    servers.retain(|_, state| now.duration_since(state.last_seen) < BEACON_STALE_THRESHOLD);

    let first_sighting = !servers.contains_key(&server_addr);
    if first_sighting && servers.len() >= MAX_BEACON_SERVERS {
        // Table is full and a new server showed up: prune more aggressively.
        let tight_cutoff = Duration::from_secs(15 * 5);
        servers.retain(|_, state| now.duration_since(state.last_seen) < tight_cutoff);
    }

    // A fresh entry is seeded so that this beacon's id is the expected next one.
    let state = servers.entry(server_addr).or_insert_with(|| BeaconState {
        last_id: beacon_id.wrapping_sub(1),
        last_seen: now,
        period_estimate: Duration::from_secs(15),
        count: 0,
    });
    let actual_interval = now.duration_since(state.last_seen);
    // Wrapping distance from the last recorded id; 1 means "the expected next id".
    let advance = beacon_id.wrapping_sub(state.last_id);

    if !first_sighting {
        if advance == 0 {
            // Exact repeat of the last id: ignore without touching the state.
            return;
        }
        let near_duplicate =
            matches!(advance, 2 | 3) || advance > u32::MAX - BACKWARDS_DUP_WINDOW;
        if near_duplicate {
            // Small forward skip or small step backwards: record the id, emit nothing.
            state.last_id = beacon_id;
            return;
        }
    }

    let anomaly_kind = if first_sighting {
        Some(BeaconAnomalyKind::FirstSighting)
    } else if advance != 1 {
        Some(BeaconAnomalyKind::IdMismatch)
    } else {
        let collapsed = state.count > 3
            && actual_interval > MIN_PERIOD_COLLAPSE_INTERVAL
            && actual_interval < state.period_estimate / 3;
        collapsed.then_some(BeaconAnomalyKind::PeriodCollapse)
    };

    state.last_id = beacon_id;
    state.last_seen = now;
    state.count += 1;
    if state.count > 1 {
        // Exponential moving average of the beacon interval.
        let previous = state.period_estimate.as_secs_f64();
        let sample = actual_interval.as_secs_f64();
        state.period_estimate =
            Duration::from_secs_f64(previous * (1.0 - SMOOTHING) + sample * SMOOTHING);
    }

    match anomaly_kind {
        Some(kind) => {
            let _ = coord_tx.send(CoordRequest::ForceRescanServer { server_addr, kind });
            match kind {
                // A first sighting sends no `BeaconArrival`.
                BeaconAnomalyKind::FirstSighting => {}
                BeaconAnomalyKind::IdMismatch | BeaconAnomalyKind::PeriodCollapse => {
                    let _ = coord_tx.send(CoordRequest::BeaconArrival {
                        server_addr,
                        anomaly: true,
                    });
                }
            }
        }
        None => {
            let _ = coord_tx.send(CoordRequest::BeaconArrival {
                server_addr,
                anomaly: false,
            });
        }
    }
}
/// Sends a `CA_PROTO_REPEATER_REGISTER` message to the repeater on
/// `127.0.0.1:CA_REPEATER_PORT` and waits up to 500 ms for a
/// `CA_PROTO_REPEATER_CONFIRM` reply on the same socket.
///
/// The header's `available` field carries the socket's first IPv4 local address,
/// falling back to `127.0.0.1` when none is reported. Datagrams received while
/// waiting that are too short, fail to parse, or carry a different command are
/// discarded.
///
/// Returns `Ok(())` once a confirmation is seen, and `Err(())` on a send error, a
/// receive error, or timeout.
async fn register_with_repeater(socket: &AsyncUdpV4) -> Result<(), ()> {
    let advertised_ip = socket
        .local_addrs()
        .into_iter()
        .filter_map(|addr| {
            if let SocketAddr::V4(v4) = addr {
                Some(*v4.ip())
            } else {
                None
            }
        })
        .next()
        .unwrap_or(Ipv4Addr::LOCALHOST);

    let mut request = CaHeader::new(CA_PROTO_REPEATER_REGISTER);
    request.available = u32::from(advertised_ip);

    let repeater = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, CA_REPEATER_PORT));
    if socket.send_to(&request.to_bytes(), repeater).await.is_err() {
        return Err(());
    }

    let mut reply = [0u8; 64];
    let wait_for_confirm = async {
        loop {
            let (len, _) = match socket.recv_from(&mut reply).await {
                Ok(received) => received,
                Err(_) => return Err::<(), ()>(()),
            };
            if len < CaHeader::SIZE {
                continue;
            }
            match CaHeader::from_bytes(&reply[..len]) {
                Ok(resp) if resp.cmmd == CA_PROTO_REPEATER_CONFIRM => return Ok::<(), ()>(()),
                // Anything else is not the confirmation: keep waiting.
                _ => {}
            }
        }
    };

    match tokio::time::timeout(Duration::from_millis(500), wait_for_confirm).await {
        Ok(outcome) => outcome,
        // Timed out without seeing a confirmation.
        Err(_) => Err(()),
    }
}
// Unit tests for `handle_beacon`'s classification and for the stale-entry prune.
// Each test drives `handle_beacon` with hand-built headers and inspects the
// messages that arrive on an unbounded channel.
#[cfg(test)]
mod tests {
use super::*;
use std::time::Instant;
// Pins the stale threshold constant at 180 seconds.
#[test]
fn beacon_stale_threshold_is_180s() {
assert_eq!(BEACON_STALE_THRESHOLD, Duration::from_secs(180));
}
// A repeated beacon with an unchanged id must be ignored after the first sighting.
#[test]
fn duplicate_beacon_does_not_double_fire_anomaly() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.cid = 100;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
// First beacon: exactly one message, the FirstSighting rescan.
handle_beacon(hdr, &mut servers, &tx);
assert!(matches!(
rx.try_recv(),
Ok(CoordRequest::ForceRescanServer {
kind: BeaconAnomalyKind::FirstSighting,
..
})
));
assert!(rx.try_recv().is_err());
// Same header again: nothing may be emitted.
handle_beacon(hdr, &mut servers, &tx);
assert!(
rx.try_recv().is_err(),
"duplicate same-cid beacon must not fire ForceRescanServer"
);
}
// An id jump from 100 to 1 is outside the duplicate windows, so it must be reported
// as IdMismatch even though the two calls are back to back.
#[test]
fn sub_50ms_restart_via_id_mismatch_still_fires_anomaly() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.cid = 100;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
handle_beacon(hdr, &mut servers, &tx);
assert!(rx.try_recv().is_ok());
hdr.cid = 1;
handle_beacon(hdr, &mut servers, &tx);
assert!(
matches!(
rx.try_recv(),
Ok(CoordRequest::ForceRescanServer {
kind: BeaconAnomalyKind::IdMismatch,
..
})
),
"id-mismatch restart must fire IdMismatch anomaly even when interval < 50ms"
);
}
// With a pre-seeded entry (10 beacons seen, 15 s estimate, last seen 200 ms ago),
// the expected next id arriving this early must classify as PeriodCollapse.
#[test]
fn period_collapse_classifies_as_period_collapse() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let server: SocketAddr = "127.0.0.1:5064".parse().unwrap();
servers.insert(
server,
BeaconState {
last_id: 99,
last_seen: Instant::now() - Duration::from_millis(200),
period_estimate: Duration::from_secs(15),
count: 10,
},
);
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
// 100 is last_id + 1, so only the timing can trigger an anomaly here.
hdr.cid = 100; handle_beacon(hdr, &mut servers, &tx);
assert!(
matches!(
rx.try_recv(),
Ok(CoordRequest::ForceRescanServer {
kind: BeaconAnomalyKind::PeriodCollapse,
..
})
),
"monotonic-id, sub-period interval must classify as PeriodCollapse"
);
}
// Five consecutive ids sent back to back: one FirstSighting rescan followed by four
// healthy arrivals and no anomaly. This relies on the calls completing within the
// 50 ms minimum interval that the period-collapse check requires.
#[test]
fn fast_cadence_monotonic_ids_does_not_fire_spurious_anomaly() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
for id in 100..105 {
hdr.cid = id;
handle_beacon(hdr, &mut servers, &tx);
}
// Tally the emitted messages by category.
let mut search_wakes = 0;
let mut healthy_arrivals = 0;
let mut anomaly_arrivals = 0;
while let Ok(msg) = rx.try_recv() {
match msg {
CoordRequest::ForceRescanServer { .. } => search_wakes += 1,
CoordRequest::BeaconArrival { anomaly: false, .. } => healthy_arrivals += 1,
CoordRequest::BeaconArrival { anomaly: true, .. } => anomaly_arrivals += 1,
_ => {}
}
}
assert_eq!(
search_wakes, 1,
"monotonic fast-cadence beacons must wake searches only on first sighting"
);
assert_eq!(
anomaly_arrivals, 0,
"monotonic fast-cadence beacons must not flag the watchdog"
);
assert_eq!(
healthy_arrivals, 4,
"each post-first-sighting healthy beacon must refresh the transport watchdog"
);
}
// An IdMismatch must produce both a ForceRescanServer and an anomalous BeaconArrival.
#[test]
fn id_mismatch_emits_anomaly_beacon_arrival() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.cid = 100;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
handle_beacon(hdr, &mut servers, &tx);
// Discard the first-sighting message.
while rx.try_recv().is_ok() {}
hdr.cid = 1;
handle_beacon(hdr, &mut servers, &tx);
let mut saw_search_wake = false;
let mut saw_anomaly_arrival = false;
while let Ok(msg) = rx.try_recv() {
match msg {
CoordRequest::ForceRescanServer {
kind: BeaconAnomalyKind::IdMismatch,
..
} => saw_search_wake = true,
CoordRequest::BeaconArrival { anomaly: true, .. } => saw_anomaly_arrival = true,
_ => {}
}
}
assert!(saw_search_wake, "IdMismatch must wake searches");
assert!(
saw_anomaly_arrival,
"IdMismatch must flag the transport watchdog"
);
}
// A first sighting must emit the rescan request but no BeaconArrival.
#[test]
fn first_sighting_does_not_emit_beacon_arrival() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.cid = 100;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
handle_beacon(hdr, &mut servers, &tx);
let mut saw_first_sighting = false;
let mut saw_arrival = false;
while let Ok(msg) = rx.try_recv() {
match msg {
CoordRequest::ForceRescanServer {
kind: BeaconAnomalyKind::FirstSighting,
..
} => saw_first_sighting = true,
CoordRequest::BeaconArrival { .. } => saw_arrival = true,
_ => {}
}
}
assert!(saw_first_sighting, "first sighting must wake searches");
assert!(
!saw_arrival,
"first sighting must not touch the transport watchdog"
);
}
// Forward jumps of 2 and 3 are silently dropped but still move `last_id`, so the
// following +1 beacon is classified as healthy.
#[test]
fn small_forward_advance_is_dropped_not_classified() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
// Establish the server with ids 100, 101, 102.
for id in 100..103 {
hdr.cid = id;
handle_beacon(hdr, &mut servers, &tx);
}
while rx.try_recv().is_ok() {}
// 102 -> 104 is an advance of 2.
hdr.cid = 104;
handle_beacon(hdr, &mut servers, &tx);
assert!(
rx.try_recv().is_err(),
"advance=2 must be silently dropped, not classified as anomaly"
);
// 104 -> 107 is an advance of 3.
hdr.cid = 107;
handle_beacon(hdr, &mut servers, &tx);
assert!(rx.try_recv().is_err(), "advance=3 must be silently dropped");
// 107 -> 108 is the normal advance of 1.
hdr.cid = 108;
handle_beacon(hdr, &mut servers, &tx);
let mut saw_arrival_healthy = false;
let mut saw_anomaly = false;
while let Ok(msg) = rx.try_recv() {
match msg {
CoordRequest::BeaconArrival { anomaly: false, .. } => saw_arrival_healthy = true,
CoordRequest::BeaconArrival { anomaly: true, .. }
| CoordRequest::ForceRescanServer { .. } => saw_anomaly = true,
_ => {}
}
}
assert!(
saw_arrival_healthy,
"after dropped dups, advance=1 must classify as healthy"
);
assert!(
!saw_anomaly,
"monotonic recovery from drop sequence must not fire anomaly"
);
}
// A beacon whose id is one less than the last recorded id must be dropped silently.
// NOTE(review): the assertion message mentions 256, but `BACKWARDS_DUP_WINDOW` in
// `handle_beacon` is 4 — confirm which value is intended.
#[test]
fn small_backwards_advance_is_dropped() {
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let (tx, mut rx) = mpsc::unbounded_channel::<CoordRequest>();
let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
hdr.count = 5064;
hdr.available = u32::from_be_bytes([127, 0, 0, 1]);
for id in 100..103 {
hdr.cid = id;
handle_beacon(hdr, &mut servers, &tx);
}
while rx.try_recv().is_ok() {}
// 102 -> 101 is a step backwards by one.
hdr.cid = 101;
handle_beacon(hdr, &mut servers, &tx);
assert!(
rx.try_recv().is_err(),
"backwards-by-1 (within 256) must drop"
);
}
// Applies the same retain predicate that `handle_beacon` uses for stale pruning
// directly to a hand-built table: a 10 s old entry stays, a 300 s old entry goes.
#[test]
fn stale_prune_drops_idle_entries_only() {
let now = Instant::now();
let mut servers: HashMap<SocketAddr, BeaconState> = HashMap::new();
let fresh: SocketAddr = "127.0.0.1:5064".parse().unwrap();
let stale: SocketAddr = "127.0.0.1:5065".parse().unwrap();
servers.insert(
fresh,
BeaconState {
last_id: 0,
last_seen: now - Duration::from_secs(10),
period_estimate: Duration::from_secs(15),
count: 5,
},
);
servers.insert(
stale,
BeaconState {
last_id: 0,
last_seen: now - Duration::from_secs(300),
period_estimate: Duration::from_secs(15),
count: 5,
},
);
servers.retain(|_, s| now.duration_since(s.last_seen) < BEACON_STALE_THRESHOLD);
assert!(
servers.contains_key(&fresh),
"fresh entry must survive prune"
);
assert!(
!servers.contains_key(&stale),
"180-s-idle entry must be pruned"
);
}
}