1use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use std::collections::HashSet;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12use thiserror::Error;
13use tokio::sync::{broadcast, Mutex, Notify};
14use tracing::{debug, warn};
15
16use crate::signals::auth_coverage::AuthCoverageSummary;
17
18use crate::utils::circuit_breaker::CircuitBreaker;
19
20pub mod auth_coverage_aggregator;
21
22#[derive(Debug, Error)]
24pub enum TelemetryError {
25 #[error("telemetry endpoint unreachable: {message}")]
26 EndpointUnreachable { message: String },
27
28 #[error("circuit breaker open, rejecting telemetry")]
29 CircuitBreakerOpen,
30
31 #[error("buffer overflow, {dropped} events dropped")]
32 BufferOverflow { dropped: usize },
33
34 #[error("serialization error: {0}")]
35 SerializationError(String),
36
37 #[error("send timeout after {elapsed:?}")]
38 Timeout { elapsed: Duration },
39}
40
41pub type TelemetryResult<T> = Result<T, TelemetryError>;
42
43#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
45#[serde(rename_all = "snake_case")]
46pub enum EventType {
47 RequestProcessed,
48 WafBlock,
49 RateLimitHit,
50 ConfigReload,
51 ServiceHealth,
52 SensorReport,
53 CampaignReport,
54 AuthCoverage,
55 LogEntry,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
60#[serde(tag = "event_type", content = "data", rename_all = "snake_case")]
61pub enum TelemetryEvent {
62 RequestProcessed {
63 #[serde(skip_serializing_if = "Option::is_none")]
64 request_id: Option<String>,
65 latency_ms: u64,
66 status_code: u16,
67 waf_action: Option<String>,
68 site: String,
69 method: String,
70 path: String,
71 },
72 LogEntry {
73 #[serde(skip_serializing_if = "Option::is_none")]
74 request_id: Option<String>,
75 id: String,
76 source: String,
77 level: String,
78 message: String,
79 log_timestamp_ms: u64,
80 fields: Option<serde_json::Value>,
81 method: Option<String>,
82 path: Option<String>,
83 status_code: Option<u16>,
84 latency_ms: Option<f64>,
85 client_ip: Option<String>,
86 rule_id: Option<String>,
87 },
88 WafBlock {
89 #[serde(skip_serializing_if = "Option::is_none")]
90 request_id: Option<String>,
91 rule_id: String,
92 severity: String,
93 client_ip: String,
94 site: String,
95 path: String,
96 },
97 RateLimitHit {
98 #[serde(skip_serializing_if = "Option::is_none")]
99 request_id: Option<String>,
100 client_ip: String,
101 limit: u32,
102 window_secs: u32,
103 site: String,
104 },
105 ConfigReload {
106 sites_loaded: usize,
107 duration_ms: u64,
108 success: bool,
109 error: Option<String>,
110 },
111 ServiceHealth {
112 uptime_secs: u64,
113 memory_mb: u64,
114 active_connections: u64,
115 requests_per_sec: f64,
116 },
117 SensorReport {
118 sensor_id: String,
119 actor: ExternalActorContext,
120 signal: ExternalSignalContext,
121 request: ExternalRequestContext,
122 },
123 CampaignReport {
124 campaign_id: String,
125 confidence: f64,
126 attack_types: Vec<String>,
127 actor_count: usize,
128 correlation_reasons: Vec<String>,
129 timestamp_ms: u64,
130 },
131 AuthCoverage(AuthCoverageSummary),
132}
133
134#[derive(Debug, Clone, Serialize, Deserialize)]
135pub struct ExternalActorContext {
136 pub ip: String,
137 pub session_id: Option<String>,
138 pub fingerprint: Option<String>,
139}
140
141#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct ExternalSignalContext {
143 #[serde(rename = "type")]
144 pub signal_type: String,
145 pub severity: String,
146 pub details: serde_json::Value,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct ExternalRequestContext {
151 pub path: String,
152 pub method: String,
153 pub user_agent: Option<String>,
154}
155
156impl TelemetryEvent {
157 pub fn event_type(&self) -> EventType {
158 match self {
159 Self::RequestProcessed { .. } => EventType::RequestProcessed,
160 Self::WafBlock { .. } => EventType::WafBlock,
161 Self::RateLimitHit { .. } => EventType::RateLimitHit,
162 Self::ConfigReload { .. } => EventType::ConfigReload,
163 Self::ServiceHealth { .. } => EventType::ServiceHealth,
164 Self::SensorReport { .. } => EventType::SensorReport,
165 Self::CampaignReport { .. } => EventType::CampaignReport,
166 Self::AuthCoverage(_) => EventType::AuthCoverage,
167 Self::LogEntry { .. } => EventType::LogEntry,
168 }
169 }
170}
171
172#[derive(Debug, Clone, Serialize, Deserialize)]
174pub struct TimestampedEvent {
175 pub timestamp_ms: u64,
176 pub instance_id: Option<String>,
177 pub event: TelemetryEvent,
178}
179
180impl TimestampedEvent {
181 pub fn new(event: TelemetryEvent, instance_id: Option<String>) -> Self {
182 let timestamp_ms = SystemTime::now()
183 .duration_since(UNIX_EPOCH)
184 .unwrap_or_default()
185 .as_millis() as u64;
186 Self {
187 timestamp_ms,
188 instance_id,
189 event,
190 }
191 }
192}
193
194#[derive(Debug, Clone, Serialize, Deserialize)]
196pub struct TelemetryBatch {
197 pub batch_id: String,
198 pub events: Vec<TimestampedEvent>,
199 pub created_at_ms: u64,
200}
201
202impl TelemetryBatch {
203 pub fn new(events: Vec<TimestampedEvent>) -> Self {
204 let created_at_ms = SystemTime::now()
205 .duration_since(UNIX_EPOCH)
206 .unwrap_or_default()
207 .as_millis() as u64;
208 let batch_id = format!("batch-{}", created_at_ms);
209 Self {
210 batch_id,
211 events,
212 created_at_ms,
213 }
214 }
215
216 pub fn len(&self) -> usize {
217 self.events.len()
218 }
219
220 pub fn is_empty(&self) -> bool {
221 self.events.is_empty()
222 }
223}
224
225#[derive(Debug)]
227pub struct TelemetryBuffer {
228 events: Mutex<Vec<TimestampedEvent>>,
229 max_size: usize,
230 dropped: AtomicU64,
231 notify: Notify,
232}
233
234impl TelemetryBuffer {
235 pub fn new(max_size: usize) -> Self {
236 Self {
237 events: Mutex::new(Vec::with_capacity(max_size.min(1000))),
238 max_size,
239 dropped: AtomicU64::new(0),
240 notify: Notify::new(),
241 }
242 }
243
244 pub async fn push(&self, event: TimestampedEvent) -> bool {
245 let mut events = self.events.lock().await;
246 if events.len() >= self.max_size {
247 let dropped = self.dropped.fetch_add(1, Ordering::SeqCst);
248 if dropped.is_multiple_of(100) {
249 warn!(
250 event_type = ?event.event.event_type(),
251 total_dropped = dropped + 1,
252 "Telemetry buffer overflow, dropping event"
253 );
254 }
255 return false;
256 }
257 events.push(event);
258 self.notify.notify_one();
259 true
260 }
261
262 pub async fn drain(&self) -> Vec<TimestampedEvent> {
263 let mut events = self.events.lock().await;
264 std::mem::take(&mut *events)
265 }
266
267 pub async fn len(&self) -> usize {
268 self.events.lock().await.len()
269 }
270
271 pub fn dropped_count(&self) -> u64 {
272 self.dropped.load(Ordering::SeqCst)
273 }
274
275 pub fn notified(&self) -> impl std::future::Future<Output = ()> + '_ {
276 self.notify.notified()
277 }
278}
279
280impl Default for TelemetryBuffer {
281 fn default() -> Self {
282 Self::new(10_000)
283 }
284}
285
/// Live delivery counters, updated atomically as batches are sent.
#[derive(Debug, Default)]
pub struct TelemetryStats {
    /// Events delivered (or counted as delivered in dry-run mode).
    pub events_sent: AtomicU64,
    /// Batches delivered (or counted as delivered in dry-run mode).
    pub batches_sent: AtomicU64,
    /// Batches abandoned after all retries failed.
    pub send_failures: AtomicU64,
    /// Retry attempts made.
    pub retries: AtomicU64,
}
294
295impl TelemetryStats {
296 pub fn snapshot(&self) -> TelemetryStatsSnapshot {
297 TelemetryStatsSnapshot {
298 events_sent: self.events_sent.load(Ordering::SeqCst),
299 batches_sent: self.batches_sent.load(Ordering::SeqCst),
300 send_failures: self.send_failures.load(Ordering::SeqCst),
301 retries: self.retries.load(Ordering::SeqCst),
302 }
303 }
304}
305
306#[derive(Debug, Clone, Serialize, Deserialize)]
307pub struct TelemetryStatsSnapshot {
308 pub events_sent: u64,
309 pub batches_sent: u64,
310 pub send_failures: u64,
311 pub retries: u64,
312}
313
314#[derive(Debug, Clone, Serialize, Deserialize)]
316pub struct TelemetryConfig {
317 pub enabled: bool,
318 pub endpoint: String,
319 pub api_key: Option<String>,
320 pub batch_size: usize,
321 pub flush_interval: Duration,
322 pub max_retries: u32,
323 pub initial_backoff: Duration,
324 pub max_backoff: Duration,
325 pub max_buffer_size: usize,
326 pub circuit_breaker_threshold: u64,
327 pub circuit_breaker_timeout: Duration,
328 #[serde(default)]
329 pub enabled_events: HashSet<EventType>,
330 pub instance_id: Option<String>,
331 #[serde(default)]
333 pub dry_run: bool,
334}
335
336impl Default for TelemetryConfig {
337 fn default() -> Self {
338 Self {
339 enabled: true,
340 endpoint: "http://localhost:3100/telemetry".to_string(),
341 api_key: None,
342 batch_size: 100,
343 flush_interval: Duration::from_secs(10),
344 max_retries: 3,
345 initial_backoff: Duration::from_millis(100),
346 max_backoff: Duration::from_secs(30),
347 max_buffer_size: 10_000,
348 circuit_breaker_threshold: 5,
349 circuit_breaker_timeout: Duration::from_secs(60),
350 enabled_events: HashSet::new(),
351 instance_id: None,
352 dry_run: false,
353 }
354 }
355}
356
357impl TelemetryConfig {
358 pub fn new(endpoint: impl Into<String>) -> Self {
359 Self {
360 endpoint: endpoint.into(),
361 ..Default::default()
362 }
363 }
364
365 pub fn with_api_key(mut self, key: impl Into<String>) -> Self {
366 self.api_key = Some(key.into());
367 self
368 }
369
370 pub fn with_batch_size(mut self, size: usize) -> Self {
371 self.batch_size = size;
372 self
373 }
374
375 pub fn with_flush_interval(mut self, interval: Duration) -> Self {
376 self.flush_interval = interval;
377 self
378 }
379
380 pub fn with_instance_id(mut self, id: impl Into<String>) -> Self {
381 self.instance_id = Some(id.into());
382 self
383 }
384
385 pub fn with_enabled_events(mut self, events: HashSet<EventType>) -> Self {
386 self.enabled_events = events;
387 self
388 }
389
390 pub fn is_event_enabled(&self, event_type: &EventType) -> bool {
391 self.enabled_events.is_empty() || self.enabled_events.contains(event_type)
392 }
393}
394
395#[derive(Clone)] pub struct TelemetryClient {
398 config: TelemetryConfig,
399 buffer: Arc<TelemetryBuffer>,
400 circuit_breaker: Arc<CircuitBreaker>,
401 stats: Arc<TelemetryStats>,
402 shutdown: broadcast::Sender<()>,
403}
404
405impl TelemetryClient {
406 pub fn new(config: TelemetryConfig) -> Self {
407 let buffer = Arc::new(TelemetryBuffer::new(config.max_buffer_size));
408 let circuit_breaker = Arc::new(CircuitBreaker::new(
409 config.circuit_breaker_threshold,
410 config.circuit_breaker_timeout,
411 ));
412 let (shutdown, _) = broadcast::channel(1);
413
414 Self {
415 config,
416 buffer,
417 circuit_breaker,
418 stats: Arc::new(TelemetryStats::default()),
419 shutdown,
420 }
421 }
422
423 fn apply_auth_headers(&self, request: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
424 if let Some(ref key) = self.config.api_key {
425 request
426 .bearer_auth(key)
427 .header("X-API-Key", key)
428 .header("X-Admin-Key", key)
429 } else {
430 request
431 }
432 }
433
434 pub async fn record(&self, event: TelemetryEvent) -> TelemetryResult<()> {
436 if !self.config.enabled {
437 return Ok(());
438 }
439
440 if !self.config.is_event_enabled(&event.event_type()) {
441 return Ok(());
442 }
443
444 let timestamped = TimestampedEvent::new(event, self.config.instance_id.clone());
445 if !self.buffer.push(timestamped).await {
446 return Err(TelemetryError::BufferOverflow { dropped: 1 });
447 }
448
449 if self.buffer.len().await >= self.config.batch_size {
451 self.flush().await?;
452 }
453
454 Ok(())
455 }
456
457 pub async fn flush(&self) -> TelemetryResult<()> {
459 if !self.config.enabled {
460 return Ok(());
461 }
462
463 if !self.circuit_breaker.allow_request().await {
464 return Err(TelemetryError::CircuitBreakerOpen);
465 }
466
467 let events = self.buffer.drain().await;
468 if events.is_empty() {
469 return Ok(());
470 }
471
472 let batch = TelemetryBatch::new(events);
473 self.send_batch_with_retry(&batch).await
474 }
475
476 pub async fn report(&self, event: TelemetryEvent) -> TelemetryResult<()> {
479 if !self.config.enabled {
480 return Ok(());
481 }
482
483 let payload = match event {
485 TelemetryEvent::SensorReport {
486 sensor_id,
487 actor,
488 signal,
489 request,
490 } => {
491 serde_json::json!({
492 "sensorId": sensor_id,
493 "actor": actor,
494 "signal": signal,
495 "request": request,
496 "timestamp": TimestampedEvent::new(TelemetryEvent::ServiceHealth {
497 uptime_secs: 0, memory_mb: 0, active_connections: 0, requests_per_sec: 0.0
498 }, None).timestamp_ms
499 })
500 }
501 _ => serde_json::to_value(&event)
502 .map_err(|e| TelemetryError::SerializationError(e.to_string()))?,
503 };
504
505 let client = reqwest::Client::new();
506 let request = client
507 .post(&self.config.endpoint)
508 .json(&payload)
509 .timeout(Duration::from_secs(2));
510 let response = self.apply_auth_headers(request).send().await.map_err(|e| {
511 TelemetryError::EndpointUnreachable {
512 message: e.to_string(),
513 }
514 })?;
515
516 if !response.status().is_success() {
517 return Err(TelemetryError::EndpointUnreachable {
518 message: format!("HTTP {}", response.status()),
519 });
520 }
521
522 Ok(())
523 }
524
525 async fn send_batch_with_retry(&self, batch: &TelemetryBatch) -> TelemetryResult<()> {
526 if self.config.dry_run {
528 self.stats
529 .events_sent
530 .fetch_add(batch.len() as u64, Ordering::SeqCst);
531 self.stats.batches_sent.fetch_add(1, Ordering::SeqCst);
532 return Ok(());
533 }
534
535 let mut backoff = self.config.initial_backoff;
536
537 for attempt in 0..=self.config.max_retries {
538 match self.send_batch(batch).await {
539 Ok(()) => {
540 self.circuit_breaker.record_success().await;
541 self.stats
542 .events_sent
543 .fetch_add(batch.len() as u64, Ordering::SeqCst);
544 self.stats.batches_sent.fetch_add(1, Ordering::SeqCst);
545 return Ok(());
546 }
547 Err(e) => {
548 if attempt < self.config.max_retries {
549 self.stats.retries.fetch_add(1, Ordering::SeqCst);
550 debug!(
551 "Telemetry send failed (attempt {}), retrying: {}",
552 attempt + 1,
553 e
554 );
555 tokio::time::sleep(backoff).await;
556 backoff = backoff
557 .checked_mul(2)
558 .unwrap_or(self.config.max_backoff)
559 .min(self.config.max_backoff);
560 } else {
561 self.circuit_breaker.record_failure().await;
562 self.stats.send_failures.fetch_add(1, Ordering::SeqCst);
563 warn!(
564 "Telemetry send failed after {} retries: {}",
565 self.config.max_retries, e
566 );
567 return Err(e);
568 }
569 }
570 }
571 }
572
573 Ok(())
574 }
575
576 async fn send_batch(&self, batch: &TelemetryBatch) -> TelemetryResult<()> {
577 debug!(
578 batch_id = %batch.batch_id,
579 event_count = batch.len(),
580 "Sending telemetry batch to {}",
581 self.config.endpoint
582 );
583
584 let payload = serde_json::json!({
586 "batch_id": batch.batch_id.to_string(),
587 "events": batch.events,
588 "timestamp_ms": SystemTime::now()
589 .duration_since(UNIX_EPOCH)
590 .map(|d| d.as_millis() as u64)
591 .unwrap_or(0),
592 });
593
594 let client = reqwest::Client::builder()
596 .timeout(Duration::from_secs(5))
597 .build()
598 .map_err(|e| TelemetryError::EndpointUnreachable {
599 message: e.to_string(),
600 })?;
601
602 let response = client.post(&self.config.endpoint).json(&payload);
603 let response = self
604 .apply_auth_headers(response)
605 .send()
606 .await
607 .map_err(|e| {
608 warn!(error = %e, "Telemetry batch send failed");
609 TelemetryError::EndpointUnreachable {
610 message: e.to_string(),
611 }
612 })?;
613
614 if !response.status().is_success() {
615 let status = response.status();
616 warn!(status = %status, "Telemetry endpoint returned error status");
617 return Err(TelemetryError::EndpointUnreachable {
618 message: format!("HTTP {}", status),
619 });
620 }
621
622 debug!(batch_id = %batch.batch_id, "Telemetry batch sent successfully");
623 Ok(())
624 }
625
626 pub fn start_background_flush(&self) -> tokio::task::JoinHandle<()> {
628 let client = self.clone();
629 let mut shutdown = client.shutdown.subscribe();
630
631 tokio::spawn(async move {
632 let mut interval = tokio::time::interval(client.config.flush_interval);
633
634 loop {
635 tokio::select! {
636 _ = interval.tick() => {
637 if let Err(err) = client.flush().await {
638 debug!("Background telemetry flush failed: {}", err);
639 }
640 }
641 _ = shutdown.recv() => {
642 if let Err(err) = client.flush().await {
643 debug!("Final telemetry flush failed: {}", err);
644 }
645 break;
646 }
647 }
648 }
649 })
650 }
651
652 pub fn shutdown(&self) {
654 let _ = self.shutdown.send(());
655 }
656
657 pub fn stats(&self) -> TelemetryStatsSnapshot {
659 self.stats.snapshot()
660 }
661
662 pub fn dropped_count(&self) -> u64 {
664 self.buffer.dropped_count()
665 }
666
667 pub fn is_enabled(&self) -> bool {
669 self.config.enabled
670 }
671}
672
673#[async_trait]
675pub trait SignalEmitter: Send + Sync {
676 async fn emit(&self, signal_type: &str, payload: serde_json::Value);
677}
678
679#[async_trait]
680impl SignalEmitter for TelemetryClient {
681 async fn emit(&self, signal_type: &str, payload: serde_json::Value) {
682 if signal_type == "auth_coverage_summary" {
683 if let Ok(summary) = serde_json::from_value::<AuthCoverageSummary>(payload) {
684 let _ = self.record(TelemetryEvent::AuthCoverage(summary)).await;
688 }
689 }
690 }
691}
692
693pub fn request_processed(
695 latency_ms: u64,
696 status_code: u16,
697 waf_action: Option<String>,
698 site: String,
699 method: String,
700 path: String,
701) -> TelemetryEvent {
702 TelemetryEvent::RequestProcessed {
703 request_id: None,
704 latency_ms,
705 status_code,
706 waf_action,
707 site,
708 method,
709 path,
710 }
711}
712
713pub fn waf_block(
714 rule_id: String,
715 severity: String,
716 client_ip: String,
717 site: String,
718 path: String,
719) -> TelemetryEvent {
720 TelemetryEvent::WafBlock {
721 request_id: None,
722 rule_id,
723 severity,
724 client_ip,
725 site,
726 path,
727 }
728}
729
730pub fn rate_limit_hit(
731 request_id: Option<String>,
732 client_ip: String,
733 limit: u32,
734 window_secs: u32,
735 site: String,
736) -> TelemetryEvent {
737 TelemetryEvent::RateLimitHit {
738 request_id,
739 client_ip,
740 limit,
741 window_secs,
742 site,
743 }
744}
745
746pub fn config_reload(
747 sites_loaded: usize,
748 duration_ms: u64,
749 success: bool,
750 error: Option<String>,
751) -> TelemetryEvent {
752 TelemetryEvent::ConfigReload {
753 sites_loaded,
754 duration_ms,
755 success,
756 error,
757 }
758}
759
760pub fn service_health(
761 uptime_secs: u64,
762 memory_mb: u64,
763 active_connections: u64,
764 requests_per_sec: f64,
765) -> TelemetryEvent {
766 TelemetryEvent::ServiceHealth {
767 uptime_secs,
768 memory_mb,
769 active_connections,
770 requests_per_sec,
771 }
772}
773
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::circuit_breaker::CircuitState;

    /// Dry-run configuration with small limits, so no test touches the network.
    fn test_config() -> TelemetryConfig {
        TelemetryConfig {
            enabled: true,
            endpoint: "http://test:8080/telemetry".to_string(),
            batch_size: 10,
            flush_interval: Duration::from_millis(100),
            max_buffer_size: 100,
            dry_run: true,
            ..Default::default()
        }
    }

    /// The `RequestProcessed` fixture shared by most tests.
    fn sample_request() -> TelemetryEvent {
        request_processed(100, 200, None, "site".into(), "GET".into(), "/".into())
    }

    /// The `WafBlock` fixture shared by the classification and filtering tests.
    fn sample_waf_block() -> TelemetryEvent {
        waf_block(
            "rule-1".into(),
            "high".into(),
            "1.2.3.4".into(),
            "site".into(),
            "/".into(),
        )
    }

    /// A `LogEntry` fixture with the given request id.
    fn sample_log_entry(request_id: Option<String>) -> TelemetryEvent {
        TelemetryEvent::LogEntry {
            request_id,
            id: "log-1".into(),
            source: "access".into(),
            level: "info".into(),
            message: "GET /".into(),
            log_timestamp_ms: 1234,
            fields: None,
            method: Some("GET".into()),
            path: Some("/".into()),
            status_code: Some(200),
            latency_ms: Some(12.5),
            client_ip: Some("203.0.113.1".into()),
            rule_id: None,
        }
    }

    #[test]
    fn test_config_defaults() {
        let defaults = TelemetryConfig::default();
        assert!(defaults.enabled);
        assert_eq!(defaults.batch_size, 100);
        assert_eq!(defaults.max_retries, 3);
    }

    #[test]
    fn test_config_builder() {
        let built = TelemetryConfig::new("http://custom:9000")
            .with_api_key("secret")
            .with_batch_size(50)
            .with_instance_id("node-1");

        assert_eq!(built.endpoint, "http://custom:9000");
        assert_eq!(built.api_key, Some("secret".to_string()));
        assert_eq!(built.batch_size, 50);
        assert_eq!(built.instance_id, Some("node-1".to_string()));
    }

    #[test]
    fn test_event_type_classification() {
        let cases = vec![
            (sample_request(), EventType::RequestProcessed),
            (sample_waf_block(), EventType::WafBlock),
            (sample_log_entry(None), EventType::LogEntry),
            (
                rate_limit_hit(None, "1.2.3.4".into(), 100, 60, "site".into()),
                EventType::RateLimitHit,
            ),
            (config_reload(5, 100, true, None), EventType::ConfigReload),
            (
                service_health(3600, 512, 100, 1000.0),
                EventType::ServiceHealth,
            ),
        ];

        for (event, expected) in cases {
            assert_eq!(event.event_type(), expected);
        }
    }

    #[test]
    fn test_event_serialization() {
        let event = request_processed(
            100,
            200,
            Some("pass".into()),
            "site".into(),
            "GET".into(),
            "/api".into(),
        );
        let encoded = serde_json::to_string(&event).unwrap();
        assert!(encoded.contains("request_processed"));
        assert!(encoded.contains("100"));
    }

    #[test]
    fn test_request_id_serialization() {
        let event = TelemetryEvent::RequestProcessed {
            request_id: Some("req_123".into()),
            latency_ms: 100,
            status_code: 200,
            waf_action: None,
            site: "site".into(),
            method: "GET".into(),
            path: "/".into(),
        };
        let encoded = serde_json::to_value(&event).unwrap();
        assert_eq!(encoded["event_type"], "request_processed");
        assert_eq!(encoded["data"]["request_id"], "req_123");
    }

    #[test]
    fn test_log_entry_request_id_serialization() {
        let event = sample_log_entry(Some("req_456".into()));
        let encoded = serde_json::to_value(&event).unwrap();
        assert_eq!(encoded["event_type"], "log_entry");
        assert_eq!(encoded["data"]["request_id"], "req_456");
    }

    #[test]
    fn test_timestamped_event() {
        let stamped = TimestampedEvent::new(sample_request(), Some("node-1".to_string()));

        assert!(stamped.timestamp_ms > 0);
        assert_eq!(stamped.instance_id, Some("node-1".to_string()));
    }

    #[test]
    fn test_batch_creation() {
        let mut events: Vec<TimestampedEvent> = Vec::new();
        for i in 0..5 {
            let event =
                request_processed(i * 10, 200, None, "site".into(), "GET".into(), "/".into());
            events.push(TimestampedEvent::new(event, None));
        }

        let batch = TelemetryBatch::new(events);
        assert_eq!(batch.len(), 5);
        assert!(!batch.is_empty());
        assert!(batch.batch_id.starts_with("batch-"));
    }

    #[tokio::test]
    async fn test_circuit_breaker_closed() {
        let breaker = CircuitBreaker::default();
        assert_eq!(breaker.state().await, CircuitState::Closed);
        assert!(breaker.allow_request().await);
    }

    #[tokio::test]
    async fn test_circuit_breaker_opens_on_failures() {
        let breaker = CircuitBreaker::new(3, Duration::from_secs(60));

        for _ in 0..3 {
            breaker.record_failure().await;
        }

        assert_eq!(breaker.state().await, CircuitState::Open);
        assert!(!breaker.allow_request().await);
    }

    #[tokio::test]
    async fn test_circuit_breaker_success_resets() {
        let breaker = CircuitBreaker::new(3, Duration::from_secs(60));

        breaker.record_failure().await;
        breaker.record_failure().await;
        breaker.record_success().await;

        assert_eq!(breaker.state().await, CircuitState::Closed);
        assert!(breaker.allow_request().await);
    }

    #[tokio::test]
    async fn test_buffer_push_and_drain() {
        let buffer = TelemetryBuffer::new(10);
        let stamped = TimestampedEvent::new(sample_request(), None);

        assert!(buffer.push(stamped).await);
        assert_eq!(buffer.len().await, 1);

        let drained = buffer.drain().await;
        assert_eq!(drained.len(), 1);
        assert_eq!(buffer.len().await, 0);
    }

    #[tokio::test]
    async fn test_buffer_overflow() {
        let buffer = TelemetryBuffer::new(2);

        // One more push than the buffer can hold.
        for _ in 0..3 {
            let stamped = TimestampedEvent::new(sample_request(), None);
            buffer.push(stamped).await;
        }

        assert_eq!(buffer.len().await, 2);
        assert_eq!(buffer.dropped_count(), 1);
    }

    #[tokio::test]
    async fn test_client_record_event() {
        let client = TelemetryClient::new(test_config());

        let outcome = client.record(sample_request()).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_client_flush() {
        let client = TelemetryClient::new(test_config());

        for _ in 0..5 {
            client.record(sample_request()).await.unwrap();
        }

        let outcome = client.flush().await;
        assert!(outcome.is_ok());

        let stats = client.stats();
        assert_eq!(stats.events_sent, 5);
        assert_eq!(stats.batches_sent, 1);
    }

    #[tokio::test]
    async fn test_client_auto_flush_on_batch_size() {
        let config = TelemetryConfig {
            batch_size: 3,
            ..test_config()
        };
        let client = TelemetryClient::new(config);

        for _ in 0..3 {
            client.record(sample_request()).await.unwrap();
        }

        // The third record reaches `batch_size`, so the flush already happened.
        let stats = client.stats();
        assert_eq!(stats.events_sent, 3);
    }

    #[tokio::test]
    async fn test_client_disabled() {
        let config = TelemetryConfig {
            enabled: false,
            ..test_config()
        };
        let client = TelemetryClient::new(config);

        let outcome = client.record(sample_request()).await;
        assert!(outcome.is_ok());

        let stats = client.stats();
        assert_eq!(stats.events_sent, 0);
    }

    #[tokio::test]
    async fn test_client_event_filtering() {
        let config = TelemetryConfig {
            enabled_events: [EventType::WafBlock].into_iter().collect(),
            ..test_config()
        };
        let client = TelemetryClient::new(config);

        // Filtered out: accepted silently but never buffered.
        client.record(sample_request()).await.unwrap();

        // Allowed by the filter.
        client.record(sample_waf_block()).await.unwrap();

        client.flush().await.unwrap();
        let stats = client.stats();
        assert_eq!(stats.events_sent, 1);
    }

    #[test]
    fn test_stats_snapshot() {
        let live = TelemetryStats::default();
        live.events_sent.store(100, Ordering::SeqCst);
        live.batches_sent.store(10, Ordering::SeqCst);

        let snapshot = live.snapshot();
        assert_eq!(snapshot.events_sent, 100);
        assert_eq!(snapshot.batches_sent, 10);
    }

    #[test]
    fn test_config_event_enabled() {
        let mut config = TelemetryConfig::default();
        // With an empty filter every event type is enabled.
        assert!(config.is_event_enabled(&EventType::WafBlock));

        config.enabled_events = [EventType::WafBlock].into_iter().collect();
        assert!(config.is_event_enabled(&EventType::WafBlock));
        assert!(!config.is_event_enabled(&EventType::RequestProcessed));
    }
}
1092}