wireband-edge 0.4.0

Lightweight Wire.Band client — semantic data middleware for any domain (IoT, AI/ML, DeFi, legal, geospatial, supply chain, and more)
Documentation
//! BLE connector — streams GATT notifications from BLE peripherals.
//!
//! Scans for a device whose advertised name contains a filter string,
//! connects, subscribes to a specific characteristic UUID, and pipes
//! notification payloads (parsed as JSON, or raw hex) into the Wire.Band
//! compression pipeline.
//!
//! Feature-gated: `--features ble` (uses `btleplug`).

use std::time::{Duration, SystemTime, UNIX_EPOCH};

use btleplug::api::{Central, Manager as _, Peripheral as _, ScanFilter};
use btleplug::platform::Manager;
use futures::StreamExt;
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::classifier::{DeltaFilter, TopicClassifier};
use crate::client::WireBandClient;
use crate::error::{Result, WireBandError};
use crate::frame;

fn unix_ts() -> f64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs_f64()
}

fn hex_encode(data: &[u8]) -> String {
    data.iter().map(|b| format!("{b:02x}")).collect()
}

/// Streams GATT characteristic notifications from a BLE peripheral into
/// the Wire.Band compression pipeline.
///
/// # Example
///
/// ```ignore
/// use std::time::Duration;
/// use wireband_edge::ble::BleConnector;
///
/// BleConnector::new("temp-sensor", "6e400003-b5a3-f393-e0a9-e50e24dcca9e")?
///     .scan_duration(Duration::from_secs(10))
///     .topic_prefix("factory/ble")
///     .run(client)
///     .await?;
/// ```
pub struct BleConnector {
    name_filter:   String,
    char_uuid:     Uuid,
    scan_duration: Duration,
    topic_prefix:  String,
    classifier:    TopicClassifier,
    delta_filter:  DeltaFilter,
}

impl BleConnector {
    /// Create a connector that scans for BLE devices whose advertised name
    /// contains `name_filter`, then subscribes to `char_uuid` (standard UUID
    /// string, e.g. `"6e400003-b5a3-f393-e0a9-e50e24dcca9e"`).
    pub fn new(name_filter: impl Into<String>, char_uuid: impl AsRef<str>) -> Result<Self> {
        let uuid = char_uuid.as_ref().parse::<Uuid>()
            .map_err(|e| WireBandError::Ble(format!("invalid UUID: {e}")))?;
        Ok(Self {
            name_filter:   name_filter.into(),
            char_uuid:     uuid,
            scan_duration: Duration::from_secs(10),
            topic_prefix:  "ble".to_string(),
            classifier:    TopicClassifier::new(),
            delta_filter:  DeltaFilter::new(0.0),
        })
    }

    /// How long to scan for peripherals before giving up (default: 10 s).
    pub fn scan_duration(mut self, d: Duration) -> Self {
        self.scan_duration = d;
        self
    }

    /// Topic prefix for events from this peripheral (default: `"ble"`).
    ///
    /// Events are published to `{prefix}/{device_name}/data`.
    pub fn topic_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.topic_prefix = prefix.into();
        self
    }

    /// Suppress numeric readings that change less than this fraction (0.0 = forward all).
    pub fn delta_threshold(mut self, threshold: f64) -> Self {
        self.delta_filter = DeltaFilter::new(threshold);
        self
    }

    /// Access the topic classifier to register custom keyword/topic mappings.
    pub fn classifier_mut(&mut self) -> &mut TopicClassifier {
        &mut self.classifier
    }

    /// Scan → connect → subscribe → stream notifications into `client` until
    /// the connection drops or an error occurs.
    pub async fn run(mut self, client: WireBandClient) -> Result<()> {
        let manager = Manager::new().await
            .map_err(|e| WireBandError::Ble(format!("BLE manager init: {e}")))?;

        let adapters = manager.adapters().await
            .map_err(|e| WireBandError::Ble(format!("BLE list adapters: {e}")))?;

        let adapter = adapters.into_iter().next()
            .ok_or_else(|| WireBandError::Ble("no BLE adapter found".into()))?;

        info!(filter = %self.name_filter, "BLE: scanning for peripheral...");

        adapter.start_scan(ScanFilter::default()).await
            .map_err(|e| WireBandError::Ble(format!("BLE scan start: {e}")))?;

        tokio::time::sleep(self.scan_duration).await;
        adapter.stop_scan().await.ok();

        let peripherals = adapter.peripherals().await
            .map_err(|e| WireBandError::Ble(format!("BLE list peripherals: {e}")))?;

        // Find first peripheral whose name contains the filter string
        let mut periph_opt = None;
        for p in peripherals {
            let props = p.properties().await.ok().flatten();
            let name  = props.as_ref().and_then(|pr| pr.local_name.as_deref()).unwrap_or("");
            if name.contains(self.name_filter.as_str()) {
                info!(device = %name, "BLE: peripheral matched");
                periph_opt = Some(p);
                break;
            }
        }

        let periph = periph_opt.ok_or_else(|| WireBandError::Ble(
            format!("BLE peripheral '{}' not found after scan", self.name_filter)
        ))?;

        periph.connect().await
            .map_err(|e| WireBandError::Ble(format!("BLE connect: {e}")))?;
        periph.discover_services().await
            .map_err(|e| WireBandError::Ble(format!("BLE discover services: {e}")))?;

        let chars = periph.characteristics();
        let target = chars.iter()
            .find(|c| c.uuid == self.char_uuid)
            .ok_or_else(|| WireBandError::Ble(
                format!("characteristic {} not found on device", self.char_uuid)
            ))?
            .clone();

        periph.subscribe(&target).await
            .map_err(|e| WireBandError::Ble(format!("BLE subscribe: {e}")))?;

        let device_name = periph.properties().await.ok().flatten()
            .and_then(|p| p.local_name)
            .unwrap_or_else(|| self.name_filter.clone());

        info!(device = %device_name, char = %self.char_uuid, "BLE: streaming notifications");

        let mut stream = periph.notifications().await
            .map_err(|e| WireBandError::Ble(format!("BLE notifications stream: {e}")))?;

        while let Some(notif) = stream.next().await {
            self.process(&client, &device_name, &notif.value).await;
        }

        warn!(device = %device_name, "BLE: notification stream ended");
        Ok(())
    }

    async fn process(&mut self, client: &WireBandClient, device: &str, data: &[u8]) {
        // Try UTF-8 JSON, fall back to hex string
        let payload: serde_json::Value = std::str::from_utf8(data)
            .ok()
            .and_then(|s| serde_json::from_str(s).ok())
            .unwrap_or_else(|| serde_json::Value::String(hex_encode(data)));

        let topic = format!("{}/{}/data", self.topic_prefix, device);

        if self.delta_filter.should_suppress(&topic, &payload) {
            debug!(topic = %topic, "BLE: notification delta-filtered");
            return;
        }

        let symbol  = self.classifier.classify(&topic, &payload);
        let encoded = frame::encode(symbol, &topic, &payload);
        client.buffer_event(topic, symbol, encoded, unix_ts()).await;
        debug!(device = %device, symbol = format!("{symbol:#06x}"), "BLE notification buffered");
    }
}