use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[cfg(feature = "crypto")]
use crate::crypto::CryptoContext;
use reqwest::{Client, header};
use serde_json::Value;
use tokio::sync::{Mutex, oneshot};
use tokio::time;
use tracing::{debug, error, info, warn};
use crate::frame;
use crate::symbols::METRICS_GAUGE_SET;
fn unix_ts() -> f64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64()
}
#[derive(Debug, Clone)]
pub struct BufferedEvent {
pub topic: String,
pub symbol: u16,
pub frame: Vec<u8>, pub ts: f64,
pub attempts: u32,
}
impl BufferedEvent {
pub fn new(topic: String, symbol: u16, frame: Vec<u8>) -> Self {
Self { topic, symbol, frame, ts: unix_ts(), attempts: 0 }
}
pub fn to_ingest_json(&self, device_id: &str) -> Value {
serde_json::json!({
"device_id": device_id,
"topic": self.topic,
"symbol": format!("{:#06x}", self.symbol),
"frame": frame::to_hex(&self.frame),
"ts": self.ts,
})
}
}
#[derive(Debug, Clone)]
pub struct ClientConfig {
pub backend_url: String,
pub device_id: String,
pub api_key: Option<String>,
pub buffer_size: usize,
pub flush_interval: Duration,
pub flush_batch: usize,
pub max_retries: u32,
pub backoff_base: f64,
pub delta_threshold: f64,
#[cfg(feature = "crypto")]
pub crypto_ctx: Option<Arc<CryptoContext>>,
}
impl Default for ClientConfig {
fn default() -> Self {
Self {
backend_url: "http://localhost:8000".to_string(),
device_id: "edge-node".to_string(),
api_key: None,
buffer_size: 50_000,
flush_interval: Duration::from_secs(1),
flush_batch: 200,
max_retries: 3,
backoff_base: 2.0,
delta_threshold: 0.0,
#[cfg(feature = "crypto")]
crypto_ctx: None,
}
}
}
#[derive(Debug, Default, Clone)]
pub struct ClientStats {
pub events_ingested: u64,
pub events_flushed: u64,
pub events_dropped: u64,
pub flush_errors: u64,
pub bytes_sent: u64,
pub frames_encrypted: u64,
pub crypto_errors: u64,
}
struct Inner {
config: ClientConfig,
buffer: Mutex<VecDeque<BufferedEvent>>,
stats: Mutex<ClientStats>,
http: Client,
}
impl Inner {
fn new(config: ClientConfig) -> Self {
let mut headers = header::HeaderMap::new();
if let Some(ref key) = config.api_key {
if let Ok(val) = header::HeaderValue::from_str(&format!("Bearer {key}")) {
headers.insert(header::AUTHORIZATION, val);
}
}
let http = Client::builder()
.default_headers(headers)
.timeout(Duration::from_secs(10))
.build()
.expect("failed to build HTTP client");
Self {
config,
buffer: Mutex::new(VecDeque::new()),
stats: Mutex::new(ClientStats::default()),
http,
}
}
}
#[derive(Clone)]
pub struct WireBandClient {
inner: Arc<Inner>,
shutdown_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
flush_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
}
impl WireBandClient {
pub fn new(config: ClientConfig) -> Self {
Self {
inner: Arc::new(Inner::new(config)),
shutdown_tx: Arc::new(Mutex::new(None)),
flush_handle: Arc::new(Mutex::new(None)),
}
}
pub fn start(&self) {
let (tx, mut rx) = oneshot::channel::<()>();
let inner = Arc::clone(&self.inner);
let handle = tokio::spawn(async move {
let mut interval = time::interval(inner.config.flush_interval);
loop {
tokio::select! {
_ = interval.tick() => {
Self::do_flush(&inner).await;
}
_ = &mut rx => {
debug!("Flush loop shutting down");
break;
}
}
}
});
let tx_slot = Arc::clone(&self.shutdown_tx);
let handle_slot = Arc::clone(&self.flush_handle);
tokio::spawn(async move {
*tx_slot.lock().await = Some(tx);
*handle_slot.lock().await = Some(handle);
});
debug!(
interval_ms = self.inner.config.flush_interval.as_millis(),
"WireBandClient flush loop started"
);
}
pub async fn stop(&self) {
if let Some(tx) = self.shutdown_tx.lock().await.take() {
let _ = tx.send(());
}
if let Some(handle) = self.flush_handle.lock().await.take() {
let _ = handle.await;
}
Self::do_flush(&self.inner).await;
info!("WireBandClient stopped. stats={:?}", self.stats().await);
}
pub async fn ingest(&self, data: Value, topic: &str, symbol: Option<u16>) {
let sym = symbol.unwrap_or(METRICS_GAUGE_SET);
let f = self.maybe_encrypt(frame::encode(sym, topic, &data)).await;
self.push(BufferedEvent::new(topic.to_string(), sym, f)).await;
}
pub async fn buffer_event(&self, topic: String, symbol: u16, frame: Vec<u8>, ts: f64) {
let frame = self.maybe_encrypt(frame).await;
self.push(BufferedEvent { topic, symbol, frame, ts, attempts: 0 }).await;
}
async fn maybe_encrypt(&self, frame: Vec<u8>) -> Vec<u8> {
#[cfg(feature = "crypto")]
if let Some(ref ctx) = self.inner.config.crypto_ctx {
if ctx.is_active() {
match ctx.encrypt_frame(&frame) {
Ok(enc) => {
self.inner.stats.lock().await.frames_encrypted += 1;
return enc;
}
Err(e) => {
warn!("Frame encryption failed, sending plaintext: {e}");
self.inner.stats.lock().await.crypto_errors += 1;
}
}
}
}
frame
}
async fn push(&self, event: BufferedEvent) {
let mut buf = self.inner.buffer.lock().await;
let mut stats = self.inner.stats.lock().await;
if buf.len() >= self.inner.config.buffer_size {
buf.pop_front(); stats.events_dropped += 1;
}
buf.push_back(event);
stats.events_ingested += 1;
}
pub async fn flush(&self) -> u64 {
Self::do_flush(&self.inner).await
}
async fn do_flush(inner: &Arc<Inner>) -> u64 {
let batch: Vec<BufferedEvent> = {
let mut buf = inner.buffer.lock().await;
if buf.is_empty() { return 0; }
let n = buf.len().min(inner.config.flush_batch);
buf.drain(..n).collect()
};
let payload: Vec<Value> = batch
.iter()
.map(|e| e.to_ingest_json(&inner.config.device_id))
.collect();
let sent = Self::send_with_retry(inner, &payload).await;
if sent {
let n = batch.len() as u64;
let bytes = batch.iter().map(|e| e.frame.len() as u64).sum::<u64>();
let mut s = inner.stats.lock().await;
s.events_flushed += n;
s.bytes_sent += bytes;
n
} else {
let mut buf = inner.buffer.lock().await;
for event in batch.into_iter().rev() {
buf.push_front(event);
}
0
}
}
async fn send_with_retry(inner: &Arc<Inner>, payload: &[Value]) -> bool {
let url = format!(
"{}/iot/v1/ingest/batch",
inner.config.backend_url.trim_end_matches('/')
);
let body = serde_json::json!({ "events": payload });
for attempt in 0..inner.config.max_retries {
match inner.http.post(&url).json(&body).send().await {
Ok(resp) if resp.status().as_u16() < 500 => {
debug!(events = payload.len(), attempt, "Flush OK");
return true;
}
Ok(resp) => {
warn!(
attempt, max = inner.config.max_retries,
status = resp.status().as_u16(),
"Flush server error"
);
}
Err(e) => {
warn!(attempt, max = inner.config.max_retries, err = %e, "Flush network error");
}
}
if attempt + 1 < inner.config.max_retries {
let delay = inner.config.backoff_base.powi(attempt as i32);
time::sleep(Duration::from_secs_f64(delay)).await;
}
}
error!(retries = inner.config.max_retries, "Flush failed after all retries");
inner.stats.lock().await.flush_errors += 1;
false
}
pub async fn stats(&self) -> ClientStats {
self.inner.stats.lock().await.clone()
}
pub fn config(&self) -> &ClientConfig {
&self.inner.config
}
pub fn buffer_depth(&self) -> usize {
self.inner.buffer.try_lock().map(|b| b.len()).unwrap_or(0)
}
}