wireband-edge 0.4.1

Lightweight Wire.Band client — semantic data middleware for any domain (IoT, AI/ML, DeFi, legal, geospatial, supply chain, and more)
Documentation
//! CoAP server connector — receives sensor data via CoAP PUT/POST.
//!
//! Binds a UDP socket (default port 5683), parses incoming CoAP packets via
//! `coap-lite`, extracts the URI path as the Wire.Band topic, parses the
//! payload as JSON, and buffers frames into [`WireBandClient`].
//!
//! Covers constrained devices (microcontrollers, sensors) that speak CoAP
//! instead of MQTT.
//!
//! Feature-gated: `--features coap`.

use std::net::SocketAddr;
use std::time::{SystemTime, UNIX_EPOCH};

use coap_lite::{CoapOption, MessageClass, Packet, RequestType};
use tokio::net::UdpSocket;
use tracing::{debug, error, info, warn};

use crate::classifier::{DeltaFilter, TopicClassifier};
use crate::client::WireBandClient;
use crate::error::{Result, WireBandError};
use crate::frame;

fn unix_ts() -> f64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs_f64()
}

fn hex_encode(data: &[u8]) -> String {
    data.iter().map(|b| format!("{b:02x}")).collect()
}

/// CoAP UDP server: receives PUT/POST from constrained sensors and feeds the
/// Wire.Band pipeline.
///
/// # Wire format
///
/// The CoAP URI path becomes the Wire.Band topic:
/// `PUT coap://gateway/sensors/zone-a/temperature` → topic `coap/sensors/zone-a/temperature`
///
/// Payload must be JSON; non-JSON payloads are forwarded as `{"raw": "<hex>"}`.
///
/// # Example
///
/// ```ignore
/// use wireband_edge::coap::CoapServer;
///
/// CoapServer::new("0.0.0.0:5683")
///     .topic_prefix("factory/coap")
///     .delta_threshold(0.01)
///     .run(client)
///     .await?;
/// ```
pub struct CoapServer {
    bind_addr:    String,
    topic_prefix: String,
    classifier:   TopicClassifier,
    delta_filter: DeltaFilter,
}

impl CoapServer {
    /// Create a CoAP server bound to `addr` (e.g. `"0.0.0.0:5683"`).
    pub fn new(addr: impl Into<String>) -> Self {
        Self {
            bind_addr:    addr.into(),
            topic_prefix: "coap".to_string(),
            classifier:   TopicClassifier::new(),
            delta_filter: DeltaFilter::new(0.0),
        }
    }

    /// Topic prefix prepended to the CoAP URI path (default: `"coap"`).
    pub fn topic_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.topic_prefix = prefix.into();
        self
    }

    /// Suppress numeric readings that change less than this fraction (0.0 = forward all).
    pub fn delta_threshold(mut self, threshold: f64) -> Self {
        self.delta_filter = DeltaFilter::new(threshold);
        self
    }

    /// Access the topic classifier to register custom keyword/topic mappings.
    pub fn classifier_mut(&mut self) -> &mut TopicClassifier {
        &mut self.classifier
    }

    /// Bind the UDP socket and process incoming CoAP messages until an I/O error.
    pub async fn run(mut self, client: WireBandClient) -> Result<()> {
        let socket = UdpSocket::bind(&self.bind_addr).await
            .map_err(|e| WireBandError::Coap(
                format!("CoAP bind {}: {e}", self.bind_addr)
            ))?;

        info!(addr = %self.bind_addr, "CoAP server listening");

        let mut buf = vec![0u8; 4096];
        loop {
            match socket.recv_from(&mut buf).await {
                Ok((len, src)) => {
                    self.handle(&client, &buf[..len], src).await;
                }
                Err(e) => {
                    error!(err = %e, "CoAP recv error");
                    return Err(WireBandError::Coap(e.to_string()));
                }
            }
        }
    }

    async fn handle(&mut self, client: &WireBandClient, raw: &[u8], src: SocketAddr) {
        let packet = match Packet::from_bytes(raw) {
            Ok(p)  => p,
            Err(e) => { warn!(src = %src, err = ?e, "CoAP: parse failed"); return; }
        };

        // Accept PUT and POST only
        let is_write = matches!(
            packet.header.code,
            MessageClass::Request(RequestType::Put) | MessageClass::Request(RequestType::Post)
        );
        if !is_write {
            debug!(src = %src, code = ?packet.header.code, "CoAP: ignoring non-write request");
            return;
        }

        // Build topic from URI-Path option segments
        let path_parts: Vec<String> = packet
            .get_option(CoapOption::UriPath)
            .map(|ll| {
                ll.iter()
                    .filter_map(|b| std::str::from_utf8(b).ok())
                    .map(String::from)
                    .collect()
            })
            .unwrap_or_default();

        let uri_path = if path_parts.is_empty() { "data".to_string() } else { path_parts.join("/") };
        let topic    = format!("{}/{}", self.topic_prefix, uri_path);

        // Parse payload: JSON preferred, hex fallback
        let payload: serde_json::Value = std::str::from_utf8(&packet.payload)
            .ok()
            .and_then(|s| serde_json::from_str(s).ok())
            .unwrap_or_else(|| serde_json::json!({ "raw": hex_encode(&packet.payload) }));

        if self.delta_filter.should_suppress(&topic, &payload) {
            debug!(topic = %topic, "CoAP: packet delta-filtered");
            return;
        }

        let symbol  = self.classifier.classify(&topic, &payload);
        let encoded = frame::encode(symbol, &topic, &payload);
        client.buffer_event(topic.clone(), symbol, encoded, unix_ts()).await;
        debug!(src = %src, topic = %topic, symbol = format!("{symbol:#06x}"), "CoAP buffered");
    }
}