use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use bairelay_neolink_core::bcudp::xml::{
D2rHb, D2rR, HbTimer, IpPort, R2cCr, R2cT, R2dC, R2dDcr, R2dHbr, R2dRr, UdpXml,
};
use tokio::net::UdpSocket;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use crate::config::RuntimeConfig;
use crate::packet::{decode_discovery, encode_discovery, random_sid};
use crate::registry::{CameraRegistry, SessionAnchors};
use crate::WakeServerError;
const WAKE_BURST_COUNT: usize = 10;
const WAKE_BURST_INTERVAL: Duration = Duration::from_millis(100);
pub(crate) async fn run(
sock: Arc<UdpSocket>,
registry: Arc<CameraRegistry>,
anchors: Arc<SessionAnchors>,
cfg: RuntimeConfig,
cancel: CancellationToken,
) -> Result<(), WakeServerError> {
let local = sock.local_addr().ok();
info!(?local, "wake-server register listening");
let stale = Duration::from_millis(cfg.stale_after_ms);
let mut buf = vec![0u8; 4096];
loop {
tokio::select! {
_ = cancel.cancelled() => {
info!("wake-server register cancelled");
return Ok(());
}
res = sock.recv_from(&mut buf) => {
let (n, src) = match res {
Ok(p) => p,
Err(e) => { warn!(error = %e, "register recv_from"); continue; }
};
handle(&sock, ®istry, &anchors, &cfg, stale, src, &buf[..n], &cancel).await;
}
}
}
}
#[allow(clippy::too_many_arguments)]
async fn handle(
sock: &Arc<UdpSocket>,
registry: &Arc<CameraRegistry>,
anchors: &Arc<SessionAnchors>,
cfg: &RuntimeConfig,
stale_after: Duration,
src: SocketAddr,
raw: &[u8],
cancel: &CancellationToken,
) {
let (tid, payload) = match decode_discovery(raw) {
Ok(v) => v,
Err(e) => {
debug!(%src, error = %e, "register: bad packet");
return;
}
};
match payload {
UdpXml::D2rHb(hb) => {
handle_heartbeat(sock, registry, anchors, cfg, stale_after, src, tid, hb).await
}
UdpXml::C2rC(c) => {
handle_connect(sock, registry, cfg, stale_after, src, tid, c, cancel).await
}
UdpXml::D2rR(r) => handle_register(sock, anchors, src, tid, r).await,
UdpXml::D2rDisc(disc) => {
debug!(%src, sid = disc.sid, "D2R_DISC");
let reply = UdpXml::R2dDcr(R2dDcr {
sid: disc.sid,
rsp: 0,
});
send_reply(sock, src, tid, reply, "register: encode R2D_DC_R").await;
}
UdpXml::D2rCr(cr) => {
debug!(%src, sid = cr.sid, rsp = cr.rsp, "D2R_C_R (camera awake)");
}
UdpXml::C2rCfm(cfm) => {
debug!(%src, sid = cfm.sid, conn = %cfm.conn, "C2R_CFM");
}
other => debug!(%src, ?other, "register: unhandled payload"),
}
}
async fn handle_register(
sock: &UdpSocket,
anchors: &Arc<SessionAnchors>,
src: SocketAddr,
tid: u32,
r: D2rR,
) {
let now = std::time::Instant::now();
let lookup_ttl = Duration::from_secs(60 * 60);
let ac = match anchors.lookup(&r.uid, now, lookup_ttl) {
Some(a) if a.token == r.token => a.ac,
Some(a) => {
debug!(
%src, uid = %r.uid,
expected_token = a.token, got_token = r.token,
"D2R_R token mismatch; dropping packet"
);
return;
}
None => {
debug!(%src, uid = %r.uid, "D2R_R for unseen UID; dropping packet");
return;
}
};
debug!(%src, uid = %r.uid, ac, "D2R_R");
let reply = UdpXml::R2dRr(R2dRr { rsp: -4, ac });
send_reply(sock, src, tid, reply, "register: encode R2D_R_R").await;
}
#[allow(clippy::too_many_arguments)]
async fn handle_connect(
sock: &Arc<UdpSocket>,
registry: &CameraRegistry,
cfg: &RuntimeConfig,
stale_after: Duration,
src: SocketAddr,
tid: u32,
c: bairelay_neolink_core::bcudp::xml::C2rC,
cancel: &CancellationToken,
) {
debug!(%src, uid = %c.uid, cid = c.cid, "C2R_C");
let now = tokio::time::Instant::now().into_std();
let cam = registry.lookup_fresh(&c.uid, now, stale_after);
match cam {
None => {
debug!(%src, uid = %c.uid, "C2R_C for unknown or stale UID");
let reply = UdpXml::R2cCr(R2cCr {
dev: None,
dmap: None,
relay: None,
relayt: None,
nat: "NULL".into(),
sid: None,
rsp: -1,
ac: 0,
});
send_reply(sock, src, tid, reply, "register: encode R2C_C_R(unknown)").await;
}
Some(entry) => {
let sid = random_sid();
let burst_sock = Arc::clone(sock);
let cam_addr = entry.addr;
let cli_src_ip = src.ip().to_string();
if c.cli.ip != cli_src_ip {
warn!(
%src, claimed_cli_ip = %c.cli.ip, actual_src_ip = %cli_src_ip,
"C2R_C cli.ip mismatch with UDP source; substituting actual source IP",
);
}
let cli = IpPort {
ip: cli_src_ip.clone(),
port: c.cli.port,
};
let cmap = IpPort {
ip: cli_src_ip,
port: src.port(),
};
let relay_ip_for_camera = crate::route::advertise_ip(cfg.bind, cam_addr).to_string();
let relay_ip_for_client = crate::route::advertise_ip(cfg.bind, src).to_string();
let relay = IpPort {
ip: relay_ip_for_camera,
port: cfg.register_port,
};
let cid = c.cid;
let cancel = cancel.clone();
tokio::spawn(async move {
let burst = tokio::spawn(async move {
for i in 0..WAKE_BURST_COUNT {
let wake = UdpXml::R2dC(R2dC {
cli: cli.clone(),
cmap: cmap.clone(),
relay: relay.clone(),
sid,
cid,
});
match encode_discovery(tid, wake) {
Ok(bytes) => {
if let Err(e) = burst_sock.send_to(&bytes, cam_addr).await {
warn!(%cam_addr, i, error = %e, "wake burst send failed; aborting");
break;
}
debug!(%cam_addr, i, "R2D_C ->");
}
Err(e) => {
warn!(error = %e, "wake burst encode failed");
break;
}
}
if i + 1 < WAKE_BURST_COUNT {
tokio::select! {
_ = sleep(WAKE_BURST_INTERVAL) => {}
_ = cancel.cancelled() => {
debug!(%cam_addr, "wake burst cancelled mid-flight");
break;
}
}
}
}
});
if let Err(e) = burst.await {
if e.is_panic() {
warn!(
%cam_addr, sid, cid,
"wake burst task panicked; camera will not receive R2D_C — operator should check logs",
);
}
}
});
let cam_ip = entry.addr.ip().to_string();
let cam_port = entry.addr.port();
let reply = UdpXml::R2cCr(R2cCr {
dev: Some(IpPort {
ip: cam_ip.clone(),
port: cam_port,
}),
dmap: Some(IpPort {
ip: cam_ip.clone(),
port: cam_port,
}),
relay: Some(IpPort {
ip: relay_ip_for_client,
port: cfg.register_port,
}),
relayt: None,
nat: "NULL".into(),
sid: Some(sid),
rsp: 0,
ac: 0,
});
send_reply(sock, src, tid, reply, "register: encode R2C_C_R").await;
let r2t = UdpXml::R2cT(R2cT {
dev: Some(IpPort {
ip: cam_ip.clone(),
port: cam_port,
}),
dmap: Some(IpPort {
ip: cam_ip,
port: cam_port,
}),
sid,
cid: c.cid,
});
send_reply(sock, src, tid, r2t, "register: encode R2C_T").await;
}
}
}
async fn send_reply(
sock: &UdpSocket,
dst: SocketAddr,
tid: u32,
payload: UdpXml,
err_ctx: &'static str,
) {
match encode_discovery(tid, payload) {
Ok(bytes) => {
if let Err(e) = sock.send_to(&bytes, dst).await {
warn!(%dst, error = %e, "register: send_to");
} else {
debug!(%dst, "register -> reply");
}
}
Err(e) => warn!(error = %e, "{err_ctx}"),
}
}
#[allow(clippy::too_many_arguments)]
async fn handle_heartbeat(
sock: &UdpSocket,
registry: &CameraRegistry,
anchors: &Arc<SessionAnchors>,
cfg: &RuntimeConfig,
stale_after: Duration,
src: SocketAddr,
tid: u32,
hb: D2rHb,
) {
debug!(%src, uid = %hb.uid, token = hb.token, "D2R_HB");
let now = tokio::time::Instant::now().into_std();
match registry.upsert(&hb.uid, src, hb.token, now) {
Some(true) => info!(%src, uid = %hb.uid, "wake-server registered camera"),
Some(false) => {} None => {
warn!(
%src, uid = %hb.uid,
cap = crate::registry::MAX_MAP_ENTRIES,
"camera registry at capacity; rejecting D2R_HB"
);
return;
}
}
for evicted in registry.purge_stale(now, stale_after) {
info!(uid = %evicted, "wake-server deregistered stale camera");
}
for evicted in anchors.purge_stale(now, stale_after) {
debug!(uid = %evicted, "wake-server purged stale session anchor");
}
if hb.needrsp == Some(1) {
let now_secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let reply = UdpXml::R2dHbr(R2dHbr {
rsp: 0,
time_t: now_secs,
timer: HbTimer {
hb: cfg.heartbeat_ms,
},
});
match encode_discovery(tid, reply) {
Ok(bytes) => {
if let Err(e) = sock.send_to(&bytes, src).await {
warn!(%src, error = %e, "register: send R2D_HB_R");
} else {
debug!(%src, "register -> R2D_HB_R");
}
}
Err(e) => warn!(%src, error = %e, "register: encode R2D_HB_R"),
}
}
}