wireband-edge 0.4.1

Lightweight Wire.Band client — semantic data middleware for any domain (IoT, AI/ML, DeFi, legal, geospatial, supply chain, and more)
Documentation
//! MQTT connector — bridges a broker into the Wire.Band compression pipeline.
//!
//! Feature-gated: only compiled when `--features mqtt` (the default).

use std::time::{Duration, SystemTime, UNIX_EPOCH};

use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS, TlsConfiguration, Transport};
use serde_json::Value;
use tracing::{debug, error, info};

use crate::classifier::{DeltaFilter, TopicClassifier};
use crate::client::WireBandClient;
use crate::error::{Result, WireBandError};
use crate::frame;

fn unix_ts() -> f64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs_f64()
}

// ---------------------------------------------------------------------------
// URL parsing (no external crate needed)
// ---------------------------------------------------------------------------

fn parse_broker_url(url: &str) -> (bool, String, u16) {
    let tls = url.starts_with("mqtts://");
    let without_scheme = url
        .trim_start_matches("mqtts://")
        .trim_start_matches("mqtt://");

    // Strip any path component
    let hostport = without_scheme.split('/').next().unwrap_or(without_scheme);

    let (host, port_str) = hostport
        .rsplit_once(':')
        .unwrap_or((hostport, ""));

    let default_port = if tls { 8883 } else { 1883 };
    let port = port_str.parse::<u16>().unwrap_or(default_port);

    (tls, host.to_string(), port)
}

// ---------------------------------------------------------------------------
// MqttConnector
// ---------------------------------------------------------------------------

/// Bridges an MQTT broker into the Wire.Band compression pipeline.
///
/// Each incoming message is:
///   1. Delta-filtered (optional — skip if numeric value unchanged)
///   2. Classified to a theta IoT symbol via topic path + payload key heuristics
///   3. Encoded as a 2-byte theta prefix + compact JSON frame
///   4. Buffered in [`WireBandClient`] for HTTP flush to the backend
pub struct MqttConnector {
    broker_url:   String,
    client_id:    String,
    username:     Option<String>,
    password:     Option<String>,
    keepalive:    u64,
    classifier:   TopicClassifier,
    delta_filter: DeltaFilter,
}

impl MqttConnector {
    pub fn new(broker_url: impl Into<String>, delta_threshold: f64) -> Self {
        Self {
            broker_url:   broker_url.into(),
            client_id:    "wireband-edge".to_string(),
            username:     None,
            password:     None,
            keepalive:    60,
            classifier:   TopicClassifier::new(),
            delta_filter: DeltaFilter::new(delta_threshold),
        }
    }

    pub fn client_id(mut self, id: impl Into<String>) -> Self {
        self.client_id = id.into();
        self
    }

    pub fn credentials(mut self, username: impl Into<String>, password: impl Into<String>) -> Self {
        self.username = Some(username.into());
        self.password = Some(password.into());
        self
    }

    pub fn keepalive(mut self, secs: u64) -> Self {
        self.keepalive = secs;
        self
    }

    /// Access the classifier to register custom topic or keyword overrides.
    pub fn classifier_mut(&mut self) -> &mut TopicClassifier {
        &mut self.classifier
    }

    // -----------------------------------------------------------------------
    // Run
    // -----------------------------------------------------------------------

    /// Connect to the broker, subscribe to `topics`, and stream events into
    /// `client` until the task is cancelled.
    ///
    /// This is the typical entry-point for a gateway binary:
    ///
    /// ```ignore
    /// use wireband_edge::client::{WireBandClient, ClientConfig};
    /// use wireband_edge::mqtt::MqttConnector;
    ///
    /// let client = WireBandClient::new(ClientConfig::default());
    /// client.start();
    ///
    /// MqttConnector::new("mqtt://localhost:1883", 0.02)
    ///     .run(client, vec!["sensors/#".into(), "machines/#".into()])
    ///     .await
    ///     .unwrap();
    /// ```
    pub async fn run(
        mut self,
        client: WireBandClient,
        topics: Vec<String>,
    ) -> Result<()> {
        let (tls, host, port) = parse_broker_url(&self.broker_url);

        let mut options = MqttOptions::new(&self.client_id, &host, port);
        options.set_keep_alive(Duration::from_secs(self.keepalive));

        if let (Some(u), Some(p)) = (self.username.as_deref(), self.password.as_deref()) {
            options.set_credentials(u, p);
        }

        if tls {
            options.set_transport(Transport::Tls(TlsConfiguration::default()));
        }

        let (mqtt_client, mut event_loop) = AsyncClient::new(options, 128);

        for topic in &topics {
            mqtt_client
                .subscribe(topic, QoS::AtMostOnce)
                .await
                .map_err(|e| WireBandError::Mqtt(e.to_string()))?;
        }

        info!(
            broker = %self.broker_url,
            ?topics,
            "MQTT connected"
        );

        loop {
            match event_loop.poll().await {
                Ok(Event::Incoming(Packet::Publish(p))) => {
                    let topic   = p.topic.clone();
                    let raw     = p.payload.to_vec();
                    self.process(&client, topic, raw).await;
                }
                Ok(_) => {} // ConnAck, SubAck, PingResp etc.
                Err(e) => {
                    error!(err = %e, "MQTT connection error");
                    return Err(WireBandError::Mqtt(e.to_string()));
                }
            }
        }
    }

    // -----------------------------------------------------------------------
    // Internal
    // -----------------------------------------------------------------------

    async fn process(&mut self, client: &WireBandClient, topic: String, raw: Vec<u8>) {
        // Decode payload — best-effort JSON, fallback to UTF-8 string
        let payload: Value = serde_json::from_slice(&raw).unwrap_or_else(|_| {
            Value::String(
                std::str::from_utf8(&raw)
                    .map(|s| s.to_string())
                    .unwrap_or_else(|_| format!("<binary:{}>", raw.len())),
            )
        });

        // Delta filter
        if self.delta_filter.should_suppress(&topic, &payload) {
            debug!(topic = %topic, "delta-filtered");
            return;
        }

        let symbol = self.classifier.classify(&topic, &payload);
        let encoded = frame::encode(symbol, &topic, &payload);
        let ts = unix_ts();

        client.buffer_event(topic, symbol, encoded, ts).await;
    }
}