1use std::collections::VecDeque;
4use std::sync::Arc;
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7#[cfg(feature = "crypto")]
8use crate::crypto::CryptoContext;
9
10use reqwest::{Client, header};
11use serde_json::Value;
12use tokio::sync::{Mutex, oneshot};
13use tokio::time;
14use tracing::{debug, error, info, warn};
15
16use crate::frame;
17use crate::symbols::METRICS_GAUGE_SET;
18
/// Returns the current wall-clock time as fractional seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0.0` is returned
/// instead of an error.
fn unix_ts() -> f64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs_f64(),
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => 0.0,
    }
}
25
/// A single encoded event waiting in the client's send buffer.
#[derive(Debug, Clone)]
pub struct BufferedEvent {
    /// Topic the event was published under; emitted as `"topic"` in the ingest JSON.
    pub topic: String,
    /// 16-bit symbol code; emitted as a `0x`-prefixed hex string in the ingest JSON.
    pub symbol: u16,
    /// Frame bytes as they will be sent (hex-encoded in the ingest JSON).
    pub frame: Vec<u8>,
    /// Event timestamp; `BufferedEvent::new` fills it with seconds since the Unix epoch.
    pub ts: f64,
    /// Delivery-attempt counter; starts at 0 for every newly buffered event.
    pub attempts: u32,
}
38
39impl BufferedEvent {
40 pub fn new(topic: String, symbol: u16, frame: Vec<u8>) -> Self {
41 Self { topic, symbol, frame, ts: unix_ts(), attempts: 0 }
42 }
43
44 pub fn to_ingest_json(&self, device_id: &str) -> Value {
46 serde_json::json!({
47 "device_id": device_id,
48 "topic": self.topic,
49 "symbol": format!("{:#06x}", self.symbol),
50 "frame": frame::to_hex(&self.frame),
51 "ts": self.ts,
52 })
53 }
54}
55
/// Settings that control buffering, flushing and delivery for the client.
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Base URL of the backend; `/iot/v1/ingest/batch` is appended when flushing.
    pub backend_url: String,
    /// Identifier included as `device_id` in every ingested event.
    pub device_id: String,
    /// Optional API key, sent as an `Authorization: Bearer …` header when set.
    pub api_key: Option<String>,
    /// Capacity of the in-memory event buffer; the oldest event is evicted when it is full.
    pub buffer_size: usize,
    /// Period of the background flush loop.
    pub flush_interval: Duration,
    /// Maximum number of events taken from the buffer per flush.
    pub flush_batch: usize,
    /// Number of send attempts made per batch before the flush is reported as failed.
    pub max_retries: u32,
    /// Base of the exponential backoff: the delay before retry `n` is
    /// `backoff_base.powi(n)` seconds.
    pub backoff_base: f64,
    /// NOTE(review): not read anywhere in this file — presumably a minimum
    /// change threshold applied by callers; confirm against its users.
    pub delta_threshold: f64,
    /// Optional encryption context; when present and active, frames are
    /// encrypted before they are buffered.
    #[cfg(feature = "crypto")]
    pub crypto_ctx: Option<Arc<CryptoContext>>,
}
77
78impl Default for ClientConfig {
79 fn default() -> Self {
80 Self {
81 backend_url: "http://localhost:8000".to_string(),
82 device_id: "edge-node".to_string(),
83 api_key: None,
84 buffer_size: 50_000,
85 flush_interval: Duration::from_secs(1),
86 flush_batch: 200,
87 max_retries: 3,
88 backoff_base: 2.0,
89 delta_threshold: 0.0,
90 #[cfg(feature = "crypto")]
91 crypto_ctx: None,
92 }
93 }
94}
95
/// Running counters describing what the client has done so far.
#[derive(Debug, Default, Clone)]
pub struct ClientStats {
    /// Events accepted into the buffer.
    pub events_ingested: u64,
    /// Events contained in batches that were sent successfully.
    pub events_flushed: u64,
    /// Events discarded because the buffer was full.
    pub events_dropped: u64,
    /// Batches that still failed after every send attempt.
    pub flush_errors: u64,
    /// Sum of the frame lengths (in bytes) of flushed events; this is not the
    /// size of the HTTP request bodies.
    pub bytes_sent: u64,
    /// Frames successfully encrypted before buffering.
    pub frames_encrypted: u64,
    /// Frame encryption failures.
    pub crypto_errors: u64,
}
110
/// State shared between a `WireBandClient`, its clones and the background flush task.
struct Inner {
    /// Immutable client configuration.
    config: ClientConfig,
    /// FIFO of events waiting to be sent; the oldest event is at the front.
    buffer: Mutex<VecDeque<BufferedEvent>>,
    /// Counters updated on ingest, flush and encryption.
    stats: Mutex<ClientStats>,
    /// HTTP client used for every batch POST.
    http: Client,
}
121
122impl Inner {
123 fn new(config: ClientConfig) -> Self {
124 let mut headers = header::HeaderMap::new();
125 if let Some(ref key) = config.api_key {
126 if let Ok(val) = header::HeaderValue::from_str(&format!("Bearer {key}")) {
127 headers.insert(header::AUTHORIZATION, val);
128 }
129 }
130
131 let http = Client::builder()
132 .default_headers(headers)
133 .timeout(Duration::from_secs(10))
134 .build()
135 .expect("failed to build HTTP client");
136
137 Self {
138 config,
139 buffer: Mutex::new(VecDeque::new()),
140 stats: Mutex::new(ClientStats::default()),
141 http,
142 }
143 }
144}
145
/// Buffered client that batches events and POSTs them to the backend.
///
/// Cloning is cheap: every field is an `Arc`, so clones share the same buffer,
/// statistics and background flush loop.
#[derive(Clone)]
pub struct WireBandClient {
    /// Configuration, buffer, stats and HTTP client shared with the flush task.
    inner: Arc<Inner>,
    /// Sender used by `stop` to signal the flush loop; `None` when no loop is registered.
    shutdown_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
    /// Join handle of the flush loop task; `None` when no loop is registered.
    flush_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
}
178
179impl WireBandClient {
180 pub fn new(config: ClientConfig) -> Self {
181 Self {
182 inner: Arc::new(Inner::new(config)),
183 shutdown_tx: Arc::new(Mutex::new(None)),
184 flush_handle: Arc::new(Mutex::new(None)),
185 }
186 }
187
188 pub fn start(&self) {
194 let (tx, mut rx) = oneshot::channel::<()>();
195 let inner = Arc::clone(&self.inner);
196
197 let handle = tokio::spawn(async move {
198 let mut interval = time::interval(inner.config.flush_interval);
199 loop {
200 tokio::select! {
201 _ = interval.tick() => {
202 Self::do_flush(&inner).await;
203 }
204 _ = &mut rx => {
205 debug!("Flush loop shutting down");
206 break;
207 }
208 }
209 }
210 });
211
212 let tx_slot = Arc::clone(&self.shutdown_tx);
213 let handle_slot = Arc::clone(&self.flush_handle);
214 tokio::spawn(async move {
215 *tx_slot.lock().await = Some(tx);
216 *handle_slot.lock().await = Some(handle);
217 });
218
219 debug!(
220 interval_ms = self.inner.config.flush_interval.as_millis(),
221 "WireBandClient flush loop started"
222 );
223 }
224
225 pub async fn stop(&self) {
227 if let Some(tx) = self.shutdown_tx.lock().await.take() {
228 let _ = tx.send(());
229 }
230 if let Some(handle) = self.flush_handle.lock().await.take() {
231 let _ = handle.await;
232 }
233 Self::do_flush(&self.inner).await;
235 info!("WireBandClient stopped. stats={:?}", self.stats().await);
236 }
237
238 pub async fn ingest(&self, data: Value, topic: &str, symbol: Option<u16>) {
246 let sym = symbol.unwrap_or(METRICS_GAUGE_SET);
247 let f = self.maybe_encrypt(frame::encode(sym, topic, &data)).await;
248 self.push(BufferedEvent::new(topic.to_string(), sym, f)).await;
249 }
250
251 pub async fn buffer_event(&self, topic: String, symbol: u16, frame: Vec<u8>, ts: f64) {
253 let frame = self.maybe_encrypt(frame).await;
254 self.push(BufferedEvent { topic, symbol, frame, ts, attempts: 0 }).await;
255 }
256
257 async fn maybe_encrypt(&self, frame: Vec<u8>) -> Vec<u8> {
259 #[cfg(feature = "crypto")]
260 if let Some(ref ctx) = self.inner.config.crypto_ctx {
261 if ctx.is_active() {
262 match ctx.encrypt_frame(&frame) {
263 Ok(enc) => {
264 self.inner.stats.lock().await.frames_encrypted += 1;
265 return enc;
266 }
267 Err(e) => {
268 warn!("Frame encryption failed, sending plaintext: {e}");
269 self.inner.stats.lock().await.crypto_errors += 1;
270 }
271 }
272 }
273 }
274 frame
275 }
276
277 async fn push(&self, event: BufferedEvent) {
278 let mut buf = self.inner.buffer.lock().await;
279 let mut stats = self.inner.stats.lock().await;
280 if buf.len() >= self.inner.config.buffer_size {
281 buf.pop_front(); stats.events_dropped += 1;
283 }
284 buf.push_back(event);
285 stats.events_ingested += 1;
286 }
287
288 pub async fn flush(&self) -> u64 {
294 Self::do_flush(&self.inner).await
295 }
296
297 async fn do_flush(inner: &Arc<Inner>) -> u64 {
298 let batch: Vec<BufferedEvent> = {
299 let mut buf = inner.buffer.lock().await;
300 if buf.is_empty() { return 0; }
301 let n = buf.len().min(inner.config.flush_batch);
302 buf.drain(..n).collect()
303 };
304
305 let payload: Vec<Value> = batch
306 .iter()
307 .map(|e| e.to_ingest_json(&inner.config.device_id))
308 .collect();
309
310 let sent = Self::send_with_retry(inner, &payload).await;
311
312 if sent {
313 let n = batch.len() as u64;
314 let bytes = batch.iter().map(|e| e.frame.len() as u64).sum::<u64>();
315 let mut s = inner.stats.lock().await;
316 s.events_flushed += n;
317 s.bytes_sent += bytes;
318 n
319 } else {
320 let mut buf = inner.buffer.lock().await;
322 for event in batch.into_iter().rev() {
323 buf.push_front(event);
324 }
325 0
326 }
327 }
328
329 async fn send_with_retry(inner: &Arc<Inner>, payload: &[Value]) -> bool {
330 let url = format!(
331 "{}/iot/v1/ingest/batch",
332 inner.config.backend_url.trim_end_matches('/')
333 );
334 let body = serde_json::json!({ "events": payload });
335
336 for attempt in 0..inner.config.max_retries {
337 match inner.http.post(&url).json(&body).send().await {
338 Ok(resp) if resp.status().as_u16() < 500 => {
339 debug!(events = payload.len(), attempt, "Flush OK");
340 return true;
341 }
342 Ok(resp) => {
343 warn!(
344 attempt, max = inner.config.max_retries,
345 status = resp.status().as_u16(),
346 "Flush server error"
347 );
348 }
349 Err(e) => {
350 warn!(attempt, max = inner.config.max_retries, err = %e, "Flush network error");
351 }
352 }
353
354 if attempt + 1 < inner.config.max_retries {
355 let delay = inner.config.backoff_base.powi(attempt as i32);
356 time::sleep(Duration::from_secs_f64(delay)).await;
357 }
358 }
359
360 error!(retries = inner.config.max_retries, "Flush failed after all retries");
361 inner.stats.lock().await.flush_errors += 1;
362 false
363 }
364
365 pub async fn stats(&self) -> ClientStats {
370 self.inner.stats.lock().await.clone()
371 }
372
373 pub fn config(&self) -> &ClientConfig {
374 &self.inner.config
375 }
376
377 pub fn buffer_depth(&self) -> usize {
379 self.inner.buffer.try_lock().map(|b| b.len()).unwrap_or(0)
380 }
381}