use std::collections::HashMap;
#[cfg(feature = "cap-tokens")]
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::sync::Notify;
use crate::protocol::*;
use epics_base_rs::error::CaResult;
/// Periodically announces this server by sending `CA_PROTO_RSRV_IS_UP` beacons.
///
/// A UDP socket is bound to an ephemeral port on `0.0.0.0` and one beacon
/// datagram is sent to every address in `beacon_addrs` per round. The delay
/// between rounds starts at 20 ms and doubles after each uninterrupted sleep
/// until it reaches `max_period` (raised to 20 ms if smaller). Notifying
/// `reset` wakes the task immediately and restarts the back-off at 20 ms.
///
/// Each round carries a beacon id that starts at 0 and is incremented by one
/// (wrapping) per round, so no two consecutive rounds share an id.
///
/// # Parameters
/// * `server_port` - port advertised in the beacon header's `count` field.
/// * `beacon_addrs` - destinations for every beacon round. When empty, nothing
///   is ever sent (signed beacons included) and the task simply parks.
/// * `max_period` - upper bound for the interval between rounds.
/// * `reset` - notification that restarts the fast beacon interval.
/// * `signer` - (feature `cap-tokens`) optional emitter invoked once per round
///   with the probed local IPv4 address, `server_port` and the beacon id.
///
/// # Errors
/// Returns an error only if binding the socket or enabling broadcast fails.
/// Per-destination send failures are logged and do not stop the loop, which
/// otherwise never returns.
pub async fn run_beacon_emitter(
    server_port: u16,
    beacon_addrs: Vec<SocketAddr>,
    max_period: Duration,
    reset: Arc<Notify>,
    #[cfg(feature = "cap-tokens")] signer: Option<
        Arc<crate::server::signed_beacon::SignedBeaconEmitter>,
    >,
) -> CaResult<()> {
    let socket = UdpSocket::bind("0.0.0.0:0").await?;
    socket.set_broadcast(true)?;
    // Multicast options are best-effort: a failure here is deliberately ignored
    // so that plain broadcast/unicast destinations keep working.
    let _ = socket.set_multicast_ttl_v4(epics_base_rs::runtime::net::ca_mcast_ttl());
    let _ = socket.set_multicast_loop_v4(true);

    // Only signed beacons need our own address. Discover it by "connecting" a
    // throwaway UDP socket to the first destination and reading back the local
    // address that was selected; fall back to 0 if that cannot be determined.
    #[cfg(feature = "cap-tokens")]
    let server_ip: u32 = if signer.is_some() {
        let probe_dest = beacon_addrs
            .first()
            .copied()
            .unwrap_or(SocketAddr::from((Ipv4Addr::BROADCAST, CA_REPEATER_PORT)));
        let probe = std::net::UdpSocket::bind("0.0.0.0:0").ok();
        probe
            .and_then(|s| {
                s.connect(probe_dest).ok()?;
                match s.local_addr().ok()? {
                    SocketAddr::V4(a) if !a.ip().is_unspecified() => {
                        Some(u32::from_be_bytes(a.ip().octets()))
                    }
                    _ => None,
                }
            })
            .unwrap_or(0)
    } else {
        0
    };

    let mut beacon_id: u32 = 0;
    let initial_interval = Duration::from_millis(20);
    // Guard against a `max_period` shorter than the initial interval.
    let max_interval = max_period.max(initial_interval);
    let mut interval = initial_interval;
    // Last error kind seen per destination; used to log a failure only when it
    // first appears or changes, and to log a single recovery message afterwards.
    let mut send_errors: HashMap<SocketAddr, std::io::ErrorKind> = HashMap::new();

    if beacon_addrs.is_empty() {
        // Nothing to send to: stay alive without ever emitting a beacon.
        loop {
            reset.notified().await;
        }
    }

    loop {
        let mut hdr = CaHeader::new(CA_PROTO_RSRV_IS_UP);
        hdr.data_type = CA_MINOR_VERSION;
        hdr.count = server_port;
        hdr.cid = beacon_id;
        let bytes = hdr.to_bytes();

        for addr in &beacon_addrs {
            match socket.send_to(&bytes, addr).await {
                Ok(_) => {
                    if let Some(prev) = send_errors.remove(addr) {
                        tracing::info!(
                            target: "epics_ca_rs::beacon",
                            %addr, prev_error = ?prev,
                            "CA beacon send recovered"
                        );
                    }
                }
                Err(e) => {
                    let kind = e.kind();
                    // Warn only when the error kind for this address changes.
                    if send_errors.insert(*addr, kind) != Some(kind) {
                        tracing::warn!(
                            target: "epics_ca_rs::beacon",
                            %addr, error = %e,
                            "CA beacon send failed"
                        );
                    }
                }
            }
        }

        #[cfg(feature = "cap-tokens")]
        if let Some(ref s) = signer {
            s.emit(server_ip, server_port, beacon_id).await;
        }

        tokio::select! {
            () = epics_base_rs::runtime::task::sleep(interval) => {
                // Exponential back-off, capped at `max_interval`.
                if interval < max_interval {
                    interval = (interval * 2).min(max_interval);
                }
            }
            () = reset.notified() => {
                interval = initial_interval;
            }
        }

        // Advance the sequence number exactly once per round so every round
        // (including the first two) carries a distinct id; receivers that
        // compare consecutive ids would otherwise see a repeated 0.
        beacon_id = beacon_id.wrapping_add(1);
    }
}