use crate::dedup::ConcurrentDedup;
use crate::endpoint_core::EndpointCore;
use crate::error::{Result, RouterError};
use crate::filter::EndpointFilters;
use crate::framing::MavlinkFrame;
use crate::router::{EndpointId, RoutedMessage};
use crate::routing::RoutingTable;
use bytes::Bytes;
use dashmap::DashMap;
use tokio::sync::broadcast::{self, error::RecvError};
use mavlink::MavlinkVersion;
use parking_lot::RwLock;
use std::io::Cursor;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::net::UdpSocket;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, trace, warn};
#[allow(clippy::too_many_arguments)]
pub async fn run(
id: usize,
address: String,
mode: crate::config::EndpointMode,
bus_tx: broadcast::Sender<RoutedMessage>,
bus_rx: broadcast::Receiver<RoutedMessage>,
routing_table: Arc<RwLock<RoutingTable>>,
dedup: ConcurrentDedup,
filters: EndpointFilters,
token: CancellationToken,
cleanup_ttl_secs: u64,
) -> Result<()> {
let core = EndpointCore {
id: EndpointId(id),
bus_tx: bus_tx.clone(),
routing_table: routing_table.clone(),
dedup: dedup.clone(),
filters: filters.clone(),
update_routing: true,
};
let (bind_addr, target_addr) = if mode == crate::config::EndpointMode::Server {
(address.clone(), None)
} else {
let mut addrs = address.to_socket_addrs().map_err(|e| {
RouterError::config(format!("Invalid remote address '{}': {}", address, e))
})?;
let target = addrs.next().ok_or_else(|| {
RouterError::config(format!("Could not resolve remote address '{}'", address))
})?;
("0.0.0.0:0".to_string(), Some(target))
};
let socket = UdpSocket::bind(&bind_addr)
.await
.map_err(|e| RouterError::network(&bind_addr, e))?;
let r = Arc::new(socket);
let s = r.clone();
let clients: Arc<DashMap<SocketAddr, Instant>> = Arc::new(DashMap::new());
let clients_recv = clients.clone();
let r_socket = r.clone();
let core_rx = core.clone();
let recv_loop = async move {
let mut buf = [0u8; 65535];
loop {
match r_socket.recv_from(&mut buf).await {
Ok((len, addr)) => {
clients_recv.insert(addr, Instant::now());
let mut cursor = Cursor::new(&buf[..len]);
let res = mavlink::read_v2_msg::<mavlink::common::MavMessage, _>(&mut cursor)
.map(|(h, m)| (h, m, MavlinkVersion::V2))
.or_else(|_| {
cursor.set_position(0);
mavlink::read_v1_msg::<mavlink::common::MavMessage, _>(&mut cursor)
.map(|(h, m)| (h, m, MavlinkVersion::V1))
});
if let Ok((header, message, version)) = res {
let packet_len = cursor.position() as usize;
let raw_bytes = Bytes::copy_from_slice(&buf[..packet_len]);
core_rx.handle_incoming_frame(MavlinkFrame {
header,
message,
version,
raw_bytes,
});
} else {
trace!(
"UDP: Failed to parse MAVLink packet from {} ({} bytes)",
addr,
len
);
}
}
Err(e) => {
error!("UDP recv error: {}", e);
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
}
};
let clients_send = clients.clone();
let s_socket = s.clone();
let mut bus_rx_loop = bus_rx;
let core_tx = core.clone();
let send_loop = async move {
let mut targets: Vec<SocketAddr> = Vec::new();
loop {
match bus_rx_loop.recv().await {
Ok(msg) => {
if !core_tx.check_outgoing(&msg) {
continue;
}
let packet_data = msg.serialized_bytes.clone();
if let Some(target) = target_addr {
if let Err(e) = s_socket.send_to(&packet_data, target).await {
warn!("UDP send error to target: {}", e);
}
} else {
targets.clear();
targets.extend(clients_send.iter().map(|r| *r.key()));
for target in &targets {
if let Err(e) = s_socket.send_to(&packet_data, target).await {
warn!("UDP broadcast error to {}: {}", target, e);
}
}
}
}
Err(RecvError::Lagged(n)) => {
warn!("UDP Sender lagged: missed {} messages", n);
}
Err(RecvError::Closed) => break,
}
}
};
let clients_cleanup = clients.clone();
let cleanup_loop = async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
loop {
interval.tick().await;
let now = Instant::now();
let ttl = Duration::from_secs(cleanup_ttl_secs);
let initial_len = clients_cleanup.len();
clients_cleanup.retain(|_addr, last_seen| now.duration_since(*last_seen) < ttl);
let dropped = initial_len - clients_cleanup.len();
if dropped > 0 {
debug!("UDP Cleanup: removed {} stale clients", dropped);
}
}
};
tokio::select! {
_ = recv_loop => Ok(()),
_ = send_loop => Ok(()),
_ = cleanup_loop => Ok(()),
_ = token.cancelled() => Ok(()),
}
}