Skip to main content

wireband_edge/
client.rs

1//! Wire.Band edge client — ring buffer + HTTP flush loop.
2
3use std::collections::VecDeque;
4use std::sync::Arc;
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7#[cfg(feature = "crypto")]
8use crate::crypto::CryptoContext;
9
10use reqwest::{Client, header};
11use serde_json::Value;
12use tokio::sync::{Mutex, oneshot};
13use tokio::time;
14use tracing::{debug, error, info, warn};
15
16use crate::frame;
17use crate::symbols::METRICS_GAUGE_SET;
18
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// If the system clock reports a time earlier than the epoch, `0.0` is
/// returned instead of an error.
fn unix_ts() -> f64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs_f64(),
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => Duration::default().as_secs_f64(),
    }
}
25
26// ---------------------------------------------------------------------------
27// BufferedEvent
28// ---------------------------------------------------------------------------
29
30#[derive(Debug, Clone)]
31pub struct BufferedEvent {
32    pub topic:    String,
33    pub symbol:   u16,
34    pub frame:    Vec<u8>, // theta-prefixed compact frame
35    pub ts:       f64,
36    pub attempts: u32,
37}
38
39impl BufferedEvent {
40    pub fn new(topic: String, symbol: u16, frame: Vec<u8>) -> Self {
41        Self { topic, symbol, frame, ts: unix_ts(), attempts: 0 }
42    }
43
44    /// Serialize to the JSON shape expected by `/iot/v1/ingest/batch`.
45    pub fn to_ingest_json(&self, device_id: &str) -> Value {
46        serde_json::json!({
47            "device_id": device_id,
48            "topic":     self.topic,
49            "symbol":    format!("{:#06x}", self.symbol),
50            "frame":     frame::to_hex(&self.frame),
51            "ts":        self.ts,
52        })
53    }
54}
55
56// ---------------------------------------------------------------------------
57// Config
58// ---------------------------------------------------------------------------
59
/// Settings for a Wire.Band edge client.
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Base URL of the backend; `/iot/v1/ingest/batch` is appended to it.
    pub backend_url: String,
    /// Identifier sent as `"device_id"` with every event.
    pub device_id: String,
    /// Optional API key, sent as an `Authorization: Bearer …` header.
    pub api_key: Option<String>,
    /// Number of buffered events at which the oldest one is evicted.
    pub buffer_size: usize,
    /// Period of the background flush loop.
    pub flush_interval: Duration,
    /// Maximum number of events taken from the buffer per flush.
    pub flush_batch: usize,
    /// Number of send attempts made for one batch before giving up.
    pub max_retries: u32,
    /// Base of the exponential backoff between attempts, in seconds
    /// (`backoff_base ^ attempt`).
    pub backoff_base: f64,
    /// Delta threshold. Not read by the client code in this file —
    /// presumably consumed by connectors; confirm against callers.
    pub delta_threshold: f64,
    /// Optional frame crypto context. When set, every frame is encrypted
    /// before entering the ring buffer (AEAD + optional symbol remapping +
    /// optional HKDF key derivation). Requires the `crypto` feature.
    #[cfg(feature = "crypto")]
    pub crypto_ctx: Option<Arc<CryptoContext>>,
}

impl Default for ClientConfig {
    /// Defaults aimed at a local backend: 50 000-event buffer, one flush per
    /// second of up to 200 events, three attempts with base-2 backoff.
    fn default() -> Self {
        let backend_url = String::from("http://localhost:8000");
        let device_id = String::from("edge-node");
        Self {
            backend_url,
            device_id,
            api_key: None,
            buffer_size: 50_000,
            flush_interval: Duration::from_secs(1),
            flush_batch: 200,
            max_retries: 3,
            backoff_base: 2.0,
            delta_threshold: 0.0,
            #[cfg(feature = "crypto")]
            crypto_ctx: None,
        }
    }
}
95
96// ---------------------------------------------------------------------------
97// Stats
98// ---------------------------------------------------------------------------
99
/// Running counters describing what the client has done so far.
#[derive(Debug, Default, Clone)]
pub struct ClientStats {
    /// Events appended to the buffer.
    pub events_ingested: u64,
    /// Events contained in batches that were reported as sent.
    pub events_flushed: u64,
    /// Oldest events evicted because the buffer was full.
    pub events_dropped: u64,
    /// Batches that still failed after every send attempt.
    pub flush_errors: u64,
    /// Sum of frame lengths (in bytes) of flushed events.
    pub bytes_sent: u64,
    /// Frames successfully encrypted before buffering.
    pub frames_encrypted: u64,
    /// Frame encryption attempts that returned an error.
    pub crypto_errors: u64,
}
110
111// ---------------------------------------------------------------------------
112// Shared inner state (Arc-wrapped so the client is Clone)
113// ---------------------------------------------------------------------------
114
115struct Inner {
116    config: ClientConfig,
117    buffer: Mutex<VecDeque<BufferedEvent>>,
118    stats:  Mutex<ClientStats>,
119    http:   Client,
120}
121
122impl Inner {
123    fn new(config: ClientConfig) -> Self {
124        let mut headers = header::HeaderMap::new();
125        if let Some(ref key) = config.api_key {
126            if let Ok(val) = header::HeaderValue::from_str(&format!("Bearer {key}")) {
127                headers.insert(header::AUTHORIZATION, val);
128            }
129        }
130
131        let http = Client::builder()
132            .default_headers(headers)
133            .timeout(Duration::from_secs(10))
134            .build()
135            .expect("failed to build HTTP client");
136
137        Self {
138            config,
139            buffer: Mutex::new(VecDeque::new()),
140            stats:  Mutex::new(ClientStats::default()),
141            http,
142        }
143    }
144}
145
146// ---------------------------------------------------------------------------
147// WireBandClient
148// ---------------------------------------------------------------------------
149
150/// Gateway-side Wire.Band agent.
151///
152/// Clone-safe: all shared state is `Arc`-backed. Clone the client to pass
153/// into a connector task without needing explicit `Arc<Mutex<WireBandClient>>`.
154///
155/// # Example
156///
157/// ```ignore
158/// use wireband_edge::client::{WireBandClient, ClientConfig};
159///
160/// let client = WireBandClient::new(ClientConfig {
161///     backend_url: "http://localhost:8000".into(),
162///     device_id:   "rpi-01".into(),
163///     ..Default::default()
164/// });
165///
166/// client.start();
167/// client.ingest(serde_json::json!({"temp": 22.5}), "env/zone-a", None).await;
168/// client.flush().await;
169/// client.stop().await;
170/// ```
171#[derive(Clone)]
172pub struct WireBandClient {
173    inner:        Arc<Inner>,
174    // Lifecycle state is only meaningful on the "owner" clone.
175    shutdown_tx:  Arc<Mutex<Option<oneshot::Sender<()>>>>,
176    flush_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
177}
178
179impl WireBandClient {
180    pub fn new(config: ClientConfig) -> Self {
181        Self {
182            inner:        Arc::new(Inner::new(config)),
183            shutdown_tx:  Arc::new(Mutex::new(None)),
184            flush_handle: Arc::new(Mutex::new(None)),
185        }
186    }
187
188    // -----------------------------------------------------------------------
189    // Lifecycle
190    // -----------------------------------------------------------------------
191
192    /// Start the background flush loop.
193    pub fn start(&self) {
194        let (tx, mut rx) = oneshot::channel::<()>();
195        let inner = Arc::clone(&self.inner);
196
197        let handle = tokio::spawn(async move {
198            let mut interval = time::interval(inner.config.flush_interval);
199            loop {
200                tokio::select! {
201                    _ = interval.tick() => {
202                        Self::do_flush(&inner).await;
203                    }
204                    _ = &mut rx => {
205                        debug!("Flush loop shutting down");
206                        break;
207                    }
208                }
209            }
210        });
211
212        let tx_slot = Arc::clone(&self.shutdown_tx);
213        let handle_slot = Arc::clone(&self.flush_handle);
214        tokio::spawn(async move {
215            *tx_slot.lock().await     = Some(tx);
216            *handle_slot.lock().await = Some(handle);
217        });
218
219        debug!(
220            interval_ms = self.inner.config.flush_interval.as_millis(),
221            "WireBandClient flush loop started"
222        );
223    }
224
225    /// Stop the flush loop and perform a final drain flush.
226    pub async fn stop(&self) {
227        if let Some(tx) = self.shutdown_tx.lock().await.take() {
228            let _ = tx.send(());
229        }
230        if let Some(handle) = self.flush_handle.lock().await.take() {
231            let _ = handle.await;
232        }
233        // Final flush — drain remaining events
234        Self::do_flush(&self.inner).await;
235        info!("WireBandClient stopped. stats={:?}", self.stats().await);
236    }
237
238    // -----------------------------------------------------------------------
239    // Ingest
240    // -----------------------------------------------------------------------
241
242    /// Buffer a raw value (not from MQTT).
243    ///
244    /// `symbol` defaults to [`METRICS_GAUGE_SET`] when `None`.
245    pub async fn ingest(&self, data: Value, topic: &str, symbol: Option<u16>) {
246        let sym = symbol.unwrap_or(METRICS_GAUGE_SET);
247        let f   = self.maybe_encrypt(frame::encode(sym, topic, &data)).await;
248        self.push(BufferedEvent::new(topic.to_string(), sym, f)).await;
249    }
250
251    /// Buffer a pre-encoded event (produced by a connector).
252    pub async fn buffer_event(&self, topic: String, symbol: u16, frame: Vec<u8>, ts: f64) {
253        let frame = self.maybe_encrypt(frame).await;
254        self.push(BufferedEvent { topic, symbol, frame, ts, attempts: 0 }).await;
255    }
256
257    /// Encrypt a frame if a crypto context is configured; pass through otherwise.
258    async fn maybe_encrypt(&self, frame: Vec<u8>) -> Vec<u8> {
259        #[cfg(feature = "crypto")]
260        if let Some(ref ctx) = self.inner.config.crypto_ctx {
261            if ctx.is_active() {
262                match ctx.encrypt_frame(&frame) {
263                    Ok(enc) => {
264                        self.inner.stats.lock().await.frames_encrypted += 1;
265                        return enc;
266                    }
267                    Err(e) => {
268                        warn!("Frame encryption failed, sending plaintext: {e}");
269                        self.inner.stats.lock().await.crypto_errors += 1;
270                    }
271                }
272            }
273        }
274        frame
275    }
276
277    async fn push(&self, event: BufferedEvent) {
278        let mut buf   = self.inner.buffer.lock().await;
279        let mut stats = self.inner.stats.lock().await;
280        if buf.len() >= self.inner.config.buffer_size {
281            buf.pop_front(); // ring: drop oldest
282            stats.events_dropped += 1;
283        }
284        buf.push_back(event);
285        stats.events_ingested += 1;
286    }
287
288    // -----------------------------------------------------------------------
289    // Flush
290    // -----------------------------------------------------------------------
291
292    /// Manually flush buffered events. Returns number of events sent.
293    pub async fn flush(&self) -> u64 {
294        Self::do_flush(&self.inner).await
295    }
296
297    async fn do_flush(inner: &Arc<Inner>) -> u64 {
298        let batch: Vec<BufferedEvent> = {
299            let mut buf = inner.buffer.lock().await;
300            if buf.is_empty() { return 0; }
301            let n = buf.len().min(inner.config.flush_batch);
302            buf.drain(..n).collect()
303        };
304
305        let payload: Vec<Value> = batch
306            .iter()
307            .map(|e| e.to_ingest_json(&inner.config.device_id))
308            .collect();
309
310        let sent = Self::send_with_retry(inner, &payload).await;
311
312        if sent {
313            let n     = batch.len() as u64;
314            let bytes = batch.iter().map(|e| e.frame.len() as u64).sum::<u64>();
315            let mut s = inner.stats.lock().await;
316            s.events_flushed += n;
317            s.bytes_sent     += bytes;
318            n
319        } else {
320            // Re-queue at front so events aren't lost
321            let mut buf = inner.buffer.lock().await;
322            for event in batch.into_iter().rev() {
323                buf.push_front(event);
324            }
325            0
326        }
327    }
328
329    async fn send_with_retry(inner: &Arc<Inner>, payload: &[Value]) -> bool {
330        let url = format!(
331            "{}/iot/v1/ingest/batch",
332            inner.config.backend_url.trim_end_matches('/')
333        );
334        let body = serde_json::json!({ "events": payload });
335
336        for attempt in 0..inner.config.max_retries {
337            match inner.http.post(&url).json(&body).send().await {
338                Ok(resp) if resp.status().as_u16() < 500 => {
339                    debug!(events = payload.len(), attempt, "Flush OK");
340                    return true;
341                }
342                Ok(resp) => {
343                    warn!(
344                        attempt, max = inner.config.max_retries,
345                        status = resp.status().as_u16(),
346                        "Flush server error"
347                    );
348                }
349                Err(e) => {
350                    warn!(attempt, max = inner.config.max_retries, err = %e, "Flush network error");
351                }
352            }
353
354            if attempt + 1 < inner.config.max_retries {
355                let delay = inner.config.backoff_base.powi(attempt as i32);
356                time::sleep(Duration::from_secs_f64(delay)).await;
357            }
358        }
359
360        error!(retries = inner.config.max_retries, "Flush failed after all retries");
361        inner.stats.lock().await.flush_errors += 1;
362        false
363    }
364
365    // -----------------------------------------------------------------------
366    // Stats / introspection
367    // -----------------------------------------------------------------------
368
369    pub async fn stats(&self) -> ClientStats {
370        self.inner.stats.lock().await.clone()
371    }
372
373    pub fn config(&self) -> &ClientConfig {
374        &self.inner.config
375    }
376
377    /// Best-effort non-blocking buffer depth check.
378    pub fn buffer_depth(&self) -> usize {
379        self.inner.buffer.try_lock().map(|b| b.len()).unwrap_or(0)
380    }
381}