wireband-edge 0.4.0

Lightweight Wire.Band client — semantic data middleware for any domain (IoT, AI/ML, DeFi, legal, geospatial, supply chain, and more)
Documentation
//! Wire.Band edge client — ring buffer + HTTP flush loop.

use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

#[cfg(feature = "crypto")]
use crate::crypto::CryptoContext;

use reqwest::{Client, header};
use serde_json::Value;
use tokio::sync::{Mutex, oneshot};
use tokio::time;
use tracing::{debug, error, info, warn};

use crate::frame;
use crate::symbols::METRICS_GAUGE_SET;

fn unix_ts() -> f64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs_f64()
}

// ---------------------------------------------------------------------------
// BufferedEvent
// ---------------------------------------------------------------------------

#[derive(Debug, Clone)]
pub struct BufferedEvent {
    pub topic:    String,
    pub symbol:   u16,
    pub frame:    Vec<u8>, // theta-prefixed compact frame
    pub ts:       f64,
    pub attempts: u32,
}

impl BufferedEvent {
    pub fn new(topic: String, symbol: u16, frame: Vec<u8>) -> Self {
        Self { topic, symbol, frame, ts: unix_ts(), attempts: 0 }
    }

    /// Serialize to the JSON shape expected by `/iot/v1/ingest/batch`.
    pub fn to_ingest_json(&self, device_id: &str) -> Value {
        serde_json::json!({
            "device_id": device_id,
            "topic":     self.topic,
            "symbol":    format!("{:#06x}", self.symbol),
            "frame":     frame::to_hex(&self.frame),
            "ts":        self.ts,
        })
    }
}

// ---------------------------------------------------------------------------
// Config
// ---------------------------------------------------------------------------

#[derive(Debug, Clone)]
pub struct ClientConfig {
    pub backend_url:     String,
    pub device_id:       String,
    pub api_key:         Option<String>,
    pub buffer_size:     usize,
    pub flush_interval:  Duration,
    pub flush_batch:     usize,
    pub max_retries:     u32,
    pub backoff_base:    f64,
    pub delta_threshold: f64,
    /// Optional frame crypto context. When set, every frame is encrypted
    /// before entering the ring buffer (AEAD + optional symbol remapping +
    /// optional HKDF key derivation). Requires the `crypto` feature.
    #[cfg(feature = "crypto")]
    pub crypto_ctx: Option<Arc<CryptoContext>>,
}

impl Default for ClientConfig {
    fn default() -> Self {
        Self {
            backend_url:     "http://localhost:8000".to_string(),
            device_id:       "edge-node".to_string(),
            api_key:         None,
            buffer_size:     50_000,
            flush_interval:  Duration::from_secs(1),
            flush_batch:     200,
            max_retries:     3,
            backoff_base:    2.0,
            delta_threshold: 0.0,
            #[cfg(feature = "crypto")]
            crypto_ctx:      None,
        }
    }
}

// ---------------------------------------------------------------------------
// Stats
// ---------------------------------------------------------------------------

#[derive(Debug, Default, Clone)]
pub struct ClientStats {
    pub events_ingested:  u64,
    pub events_flushed:   u64,
    pub events_dropped:   u64,
    pub flush_errors:     u64,
    pub bytes_sent:       u64,
    pub frames_encrypted: u64,
    pub crypto_errors:    u64,
}

// ---------------------------------------------------------------------------
// Shared inner state (Arc-wrapped so the client is Clone)
// ---------------------------------------------------------------------------

struct Inner {
    config: ClientConfig,
    buffer: Mutex<VecDeque<BufferedEvent>>,
    stats:  Mutex<ClientStats>,
    http:   Client,
}

impl Inner {
    fn new(config: ClientConfig) -> Self {
        let mut headers = header::HeaderMap::new();
        if let Some(ref key) = config.api_key {
            if let Ok(val) = header::HeaderValue::from_str(&format!("Bearer {key}")) {
                headers.insert(header::AUTHORIZATION, val);
            }
        }

        let http = Client::builder()
            .default_headers(headers)
            .timeout(Duration::from_secs(10))
            .build()
            .expect("failed to build HTTP client");

        Self {
            config,
            buffer: Mutex::new(VecDeque::new()),
            stats:  Mutex::new(ClientStats::default()),
            http,
        }
    }
}

// ---------------------------------------------------------------------------
// WireBandClient
// ---------------------------------------------------------------------------

/// Gateway-side Wire.Band agent.
///
/// Clone-safe: all shared state is `Arc`-backed. Clone the client to pass
/// into a connector task without needing explicit `Arc<Mutex<WireBandClient>>`.
///
/// # Example
///
/// ```ignore
/// use wireband_edge::client::{WireBandClient, ClientConfig};
///
/// let client = WireBandClient::new(ClientConfig {
///     backend_url: "http://localhost:8000".into(),
///     device_id:   "rpi-01".into(),
///     ..Default::default()
/// });
///
/// client.start();
/// client.ingest(serde_json::json!({"temp": 22.5}), "env/zone-a", None).await;
/// client.flush().await;
/// client.stop().await;
/// ```
#[derive(Clone)]
pub struct WireBandClient {
    inner:        Arc<Inner>,
    // Lifecycle state is only meaningful on the "owner" clone.
    shutdown_tx:  Arc<Mutex<Option<oneshot::Sender<()>>>>,
    flush_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
}

impl WireBandClient {
    pub fn new(config: ClientConfig) -> Self {
        Self {
            inner:        Arc::new(Inner::new(config)),
            shutdown_tx:  Arc::new(Mutex::new(None)),
            flush_handle: Arc::new(Mutex::new(None)),
        }
    }

    // -----------------------------------------------------------------------
    // Lifecycle
    // -----------------------------------------------------------------------

    /// Start the background flush loop.
    pub fn start(&self) {
        let (tx, mut rx) = oneshot::channel::<()>();
        let inner = Arc::clone(&self.inner);

        let handle = tokio::spawn(async move {
            let mut interval = time::interval(inner.config.flush_interval);
            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        Self::do_flush(&inner).await;
                    }
                    _ = &mut rx => {
                        debug!("Flush loop shutting down");
                        break;
                    }
                }
            }
        });

        let tx_slot = Arc::clone(&self.shutdown_tx);
        let handle_slot = Arc::clone(&self.flush_handle);
        tokio::spawn(async move {
            *tx_slot.lock().await     = Some(tx);
            *handle_slot.lock().await = Some(handle);
        });

        debug!(
            interval_ms = self.inner.config.flush_interval.as_millis(),
            "WireBandClient flush loop started"
        );
    }

    /// Stop the flush loop and perform a final drain flush.
    pub async fn stop(&self) {
        if let Some(tx) = self.shutdown_tx.lock().await.take() {
            let _ = tx.send(());
        }
        if let Some(handle) = self.flush_handle.lock().await.take() {
            let _ = handle.await;
        }
        // Final flush — drain remaining events
        Self::do_flush(&self.inner).await;
        info!("WireBandClient stopped. stats={:?}", self.stats().await);
    }

    // -----------------------------------------------------------------------
    // Ingest
    // -----------------------------------------------------------------------

    /// Buffer a raw value (not from MQTT).
    ///
    /// `symbol` defaults to [`METRICS_GAUGE_SET`] when `None`.
    pub async fn ingest(&self, data: Value, topic: &str, symbol: Option<u16>) {
        let sym = symbol.unwrap_or(METRICS_GAUGE_SET);
        let f   = self.maybe_encrypt(frame::encode(sym, topic, &data)).await;
        self.push(BufferedEvent::new(topic.to_string(), sym, f)).await;
    }

    /// Buffer a pre-encoded event (produced by a connector).
    pub async fn buffer_event(&self, topic: String, symbol: u16, frame: Vec<u8>, ts: f64) {
        let frame = self.maybe_encrypt(frame).await;
        self.push(BufferedEvent { topic, symbol, frame, ts, attempts: 0 }).await;
    }

    /// Encrypt a frame if a crypto context is configured; pass through otherwise.
    async fn maybe_encrypt(&self, frame: Vec<u8>) -> Vec<u8> {
        #[cfg(feature = "crypto")]
        if let Some(ref ctx) = self.inner.config.crypto_ctx {
            if ctx.is_active() {
                match ctx.encrypt_frame(&frame) {
                    Ok(enc) => {
                        self.inner.stats.lock().await.frames_encrypted += 1;
                        return enc;
                    }
                    Err(e) => {
                        warn!("Frame encryption failed, sending plaintext: {e}");
                        self.inner.stats.lock().await.crypto_errors += 1;
                    }
                }
            }
        }
        frame
    }

    async fn push(&self, event: BufferedEvent) {
        let mut buf   = self.inner.buffer.lock().await;
        let mut stats = self.inner.stats.lock().await;
        if buf.len() >= self.inner.config.buffer_size {
            buf.pop_front(); // ring: drop oldest
            stats.events_dropped += 1;
        }
        buf.push_back(event);
        stats.events_ingested += 1;
    }

    // -----------------------------------------------------------------------
    // Flush
    // -----------------------------------------------------------------------

    /// Manually flush buffered events. Returns number of events sent.
    pub async fn flush(&self) -> u64 {
        Self::do_flush(&self.inner).await
    }

    async fn do_flush(inner: &Arc<Inner>) -> u64 {
        let batch: Vec<BufferedEvent> = {
            let mut buf = inner.buffer.lock().await;
            if buf.is_empty() { return 0; }
            let n = buf.len().min(inner.config.flush_batch);
            buf.drain(..n).collect()
        };

        let payload: Vec<Value> = batch
            .iter()
            .map(|e| e.to_ingest_json(&inner.config.device_id))
            .collect();

        let sent = Self::send_with_retry(inner, &payload).await;

        if sent {
            let n     = batch.len() as u64;
            let bytes = batch.iter().map(|e| e.frame.len() as u64).sum::<u64>();
            let mut s = inner.stats.lock().await;
            s.events_flushed += n;
            s.bytes_sent     += bytes;
            n
        } else {
            // Re-queue at front so events aren't lost
            let mut buf = inner.buffer.lock().await;
            for event in batch.into_iter().rev() {
                buf.push_front(event);
            }
            0
        }
    }

    async fn send_with_retry(inner: &Arc<Inner>, payload: &[Value]) -> bool {
        let url = format!(
            "{}/iot/v1/ingest/batch",
            inner.config.backend_url.trim_end_matches('/')
        );
        let body = serde_json::json!({ "events": payload });

        for attempt in 0..inner.config.max_retries {
            match inner.http.post(&url).json(&body).send().await {
                Ok(resp) if resp.status().as_u16() < 500 => {
                    debug!(events = payload.len(), attempt, "Flush OK");
                    return true;
                }
                Ok(resp) => {
                    warn!(
                        attempt, max = inner.config.max_retries,
                        status = resp.status().as_u16(),
                        "Flush server error"
                    );
                }
                Err(e) => {
                    warn!(attempt, max = inner.config.max_retries, err = %e, "Flush network error");
                }
            }

            if attempt + 1 < inner.config.max_retries {
                let delay = inner.config.backoff_base.powi(attempt as i32);
                time::sleep(Duration::from_secs_f64(delay)).await;
            }
        }

        error!(retries = inner.config.max_retries, "Flush failed after all retries");
        inner.stats.lock().await.flush_errors += 1;
        false
    }

    // -----------------------------------------------------------------------
    // Stats / introspection
    // -----------------------------------------------------------------------

    pub async fn stats(&self) -> ClientStats {
        self.inner.stats.lock().await.clone()
    }

    pub fn config(&self) -> &ClientConfig {
        &self.inner.config
    }

    /// Best-effort non-blocking buffer depth check.
    pub fn buffer_depth(&self) -> usize {
        self.inner.buffer.try_lock().map(|b| b.len()).unwrap_or(0)
    }
}