use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use bairelay_neolink_core::bcudp::xml::{EmptyTag, IpPort, M2cQr, M2dQr, UdpXml};
use tokio::net::UdpSocket;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
use crate::packet::{decode_discovery, encode_discovery};
use crate::registry::SessionAnchors;
use crate::route::advertise_ip;
use crate::WakeServerError;
pub(crate) async fn run(
sock: Arc<UdpSocket>,
register_addr: SocketAddr,
bind: IpAddr,
anchors: Arc<SessionAnchors>,
cancel: CancellationToken,
) -> Result<(), WakeServerError> {
let local = sock.local_addr().ok();
tracing::info!(?local, register = %register_addr, %bind, "wake-server middleman listening");
let mut buf = vec![0u8; 4096];
loop {
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("wake-server middleman cancelled");
return Ok(());
}
res = sock.recv_from(&mut buf) => {
let (n, src) = match res {
Ok(p) => p,
Err(e) => { warn!(error = %e, "middleman recv_from"); continue; }
};
handle(&sock, register_addr, bind, &anchors, src, &buf[..n]).await;
}
}
}
}
async fn handle(
sock: &UdpSocket,
register_addr: SocketAddr,
bind: IpAddr,
anchors: &SessionAnchors,
src: SocketAddr,
raw: &[u8],
) {
let (tid, payload) = match decode_discovery(raw) {
Ok(v) => v,
Err(e) => {
debug!(%src, error = %e, "middleman: bad packet");
return;
}
};
let advertise = advertise_ip(bind, src).to_string();
let reg_port = register_addr.port();
match payload {
UdpXml::C2mQ(q) => {
debug!(%src, uid = %q.uid, advertise = %advertise, "C2M_Q");
let reply = UdpXml::M2cQr(M2cQr {
reg: Some(IpPort {
ip: advertise.clone(),
port: reg_port,
}),
relay: Some(IpPort {
ip: advertise.clone(),
port: reg_port,
}),
log: Some(IpPort {
ip: advertise.clone(),
port: reg_port,
}),
t: Some(IpPort {
ip: advertise,
port: reg_port,
}),
});
match encode_discovery(tid, reply) {
Ok(bytes) => {
if let Err(e) = sock.send_to(&bytes, src).await {
warn!(%src, error = %e, "middleman: send M2C_Q_R");
} else {
debug!(%src, "middleman -> M2C_Q_R");
}
}
Err(e) => warn!(%src, error = %e, "middleman: encode M2C_Q_R"),
}
}
UdpXml::D2mQ(q) => {
debug!(%src, uid = %q.uid, revision = ?q.revision, advertise = %advertise, "D2M_Q");
let token = rand::random::<u64>();
let ac = rand::random::<u32>();
if !anchors.issue(&q.uid, token, ac, std::time::Instant::now()) {
warn!(
%src, uid = %q.uid,
cap = crate::registry::MAX_MAP_ENTRIES,
"session-anchor map at capacity; rejecting D2M_Q"
);
return;
}
let reply = UdpXml::M2dQr(M2dQr {
reg: IpPort {
ip: advertise.clone(),
port: reg_port,
},
log: IpPort {
ip: advertise,
port: reg_port,
},
timer: EmptyTag::default(),
retry: EmptyTag::default(),
rsp: 0,
token,
ac,
});
match encode_discovery(tid, reply) {
Ok(bytes) => {
if let Err(e) = sock.send_to(&bytes, src).await {
warn!(%src, error = %e, "middleman: send M2D_Q_R");
} else {
debug!(%src, "middleman -> M2D_Q_R");
}
}
Err(e) => warn!(%src, error = %e, "middleman: encode M2D_Q_R"),
}
}
other => debug!(%src, ?other, "middleman: unexpected payload"),
}
}