use crate::error::{Result, RsCtrlError};
use crate::time_sync::TimeSynchronizer;
use serde::{Deserialize, Serialize};
use socket2::{Domain, Protocol, Socket, Type};
use std::collections::HashMap;
use std::net::{Ipv4Addr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::{debug, info, warn};
/// IPv4 multicast group address that discovery heartbeats are sent to.
const MULTICAST_ADDR: &str = "224.0.0.100";
/// UDP port the discovery socket binds to and that heartbeats are addressed to.
const DISCOVERY_PORT: u16 = 9999;
/// Discovery message that a node serializes to JSON and sends to the
/// multicast group once per heartbeat interval.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Heartbeat {
/// Identifier of the sending node; used as the key in [`ServiceRegistry`].
pub node_id: String,
/// Host the sender advertises for itself; stored in the registry as-is.
pub host: String,
/// Port the sender advertises for itself; stored in the registry as-is.
pub port: u16,
/// Sender's wall-clock time when the heartbeat was built, in milliseconds
/// since the Unix epoch.
pub timestamp: u64,
/// Clock reading in milliseconds. The sender in this file fills it with the
/// same value as `timestamp`; receivers hand it to the time synchronizer when
/// the heartbeat comes from a master.
pub clock_time_ms: u64,
/// Whether the sender declares itself the master node.
pub is_master: bool,
}
/// Table of peers learned from discovery heartbeats.
///
/// Both fields are behind `Arc`, so cloning a registry yields another handle
/// to the same node table and the same `running` flag.
#[derive(Debug, Clone)]
pub struct ServiceRegistry {
// node_id -> (host, port, local receive time in ms since the Unix epoch).
nodes: Arc<RwLock<HashMap<String, (String, u16, u64)>>>,
// Starts as `true`; `shutdown` clears it and the discovery threads poll it
// to decide when to exit.
running: Arc<AtomicBool>,
}
impl Default for ServiceRegistry {
fn default() -> Self {
Self {
nodes: Arc::default(),
running: Arc::new(AtomicBool::new(true)),
}
}
}
impl ServiceRegistry {
    /// Creates an empty registry whose `running` flag is set.
    pub fn new() -> Self {
        Self::default()
    }

    /// Clears the shared `running` flag so that every thread polling it
    /// (through this handle or any clone) stops at its next check.
    pub fn shutdown(&self) {
        self.running.store(false, Ordering::SeqCst);
        info!("📡 Discovery shutdown signaled");
    }

    /// Inserts or refreshes the entry for `hb.node_id`, recording the host and
    /// port from the heartbeat together with the local receive time.
    ///
    /// If the node table's lock is poisoned the update is skipped and a
    /// warning is logged.
    pub fn register(&self, hb: &Heartbeat) {
        // Read the clock before taking the lock so nothing that could fail
        // runs while the write guard is held (a panic there would poison it).
        let now_ms = Self::unix_millis_now();
        match self.nodes.write() {
            Ok(mut map) => {
                map.insert(hb.node_id.clone(), (hb.host.clone(), hb.port, now_ms));
                debug!("📡 Discovered: {} @ {}:{}", hb.node_id, hb.host, hb.port);
            }
            Err(_) => warn!("ServiceRegistry register poisoned, skipping update"),
        }
    }

    /// Returns the `(host, port)` last registered for `node_id`.
    ///
    /// Yields `None` when the node is unknown or the lock is poisoned.
    pub fn get_address(&self, node_id: &str) -> Option<(String, u16)> {
        self.nodes
            .read()
            .ok()
            .and_then(|map| map.get(node_id).map(|(h, p, _)| (h.clone(), *p)))
    }

    /// Removes every node whose last registration is `timeout_secs` seconds
    /// old or older.
    ///
    /// A poisoned lock leaves the table untouched.
    pub fn cleanup(&self, timeout_secs: u64) {
        let now_ms = Self::unix_millis_now();
        let timeout_ms = timeout_secs.saturating_mul(1000);
        if let Ok(mut map) = self.nodes.write() {
            // saturating_sub keeps entries stamped "in the future" (clock
            // stepped backwards) at age 0 instead of underflowing.
            map.retain(|_, (_, _, ts)| now_ms.saturating_sub(*ts) < timeout_ms);
        }
    }

    /// Current wall-clock time in milliseconds since the Unix epoch.
    ///
    /// Falls back to 0 instead of panicking when the system clock reads
    /// earlier than the epoch.
    fn unix_millis_now() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            // u128 -> u64: milliseconds since 1970 fit in u64 for any
            // realistic date, so the narrowing cast cannot truncate in practice.
            .map_or(0, |elapsed| elapsed.as_millis() as u64)
    }
}
pub fn start_discovery(
my_id: &str,
my_host: &str,
my_port: u16,
is_master: bool,
time_sync: Option<Arc<TimeSynchronizer>>,
) -> Result<ServiceRegistry> {
let registry = ServiceRegistry::new();
let registry_clone = registry.clone();
let my_id = my_id.to_string();
let my_host = my_host.to_string();
let time_sync_clone = time_sync.clone();
let my_id_for_sender = my_id.clone();
let my_id_for_receiver = my_id.clone();
let addr: SocketAddr = format!("0.0.0.0:{}", DISCOVERY_PORT).parse().unwrap();
let socket2 = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))
.map_err(|e| RsCtrlError::Discovery(format!("Create discovery socket failed: {e}")))?;
socket2
.set_reuse_address(true)
.map_err(|e| RsCtrlError::Discovery(format!("set_reuse_address failed: {e}")))?;
socket2
.bind(&addr.into())
.map_err(|e| RsCtrlError::Discovery(format!("Bind discovery socket failed: {e}")))?;
let socket: UdpSocket = socket2.into();
let multicast_ip = Ipv4Addr::new(224, 0, 0, 100);
socket.join_multicast_v4(&multicast_ip, &Ipv4Addr::UNSPECIFIED)?;
socket.set_nonblocking(true)?;
let send_socket = socket.try_clone()?;
let broadcast_addr: SocketAddr = format!("{}:{}", MULTICAST_ADDR, DISCOVERY_PORT)
.parse()
.map_err(|e| RsCtrlError::Discovery(format!("Invalid discovery address: {e}")))?;
let sender_running = Arc::clone(®istry.running);
thread::spawn(move || {
let interval = Duration::from_secs(1);
while sender_running.load(Ordering::SeqCst) {
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let hb = Heartbeat {
node_id: my_id_for_sender.clone(),
host: my_host.clone(),
port: my_port,
timestamp: now_ms,
clock_time_ms: now_ms,
is_master,
};
match serde_json::to_string(&hb) {
Ok(json) => {
if let Err(e) = send_socket.send_to(json.as_bytes(), broadcast_addr) {
warn!("Heartbeat send failed: {e}");
}
}
Err(e) => warn!("Heartbeat serialization failed: {e}"),
}
thread::sleep(interval);
}
});
let receiver_running = Arc::clone(®istry.running);
thread::spawn(move || {
let mut buf = [0u8; 1024];
while receiver_running.load(Ordering::SeqCst) {
match socket.recv_from(&mut buf) {
Ok((len, _addr)) => {
if let Ok(hb_str) = std::str::from_utf8(&buf[..len]) {
if let Ok(hb) = serde_json::from_str::<Heartbeat>(hb_str) {
if hb.node_id != my_id_for_receiver {
registry_clone.register(&hb);
if hb.is_master {
if let Some(ref sync) = time_sync_clone {
sync.update_from_master(&hb.node_id, hb.clock_time_ms);
}
}
}
}
}
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(100));
}
Err(e) => {
warn!("UDP recv error: {}", e);
thread::sleep(Duration::from_secs(1));
}
}
registry_clone.cleanup(10);
}
});
info!(
"📡 Discovery started (ID: {}, Master: {})",
my_id, is_master
);
Ok(registry)
}