wireband-edge 0.4.1

Lightweight Wire.Band client — semantic data middleware for any domain (IoT, AI/ML, DeFi, legal, geospatial, supply chain, and more)
Documentation
//! PyO3 bindings — exposes Wire.Band edge client to Python via maturin.
//!
//! Build with:
//! ```bash
//! pip install maturin
//! cd edge-rs
//! maturin develop --features python
//! ```
//!
//! Then in Python:
//! ```python
//! from wireband_edge_rs import WireBandClient
//!
//! client = WireBandClient(
//!     backend_url="https://ingest.wire.band",
//!     device_id="factory-rpi4",
//!     api_key="YOUR_API_KEY",
//! )
//! # No start() call is needed: the constructor already starts the background flush task.
//! client.buffer_event("sensors/temp", 0xFC62, '{"value": 23.5}')
//! print(client.stats())
//! ```

use std::time::{SystemTime, UNIX_EPOCH};

use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::PyDict;
use tokio::runtime::Runtime;

use crate::client::{ClientConfig, WireBandClient};
use crate::frame;

/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the result is `0.0`
/// instead of a panic.
fn unix_ts() -> f64 {
    let now = SystemTime::now();
    match now.duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs_f64(),
        // Clock is set before 1970: fall back to a zero-length duration.
        Err(_) => 0.0,
    }
}

// ---------------------------------------------------------------------------
// WireBandClient Python wrapper
// ---------------------------------------------------------------------------

/// High-performance Wire.Band edge client backed by Rust.
///
/// Maintains a ring buffer of up to `buffer_size` events and flushes them
/// to the Wire.Band backend in batches every `flush_interval_secs` seconds.
///
/// This is the Python-facing wrapper (exported to Python as `WireBandClient`).
/// It pairs the core Rust client with its own Tokio runtime, and each method
/// drives the core client's async API to completion with `Runtime::block_on`.
///
/// Threading: `buffer_event`, `ingest` and `flush` release the GIL themselves
/// while they wait on the core client, so other Python threads keep running.
/// NOTE(review): sharing one instance across threads relies on the core
/// client's internal state being safely shareable (described as Arc-backed);
/// that is defined in `crate::client` and is not visible here — confirm.
#[pyclass(name = "WireBandClient")]
pub struct PyWireBandClient {
    // Core Rust client that owns the event buffer and performs the flushes.
    client:  WireBandClient,
    // Tokio runtime the client is started in; also used to block on the
    // client's async methods from synchronous Python calls.
    runtime: Runtime,
}

#[pymethods]
impl PyWireBandClient {
    /// Create a new client and start it.
    ///
    /// The client is started inside the constructor, so no separate start
    /// call is required from Python.
    ///
    /// Args:
    ///     backend_url: Base URL of the Wire.Band ingest endpoint.
    ///     device_id:   Unique identifier for this edge node.
    ///     api_key:     Optional bearer token for authentication.
    ///     buffer_size: Max events held in RAM before oldest are dropped (default 50_000).
    ///     flush_batch: Events per HTTP flush request (default 200).
    ///     flush_interval_secs: Flush interval in seconds (default 1.0).
    ///
    /// Raises:
    ///     ValueError:   If `flush_interval_secs` is negative, NaN, or too
    ///                   large to be represented as a duration.
    ///     RuntimeError: If the Tokio runtime cannot be created.
    #[new]
    #[pyo3(signature = (
        backend_url,
        device_id,
        api_key = None,
        buffer_size = 50_000,
        flush_batch = 200,
        flush_interval_secs = 1.0,
    ))]
    fn new(
        backend_url:         String,
        device_id:           String,
        api_key:             Option<String>,
        buffer_size:         usize,
        flush_batch:         usize,
        flush_interval_secs: f64,
    ) -> PyResult<Self> {
        // `Duration::from_secs_f64` panics on negative, NaN or overflowing
        // input, which would surface in Python as a PanicException. Validate
        // first (and before allocating a runtime) so callers get a ValueError.
        let flush_interval = std::time::Duration::try_from_secs_f64(flush_interval_secs)
            .map_err(|e| {
                PyValueError::new_err(format!(
                    "invalid flush_interval_secs {flush_interval_secs}: {e}"
                ))
            })?;

        let runtime = Runtime::new()
            .map_err(|e| PyRuntimeError::new_err(format!("tokio runtime: {e}")))?;

        let config = ClientConfig {
            backend_url,
            device_id,
            api_key,
            buffer_size,
            flush_batch,
            flush_interval,
            ..Default::default()
        };

        let client = WireBandClient::new(config);

        // start() calls tokio::spawn — must be inside a runtime context
        runtime.block_on(async { client.start() });

        Ok(Self { client, runtime })
    }

    /// Buffer a pre-classified event.
    ///
    /// The JSON payload is parsed and encoded into a frame before the GIL is
    /// released; the event is timestamped with the current Unix time.
    ///
    /// Args:
    ///     topic:   MQTT-style topic string (e.g. `"sensors/zone-a/temperature"`).
    ///     symbol:  2-byte theta symbol (e.g. `0xFC62` for `SENSOR_TEMPERATURE`).
    ///     data:    JSON string payload (e.g. `'{"value": 23.5}'`).
    ///
    /// Raises:
    ///     ValueError: If `data` is not valid JSON.
    fn buffer_event(
        &self,
        py:     Python<'_>,
        topic:  String,
        symbol: u16,
        data:   String,
    ) -> PyResult<()> {
        let payload: serde_json::Value = serde_json::from_str(&data)
            .map_err(|e| PyValueError::new_err(format!("invalid JSON: {e}")))?;
        let encoded = frame::encode(symbol, &topic, &payload);
        let ts      = unix_ts();

        // Release the GIL while we take the async mutex lock
        py.allow_threads(|| {
            self.runtime.block_on(self.client.buffer_event(topic, symbol, encoded, ts));
        });
        Ok(())
    }

    /// Ingest a raw value, classifying it automatically from the topic.
    ///
    /// Args:
    ///     data:   JSON string payload.
    ///     topic:  MQTT-style topic string.
    ///     symbol: Optional theta symbol override; auto-classified when `None`.
    ///
    /// Raises:
    ///     ValueError: If `data` is not valid JSON.
    // Explicit signature: the implicit `None` default for a trailing
    // `Option<T>` argument is deprecated in PyO3, so pin the behaviour here.
    #[pyo3(signature = (data, topic, symbol = None))]
    fn ingest(
        &self,
        py:     Python<'_>,
        data:   String,
        topic:  String,
        symbol: Option<u16>,
    ) -> PyResult<()> {
        let value: serde_json::Value = serde_json::from_str(&data)
            .map_err(|e| PyValueError::new_err(format!("invalid JSON: {e}")))?;

        // Release the GIL while blocking on the async client.
        py.allow_threads(|| {
            self.runtime.block_on(self.client.ingest(value, &topic, symbol));
        });
        Ok(())
    }

    /// Manually flush buffered events to the backend.
    ///
    /// Blocks until the flush completes; the GIL is released while waiting.
    ///
    /// Returns the number of events sent.
    fn flush(&self, py: Python<'_>) -> u64 {
        py.allow_threads(|| self.runtime.block_on(self.client.flush()))
    }

    /// Current buffer depth (non-blocking best-effort).
    fn buffer_depth(&self) -> usize {
        self.client.buffer_depth()
    }

    /// Return a dict with ingestion statistics.
    ///
    /// Keys: `events_ingested`, `events_flushed`, `events_dropped`,
    /// `flush_errors`, `bytes_sent`.
    fn stats<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        // Release the GIL while waiting on the async client, matching the
        // other blocking methods, so other Python threads are not stalled.
        let s    = py.allow_threads(|| self.runtime.block_on(self.client.stats()));
        let dict = PyDict::new_bound(py);
        dict.set_item("events_ingested", s.events_ingested)?;
        dict.set_item("events_flushed",  s.events_flushed)?;
        dict.set_item("events_dropped",  s.events_dropped)?;
        dict.set_item("flush_errors",    s.flush_errors)?;
        dict.set_item("bytes_sent",      s.bytes_sent)?;
        Ok(dict)
    }

    /// Debug representation showing the device id, backend URL and current
    /// buffer depth.
    fn __repr__(&self) -> String {
        let cfg = self.client.config();
        format!(
            "WireBandClient(device_id={:?}, backend_url={:?}, buffer_depth={})",
            cfg.device_id,
            cfg.backend_url,
            self.client.buffer_depth()
        )
    }
}

impl Drop for PyWireBandClient {
    /// Stop the core client when the wrapper is dropped (Python GC or scope
    /// exit), blocking on the runtime until `stop()` has finished.
    ///
    /// NOTE(review): `stop()` is expected to perform the final flush; its
    /// implementation lives in `crate::client` — confirm there.
    fn drop(&mut self) {
        let shutdown = self.client.stop();
        self.runtime.block_on(shutdown);
    }
}

// ---------------------------------------------------------------------------
// Module entry point
// ---------------------------------------------------------------------------

// Initialiser for the `wireband_edge_rs` Python extension module: registers
// the `WireBandClient` class and propagates any registration error.
// (Plain `//` comments on purpose: a `///` doc comment here would become the
// Python module's `__doc__`.)
#[pymodule]
fn wireband_edge_rs(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<PyWireBandClient>()
}