// wireband-edge 0.4.1
//
// Lightweight Wire.Band client — semantic data middleware for any domain (IoT, AI/ML, DeFi, legal, geospatial, supply chain, and more).
// Documentation
//! Serial port connector — reads line-oriented data from UART/RS-232/RS-485.
//!
//! Each line is parsed as JSON (or kept as a raw string), classified to a
//! theta symbol, framed, and buffered in [`WireBandClient`] for HTTP flush.
//!
//! Covers: NMEA GPS streams, Modbus ASCII, custom sensor protocols,
//! any device that emits newline-delimited JSON.
//!
//! Feature-gated: `--features serial` (uses `tokio-serial`).

use std::time::{SystemTime, UNIX_EPOCH};

use serde_json::Value;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_serial::SerialPortBuilderExt;
use tracing::{debug, error, info};

use crate::classifier::{DeltaFilter, TopicClassifier};
use crate::client::WireBandClient;
use crate::error::{Result, WireBandError};
use crate::frame;

/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// Yields `0.0` when the system clock reports a moment before the epoch,
/// rather than panicking.
fn unix_ts() -> f64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        // Clock set before 1970: fall back to a zero duration.
        Err(_) => 0.0,
    }
}

/// Reads newline-delimited data from a serial port and feeds it into
/// the Wire.Band compression pipeline.
///
/// Configured builder-style: construct with [`SerialConnector::new`], adjust
/// with the chained setters, then consume the connector with
/// [`SerialConnector::run`].
///
/// # Example
///
/// ```ignore
/// use wireband_edge::serial::SerialConnector;
///
/// SerialConnector::new("/dev/ttyUSB0", 115_200)
///     .topic_prefix("factory/line1")
///     .delta_threshold(0.01)
///     .run(client)
///     .await?;
/// ```
pub struct SerialConnector {
    /// Serial device path or name handed to `tokio_serial::new`
    /// (e.g. `/dev/ttyUSB0`, `COM3`).
    port:         String,
    /// Baud rate used when opening the port.
    baud:         u32,
    /// Topic prefix; every line is published under `{topic_prefix}/data`.
    topic_prefix: String,
    /// Maps a topic and payload to the symbol used when framing the event.
    classifier:   TopicClassifier,
    /// Decides, per topic and payload, whether a line is suppressed
    /// before it is classified and buffered.
    delta_filter: DeltaFilter,
}

impl SerialConnector {
    /// Create a connector for `port` (e.g. `/dev/ttyUSB0`, `COM3`) at `baud` rate.
    ///
    /// Defaults: topic prefix `"serial"`, a fresh [`TopicClassifier`], and a
    /// delta threshold of `0.0`.
    pub fn new(port: impl Into<String>, baud: u32) -> Self {
        Self {
            port:         port.into(),
            baud,
            topic_prefix: "serial".to_string(),
            classifier:   TopicClassifier::new(),
            delta_filter: DeltaFilter::new(0.0),
        }
    }

    /// MQTT-style topic prefix for events from this port (default: `"serial"`).
    ///
    /// Events will be published to `{prefix}/data`.
    pub fn topic_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.topic_prefix = prefix.into();
        self
    }

    /// Suppress numeric readings that change less than this fraction (0.0 = forward all).
    ///
    /// Replaces the current delta filter with a new one built from `threshold`.
    pub fn delta_threshold(mut self, threshold: f64) -> Self {
        self.delta_filter = DeltaFilter::new(threshold);
        self
    }

    /// Access the topic classifier to register custom keyword/topic mappings.
    pub fn classifier_mut(&mut self) -> &mut TopicClassifier {
        &mut self.classifier
    }

    /// Open the serial port and stream data into `client` until EOF or error.
    ///
    /// Input is split on `\n`. Each line is decoded as UTF-8 *lossily*: bytes
    /// that are not valid UTF-8 (line noise, a partial frame received while
    /// connecting, a baud-rate mismatch) are replaced with U+FFFD instead of
    /// aborting the connector, so one corrupted line cannot stop the stream.
    ///
    /// # Errors
    ///
    /// Returns [`WireBandError::Serial`] if the port cannot be opened or if
    /// reading from it fails with an I/O error.
    pub async fn run(mut self, client: WireBandClient) -> Result<()> {
        let port = tokio_serial::new(&self.port, self.baud)
            .open_native_async()
            .map_err(|e| WireBandError::Serial(e.to_string()))?;

        info!(port = %self.port, baud = self.baud, "Serial connector started");

        let mut reader = BufReader::new(port);
        // One byte buffer reused for every line, so steady-state reads do not
        // allocate a fresh `String` per line.
        let mut raw = Vec::new();

        loop {
            raw.clear();
            match reader.read_until(b'\n', &mut raw).await {
                // Zero bytes read means the stream has ended.
                Ok(0) => {
                    info!(port = %self.port, "Serial port EOF");
                    break;
                }
                Ok(_) => {
                    // Reading raw bytes and decoding lossily avoids the
                    // `InvalidData` error a strict UTF-8 line reader raises on
                    // a single bad byte. The trailing `\n` / `\r\n` is
                    // stripped by the trim in `process_line`.
                    let line = String::from_utf8_lossy(&raw);
                    self.process_line(&client, &line).await;
                }
                Err(e) => {
                    error!(port = %self.port, err = %e, "Serial read error");
                    return Err(WireBandError::Serial(e.to_string()));
                }
            }
        }

        Ok(())
    }

    // -----------------------------------------------------------------------
    // Internal
    // -----------------------------------------------------------------------

    /// Turn one received line into a buffered event on `client`.
    ///
    /// Steps: trim surrounding whitespace and skip blank lines; parse as JSON,
    /// falling back to a JSON string holding the raw text; drop the line if
    /// the delta filter says to suppress it; otherwise classify, frame, and
    /// buffer it under `{topic_prefix}/data` with the current Unix timestamp.
    async fn process_line(&mut self, client: &WireBandClient, line: &str) {
        let line = line.trim();
        if line.is_empty() {
            return;
        }

        // Parse as JSON; fall back to raw string (covers NMEA, custom protocols)
        let payload: Value = serde_json::from_str(line)
            .unwrap_or_else(|_| Value::String(line.to_string()));

        let topic = format!("{}/data", self.topic_prefix);

        if self.delta_filter.should_suppress(&topic, &payload) {
            debug!(topic = %topic, "Serial line delta-filtered");
            return;
        }

        let symbol  = self.classifier.classify(&topic, &payload);
        let encoded = frame::encode(symbol, &topic, &payload);
        let ts      = unix_ts();

        client.buffer_event(topic, symbol, encoded, ts).await;
        debug!(port = %self.port, symbol = format!("{symbol:#06x}"), "Serial line buffered");
    }
}