use std::collections::HashSet;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use tokio::net::UdpSocket;
use crate::protocol::*;
/// Runs the repeater loop on UDP port `CA_REPEATER_PORT`, bound to all IPv4 interfaces.
///
/// For every datagram that is at least `CaHeader::SIZE` bytes long and parses as a
/// `CaHeader`:
///
/// * `CA_PROTO_REPEATER_REGISTER` adds the sender to the client set and answers with a
///   `CA_PROTO_REPEATER_CONFIRM` header whose `available` field carries the sender's
///   IPv4 address (left at its default for non-IPv4 senders).
/// * Any other command is forwarded unchanged to every registered client except the
///   sender. Clients for which the send fails are dropped from the set.
///
/// Shorter or unparsable datagrams are ignored.
///
/// # Errors
///
/// Returns the error if the socket cannot be bound, or if receiving fails with anything
/// other than `ConnectionReset` / `ConnectionRefused`. On success the function never
/// returns.
pub async fn run_repeater() -> std::io::Result<()> {
    let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, CA_REPEATER_PORT);
    let socket = UdpSocket::bind(bind_addr).await?;
    let mut clients: HashSet<SocketAddr> = HashSet::new();
    let mut buf = [0u8; 4096];

    loop {
        let (len, src) = match socket.recv_from(&mut buf).await {
            Ok(received) => received,
            // Some platforms report an ICMP "port unreachable" caused by an earlier
            // `send_to` to a vanished client as an error on the next receive. That says
            // nothing about the health of this socket, so it must not stop the repeater.
            Err(err)
                if matches!(
                    err.kind(),
                    std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::ConnectionRefused
                ) =>
            {
                continue;
            }
            Err(err) => return Err(err),
        };

        if len < CaHeader::SIZE {
            continue;
        }
        let Ok(hdr) = CaHeader::from_bytes(&buf[..len]) else {
            continue;
        };

        match hdr.cmmd {
            CA_PROTO_REPEATER_REGISTER => {
                clients.insert(src);
                let mut confirm = CaHeader::new(CA_PROTO_REPEATER_CONFIRM);
                if let SocketAddr::V4(v4) = src {
                    confirm.available = u32::from_be_bytes(v4.ip().octets());
                }
                // Best effort: a lost confirmation only makes the client retry.
                let _ = socket.send_to(&confirm.to_bytes(), src).await;
            }
            _ => {
                let data = &buf[..len];
                // Collected separately because `clients` is borrowed while iterating.
                let mut dead = Vec::new();
                for client in &clients {
                    if *client == src {
                        continue;
                    }
                    if socket.send_to(data, client).await.is_err() {
                        dead.push(*client);
                    }
                }
                for d in dead {
                    clients.remove(&d);
                }
            }
        }
    }
}
/// Makes a best-effort attempt to have a repeater running.
///
/// Tries to register with an existing repeater first. If that fails, a repeater is
/// started via `spawn_repeater`, the task sleeps for 50 ms, and registration is
/// attempted once more. The outcome of that second attempt is ignored.
pub async fn ensure_repeater() {
    let already_running = try_register().await.is_ok();
    if !already_running {
        spawn_repeater();
        // Short pause before probing again.
        let startup_grace = std::time::Duration::from_millis(50);
        epics_base_rs::runtime::task::sleep(startup_grace).await;
        let _ = try_register().await;
    }
}
/// Sends a `CA_PROTO_REPEATER_REGISTER` request to the repeater port on localhost and
/// waits up to 200 ms for a `CA_PROTO_REPEATER_CONFIRM` reply.
///
/// The request is sent from a freshly bound UDP socket on an OS-chosen port. The
/// header's `available` field carries that socket's local IPv4 address, or
/// `127.0.0.1` if the local address is unavailable or not IPv4.
///
/// Returns `Ok(())` once a confirmation arrives. Returns `Err(())` if binding,
/// sending or receiving fails, or if the timeout elapses first.
async fn try_register() -> Result<(), ()> {
    let probe = UdpSocket::bind("0.0.0.0:0").await.map_err(|_| ())?;

    let advertised_ip = if let Ok(SocketAddr::V4(bound)) = probe.local_addr() {
        *bound.ip()
    } else {
        Ipv4Addr::LOCALHOST
    };

    let mut request = CaHeader::new(CA_PROTO_REPEATER_REGISTER);
    request.available = u32::from_be_bytes(advertised_ip.octets());

    let target = SocketAddrV4::new(Ipv4Addr::LOCALHOST, CA_REPEATER_PORT);
    if probe.send_to(&request.to_bytes(), target).await.is_err() {
        return Err(());
    }

    // Keep reading until a confirmation shows up; anything else is skipped.
    let wait_for_confirm = async {
        let mut reply = [0u8; 64];
        loop {
            let (n, _) = probe.recv_from(&mut reply).await.map_err(|_| ())?;
            if n < CaHeader::SIZE {
                continue;
            }
            let Ok(answer) = CaHeader::from_bytes(&reply[..n]) else {
                continue;
            };
            if answer.cmmd == CA_PROTO_REPEATER_CONFIRM {
                return Ok::<(), ()>(());
            }
        }
    };

    // An elapsed timeout and a receive failure both collapse to `Err(())`.
    tokio::time::timeout(std::time::Duration::from_millis(200), wait_for_confirm)
        .await
        .map_err(|_| ())?
}
/// Starts a repeater, preferring a separate process over an in-process thread.
///
/// Looks for a `ca-repeater-rs` executable (with the platform's executable suffix) in
/// the directory of the current executable. If it exists and can be spawned, it is
/// launched with stdin, stdout and stderr redirected to null, and the function returns.
///
/// Otherwise a background thread is started that builds a current-thread tokio runtime
/// and drives `run_repeater` on it. The result of `run_repeater` is discarded.
fn spawn_repeater() {
    use std::process::{Command, Stdio};

    // Include the platform suffix (`.exe` on Windows, empty elsewhere). Without it
    // the `exists` probe below can never match the sibling binary on Windows.
    let bin_name = format!("ca-repeater-rs{}", std::env::consts::EXE_SUFFIX);
    // An empty path has no parent, so a failed lookup falls through to the thread.
    let exe = std::env::current_exe().unwrap_or_default();
    let repeater_bin = exe.parent().map(|dir| dir.join(&bin_name));

    if let Some(bin) = repeater_bin.filter(|bin| bin.exists()) {
        let spawned = Command::new(&bin)
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn();
        if spawned.is_ok() {
            return;
        }
    }

    // Fallback: run the repeater inside this process on its own thread.
    std::thread::spawn(|| {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("repeater runtime");
        let _ = rt.block_on(run_repeater());
    });
}