use std::net::SocketAddr;
use std::time::{SystemTime, UNIX_EPOCH};
use coap_lite::{CoapOption, MessageClass, Packet, RequestType};
use tokio::net::UdpSocket;
use tracing::{debug, error, info, warn};
use crate::classifier::{DeltaFilter, TopicClassifier};
use crate::client::WireBandClient;
use crate::error::{Result, WireBandError};
use crate::frame;
fn unix_ts() -> f64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64()
}
fn hex_encode(data: &[u8]) -> String {
data.iter().map(|b| format!("{b:02x}")).collect()
}
pub struct CoapServer {
bind_addr: String,
topic_prefix: String,
classifier: TopicClassifier,
delta_filter: DeltaFilter,
}
impl CoapServer {
pub fn new(addr: impl Into<String>) -> Self {
Self {
bind_addr: addr.into(),
topic_prefix: "coap".to_string(),
classifier: TopicClassifier::new(),
delta_filter: DeltaFilter::new(0.0),
}
}
pub fn topic_prefix(mut self, prefix: impl Into<String>) -> Self {
self.topic_prefix = prefix.into();
self
}
pub fn delta_threshold(mut self, threshold: f64) -> Self {
self.delta_filter = DeltaFilter::new(threshold);
self
}
pub fn classifier_mut(&mut self) -> &mut TopicClassifier {
&mut self.classifier
}
pub async fn run(mut self, client: WireBandClient) -> Result<()> {
let socket = UdpSocket::bind(&self.bind_addr).await
.map_err(|e| WireBandError::Coap(
format!("CoAP bind {}: {e}", self.bind_addr)
))?;
info!(addr = %self.bind_addr, "CoAP server listening");
let mut buf = vec![0u8; 4096];
loop {
match socket.recv_from(&mut buf).await {
Ok((len, src)) => {
self.handle(&client, &buf[..len], src).await;
}
Err(e) => {
error!(err = %e, "CoAP recv error");
return Err(WireBandError::Coap(e.to_string()));
}
}
}
}
async fn handle(&mut self, client: &WireBandClient, raw: &[u8], src: SocketAddr) {
let packet = match Packet::from_bytes(raw) {
Ok(p) => p,
Err(e) => { warn!(src = %src, err = ?e, "CoAP: parse failed"); return; }
};
let is_write = matches!(
packet.header.code,
MessageClass::Request(RequestType::Put) | MessageClass::Request(RequestType::Post)
);
if !is_write {
debug!(src = %src, code = ?packet.header.code, "CoAP: ignoring non-write request");
return;
}
let path_parts: Vec<String> = packet
.get_option(CoapOption::UriPath)
.map(|ll| {
ll.iter()
.filter_map(|b| std::str::from_utf8(b).ok())
.map(String::from)
.collect()
})
.unwrap_or_default();
let uri_path = if path_parts.is_empty() { "data".to_string() } else { path_parts.join("/") };
let topic = format!("{}/{}", self.topic_prefix, uri_path);
let payload: serde_json::Value = std::str::from_utf8(&packet.payload)
.ok()
.and_then(|s| serde_json::from_str(s).ok())
.unwrap_or_else(|| serde_json::json!({ "raw": hex_encode(&packet.payload) }));
if self.delta_filter.should_suppress(&topic, &payload) {
debug!(topic = %topic, "CoAP: packet delta-filtered");
return;
}
let symbol = self.classifier.classify(&topic, &payload);
let encoded = frame::encode(symbol, &topic, &payload);
client.buffer_event(topic.clone(), symbol, encoded, unix_ts()).await;
debug!(src = %src, topic = %topic, symbol = format!("{symbol:#06x}"), "CoAP buffered");
}
}