mod binding;
pub mod http;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use tokio::sync::{broadcast, RwLock};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
pub use binding::ActiveBinding;
/// A UDP datagram as handed to subscribers of a binding.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
pub struct UdpDatagram {
    /// Id of the binding this datagram belongs to.
    pub binding_id: String,
    /// Source address in string form (presumably the sender's `ip:port` —
    /// confirm against the `binding` module).
    pub src: String,
    /// Datagram contents as text (presumably base64, mirroring
    /// `UdpSendRequest::payload` — confirm against the `binding` module).
    pub payload: String,
    /// UTC timestamp recorded for the datagram's receipt.
    pub received_at: DateTime<Utc>,
}
/// Request body for sending one datagram through an existing binding.
#[derive(Debug, Clone, serde::Deserialize, utoipa::ToSchema)]
pub struct UdpSendRequest {
    /// Destination socket address; must parse as a `SocketAddr`
    /// (for example `192.0.2.1:9000`).
    pub dest: String,
    /// Payload bytes, encoded with the standard base64 alphabet.
    pub payload: String,
}
/// Request body for creating a new UDP binding.
///
/// Every field is optional when deserializing; see the per-field defaults.
#[derive(Debug, Clone, serde::Deserialize, utoipa::ToSchema)]
pub struct UdpBindRequest {
    /// Local port to bind. Defaults to `0` when omitted.
    #[serde(default)]
    pub port: u16,
    /// Local address to bind. Defaults to the value of `default_bind_addr`.
    #[serde(default = "default_bind_addr")]
    pub addr: String,
    /// Requested lease length in seconds. Defaults to the value of
    /// `default_lease`; `UdpRuntime::bind` clamps it to `MAX_LEASE_SECS`.
    #[serde(default = "default_lease")]
    pub lease_secs: u64,
}
/// Serde default for `UdpBindRequest::addr`: the IPv4 wildcard address.
fn default_bind_addr() -> String {
    String::from("0.0.0.0")
}
/// Serde default for `UdpBindRequest::lease_secs`: five minutes, in seconds.
fn default_lease() -> u64 {
    5 * 60
}
/// Upper bound applied to requested lease lengths: 24 hours, in seconds.
const MAX_LEASE_SECS: u64 = 24 * 60 * 60;
/// Serializable snapshot describing one UDP binding.
#[derive(Debug, Clone, serde::Serialize, utoipa::ToSchema)]
pub struct BindingInfo {
    /// Identifier of the binding.
    pub id: String,
    /// Local socket address of the binding, rendered as a string.
    pub local_addr: String,
    /// UTC time at which the binding was created.
    pub created_at: DateTime<Utc>,
    /// UTC time of the most recent heartbeat (equal to `created_at` for a
    /// freshly created binding).
    pub last_heartbeat: DateTime<Utc>,
    /// Lease length in seconds; the reaper removes a binding once more than
    /// this many seconds have elapsed since `last_heartbeat`.
    pub lease_secs: u64,
}
/// Errors returned by [`UdpRuntime`] operations.
#[derive(Debug, thiserror::Error)]
pub enum UdpError {
    /// No binding is registered under the given id (the id is carried).
    #[error("binding not found: {0}")]
    NotFound(String),
    /// An underlying socket operation failed.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// An address string could not be parsed; carries the parser's message.
    #[error("invalid address: {0}")]
    InvalidAddr(String),
    /// A payload was not valid base64.
    #[error("base64 decode error: {0}")]
    Base64(#[from] base64::DecodeError),
}
/// Owner of all active UDP bindings plus the background task that expires
/// bindings whose lease has lapsed.
pub struct UdpRuntime {
    /// Active bindings keyed by binding id; shared with the reaper task.
    bindings: Arc<RwLock<HashMap<String, ActiveBinding>>>,
    /// Token that stops the reaper loop and is handed to every new binding.
    cancel: CancellationToken,
    /// Join handle of the spawned reaper task; stored but never awaited here.
    _reaper_handle: tokio::task::JoinHandle<()>,
}
impl UdpRuntime {
/// Creates an empty runtime and spawns its lease reaper.
///
/// The reaper runs until `cancel` is cancelled. Because this calls
/// `tokio::spawn`, it must be invoked from within a Tokio runtime.
pub fn new(cancel: CancellationToken) -> Self {
    let bindings = Arc::new(RwLock::new(HashMap::new()));
    // The reaper shares the same map and observes the same cancellation token.
    let reaper = tokio::spawn(Self::reaper_loop(Arc::clone(&bindings), cancel.clone()));
    Self {
        bindings,
        cancel,
        _reaper_handle: reaper,
    }
}
/// Binds a new UDP socket and registers it under a freshly generated id.
///
/// `req.addr` may be a bare IPv4 or IPv6 literal (`0.0.0.0`, `::1`, `::`).
/// Bare IPv6 literals used to be rejected because `"::1:0"` is not valid
/// socket-address syntax; they are now parsed as an IP address and combined
/// with `req.port` directly. Anything that is not a bare IP literal (for
/// example a bracketed `[::1]`) still goes through the original
/// `"{addr}:{port}"` parse, so previously accepted inputs keep working.
///
/// The requested lease is clamped to `MAX_LEASE_SECS`.
///
/// # Errors
///
/// * [`UdpError::InvalidAddr`] if the address cannot be parsed.
/// * [`UdpError::Io`] if binding the socket or reading its local address fails.
pub async fn bind(&self, req: UdpBindRequest) -> Result<BindingInfo, UdpError> {
    let bind_addr = match req.addr.parse::<std::net::IpAddr>() {
        Ok(ip) => SocketAddr::new(ip, req.port),
        // Fallback keeps the legacy behaviour for non-bare forms such as "[::1]".
        Err(_) => format!("{}:{}", req.addr, req.port)
            .parse::<SocketAddr>()
            .map_err(|e| UdpError::InvalidAddr(e.to_string()))?,
    };
    let socket = tokio::net::UdpSocket::bind(bind_addr).await?;
    // Read the address back from the socket: with port 0 the OS picks the port.
    let local_addr = socket.local_addr()?;
    let id = Uuid::now_v7().to_string();
    let now = Utc::now();
    let lease_secs = req.lease_secs.min(MAX_LEASE_SECS);
    let active = ActiveBinding::new(
        id.clone(),
        socket,
        local_addr,
        now,
        lease_secs,
        self.cancel.clone(),
    );
    let info = BindingInfo {
        id: id.clone(),
        local_addr: local_addr.to_string(),
        created_at: now,
        last_heartbeat: now,
        lease_secs,
    };
    self.bindings.write().await.insert(id, active);
    tracing::info!(binding = %info.id, addr = %info.local_addr, "UDP binding created");
    Ok(info)
}
/// Removes the binding registered under `id` and shuts it down.
///
/// # Errors
///
/// Returns [`UdpError::NotFound`] when no such binding exists.
pub async fn unbind(&self, id: &str) -> Result<(), UdpError> {
    // Scope the write guard so it is released before the binding is shut down.
    let removed = {
        let mut registry = self.bindings.write().await;
        registry.remove(id)
    };
    match removed {
        Some(binding) => {
            binding.shutdown();
            tracing::info!(binding = %id, "UDP binding removed");
            Ok(())
        }
        None => Err(UdpError::NotFound(id.to_string())),
    }
}
/// Returns a broadcast receiver for datagrams of the binding `id`.
///
/// # Errors
///
/// Returns [`UdpError::NotFound`] when no such binding exists.
pub async fn subscribe(&self, id: &str) -> Result<broadcast::Receiver<UdpDatagram>, UdpError> {
    let registry = self.bindings.read().await;
    match registry.get(id) {
        Some(binding) => Ok(binding.subscribe()),
        None => Err(UdpError::NotFound(id.to_string())),
    }
}
pub async fn send(&self, id: &str, req: UdpSendRequest) -> Result<usize, UdpError> {
use base64::Engine;
let dest: SocketAddr = req
.dest
.parse()
.map_err(|e| UdpError::InvalidAddr(format!("{}", e)))?;
let payload = base64::engine::general_purpose::STANDARD.decode(&req.payload)?;
let bindings = self.bindings.read().await;
let binding = bindings
.get(id)
.ok_or_else(|| UdpError::NotFound(id.to_string()))?;
let sent = binding.send_to(&payload, dest).await?;
Ok(sent)
}
/// Records a heartbeat for the binding `id` by calling its `touch` method.
///
/// # Errors
///
/// Returns [`UdpError::NotFound`] when no such binding exists.
pub async fn heartbeat(&self, id: &str) -> Result<(), UdpError> {
    let registry = self.bindings.read().await;
    match registry.get(id) {
        Some(binding) => {
            binding.touch();
            Ok(())
        }
        None => Err(UdpError::NotFound(id.to_string())),
    }
}
/// Returns a snapshot of every active binding.
///
/// The order follows the map's iteration order and is therefore unspecified.
pub async fn status(&self) -> Vec<BindingInfo> {
    let registry = self.bindings.read().await;
    let mut infos = Vec::with_capacity(registry.len());
    for binding in registry.values() {
        infos.push(BindingInfo {
            id: binding.id().to_string(),
            local_addr: binding.local_addr().to_string(),
            created_at: binding.created_at(),
            last_heartbeat: binding.last_heartbeat(),
            lease_secs: binding.lease_secs(),
        });
    }
    infos
}
async fn reaper_loop(
bindings: Arc<RwLock<HashMap<String, ActiveBinding>>>,
cancel: CancellationToken,
) {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = interval.tick() => {
let now = Utc::now();
let mut map = bindings.write().await;
let expired: Vec<String> = map
.iter()
.filter(|(_, b)| {
let elapsed = now
.signed_duration_since(b.last_heartbeat())
.num_seconds();
elapsed > b.lease_secs() as i64
})
.map(|(id, _)| id.clone())
.collect();
for id in expired {
if let Some(binding) = map.remove(&id) {
binding.shutdown();
tracing::info!(binding = %id, "Reaped expired UDP binding");
}
}
}
}
}
}
/// Cancels the runtime's token, then removes and shuts down every binding.
///
/// The cancelled token is the one passed to [`UdpRuntime::new`].
pub async fn shutdown(&self) {
    self.cancel.cancel();
    // The write guard lives for the whole statement, so the map stays locked
    // while each binding is shut down.
    self.bindings
        .write()
        .await
        .drain()
        .for_each(|(_, binding)| binding.shutdown());
    tracing::debug!("UDP runtime shut down");
}
}
/// Exposes the UDP runtime as a `koi_common` capability named `"udp"`.
impl koi_common::capability::Capability for UdpRuntime {
    /// Capability name.
    fn name(&self) -> &str {
        "udp"
    }

    /// Synchronous status summary with the current binding count.
    ///
    /// The count is read with a non-blocking `try_read`; if the lock cannot
    /// be acquired immediately the count falls back to 0, so the summary may
    /// read "no bindings" while the map is busy. `healthy` is always `true`.
    fn status(&self) -> koi_common::capability::CapabilityStatus {
        let count = match self.bindings.try_read() {
            Ok(registry) => registry.len(),
            Err(_) => 0,
        };
        let summary = match count {
            0 => "no bindings".to_string(),
            _ => format!("{count} binding{}", if count == 1 { "" } else { "s" }),
        };
        koi_common::capability::CapabilityStatus {
            name: "udp".to_string(),
            summary,
            healthy: true,
        }
    }
}