Skip to main content

synapse_pingora/telemetry/
mod.rs

1//! Signal Horizon Telemetry Integration Module
2//!
3//! Provides resilient asynchronous telemetry delivery with batching,
4//! retry logic, and circuit breaker patterns for the synapse-pingora WAF proxy.
5
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use std::collections::HashSet;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12use thiserror::Error;
13use tokio::sync::{broadcast, Mutex, Notify};
14use tracing::{debug, warn};
15
16use crate::signals::auth_coverage::AuthCoverageSummary;
17
18use crate::utils::circuit_breaker::CircuitBreaker;
19
20pub mod auth_coverage_aggregator;
21
22/// Telemetry-specific errors.
23#[derive(Debug, Error)]
24pub enum TelemetryError {
25    #[error("telemetry endpoint unreachable: {message}")]
26    EndpointUnreachable { message: String },
27
28    #[error("circuit breaker open, rejecting telemetry")]
29    CircuitBreakerOpen,
30
31    #[error("buffer overflow, {dropped} events dropped")]
32    BufferOverflow { dropped: usize },
33
34    #[error("serialization error: {0}")]
35    SerializationError(String),
36
37    #[error("send timeout after {elapsed:?}")]
38    Timeout { elapsed: Duration },
39}
40
41pub type TelemetryResult<T> = Result<T, TelemetryError>;
42
43/// Types of telemetry events.
44#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
45#[serde(rename_all = "snake_case")]
46pub enum EventType {
47    RequestProcessed,
48    WafBlock,
49    RateLimitHit,
50    ConfigReload,
51    ServiceHealth,
52    SensorReport,
53    CampaignReport,
54    AuthCoverage,
55    LogEntry,
56}
57
58/// Telemetry event payload.
59#[derive(Debug, Clone, Serialize, Deserialize)]
60#[serde(tag = "event_type", content = "data", rename_all = "snake_case")]
61pub enum TelemetryEvent {
62    RequestProcessed {
63        #[serde(skip_serializing_if = "Option::is_none")]
64        request_id: Option<String>,
65        latency_ms: u64,
66        status_code: u16,
67        waf_action: Option<String>,
68        site: String,
69        method: String,
70        path: String,
71    },
72    LogEntry {
73        #[serde(skip_serializing_if = "Option::is_none")]
74        request_id: Option<String>,
75        id: String,
76        source: String,
77        level: String,
78        message: String,
79        log_timestamp_ms: u64,
80        fields: Option<serde_json::Value>,
81        method: Option<String>,
82        path: Option<String>,
83        status_code: Option<u16>,
84        latency_ms: Option<f64>,
85        client_ip: Option<String>,
86        rule_id: Option<String>,
87    },
88    WafBlock {
89        #[serde(skip_serializing_if = "Option::is_none")]
90        request_id: Option<String>,
91        rule_id: String,
92        severity: String,
93        client_ip: String,
94        site: String,
95        path: String,
96    },
97    RateLimitHit {
98        #[serde(skip_serializing_if = "Option::is_none")]
99        request_id: Option<String>,
100        client_ip: String,
101        limit: u32,
102        window_secs: u32,
103        site: String,
104    },
105    ConfigReload {
106        sites_loaded: usize,
107        duration_ms: u64,
108        success: bool,
109        error: Option<String>,
110    },
111    ServiceHealth {
112        uptime_secs: u64,
113        memory_mb: u64,
114        active_connections: u64,
115        requests_per_sec: f64,
116    },
117    SensorReport {
118        sensor_id: String,
119        actor: ExternalActorContext,
120        signal: ExternalSignalContext,
121        request: ExternalRequestContext,
122    },
123    CampaignReport {
124        campaign_id: String,
125        confidence: f64,
126        attack_types: Vec<String>,
127        actor_count: usize,
128        correlation_reasons: Vec<String>,
129        timestamp_ms: u64,
130    },
131    AuthCoverage(AuthCoverageSummary),
132}
133
134#[derive(Debug, Clone, Serialize, Deserialize)]
135pub struct ExternalActorContext {
136    pub ip: String,
137    pub session_id: Option<String>,
138    pub fingerprint: Option<String>,
139}
140
141#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct ExternalSignalContext {
143    #[serde(rename = "type")]
144    pub signal_type: String,
145    pub severity: String,
146    pub details: serde_json::Value,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct ExternalRequestContext {
151    pub path: String,
152    pub method: String,
153    pub user_agent: Option<String>,
154}
155
156impl TelemetryEvent {
157    pub fn event_type(&self) -> EventType {
158        match self {
159            Self::RequestProcessed { .. } => EventType::RequestProcessed,
160            Self::WafBlock { .. } => EventType::WafBlock,
161            Self::RateLimitHit { .. } => EventType::RateLimitHit,
162            Self::ConfigReload { .. } => EventType::ConfigReload,
163            Self::ServiceHealth { .. } => EventType::ServiceHealth,
164            Self::SensorReport { .. } => EventType::SensorReport,
165            Self::CampaignReport { .. } => EventType::CampaignReport,
166            Self::AuthCoverage(_) => EventType::AuthCoverage,
167            Self::LogEntry { .. } => EventType::LogEntry,
168        }
169    }
170}
171
172/// Timestamped event wrapper.
173#[derive(Debug, Clone, Serialize, Deserialize)]
174pub struct TimestampedEvent {
175    pub timestamp_ms: u64,
176    pub instance_id: Option<String>,
177    pub event: TelemetryEvent,
178}
179
180impl TimestampedEvent {
181    pub fn new(event: TelemetryEvent, instance_id: Option<String>) -> Self {
182        let timestamp_ms = SystemTime::now()
183            .duration_since(UNIX_EPOCH)
184            .unwrap_or_default()
185            .as_millis() as u64;
186        Self {
187            timestamp_ms,
188            instance_id,
189            event,
190        }
191    }
192}
193
194/// Batch of telemetry events for transmission.
195#[derive(Debug, Clone, Serialize, Deserialize)]
196pub struct TelemetryBatch {
197    pub batch_id: String,
198    pub events: Vec<TimestampedEvent>,
199    pub created_at_ms: u64,
200}
201
202impl TelemetryBatch {
203    pub fn new(events: Vec<TimestampedEvent>) -> Self {
204        let created_at_ms = SystemTime::now()
205            .duration_since(UNIX_EPOCH)
206            .unwrap_or_default()
207            .as_millis() as u64;
208        let batch_id = format!("batch-{}", created_at_ms);
209        Self {
210            batch_id,
211            events,
212            created_at_ms,
213        }
214    }
215
216    pub fn len(&self) -> usize {
217        self.events.len()
218    }
219
220    pub fn is_empty(&self) -> bool {
221        self.events.is_empty()
222    }
223}
224
225/// Thread-safe event buffer.
226#[derive(Debug)]
227pub struct TelemetryBuffer {
228    events: Mutex<Vec<TimestampedEvent>>,
229    max_size: usize,
230    dropped: AtomicU64,
231    notify: Notify,
232}
233
234impl TelemetryBuffer {
235    pub fn new(max_size: usize) -> Self {
236        Self {
237            events: Mutex::new(Vec::with_capacity(max_size.min(1000))),
238            max_size,
239            dropped: AtomicU64::new(0),
240            notify: Notify::new(),
241        }
242    }
243
244    pub async fn push(&self, event: TimestampedEvent) -> bool {
245        let mut events = self.events.lock().await;
246        if events.len() >= self.max_size {
247            let dropped = self.dropped.fetch_add(1, Ordering::SeqCst);
248            if dropped.is_multiple_of(100) {
249                warn!(
250                    event_type = ?event.event.event_type(),
251                    total_dropped = dropped + 1,
252                    "Telemetry buffer overflow, dropping event"
253                );
254            }
255            return false;
256        }
257        events.push(event);
258        self.notify.notify_one();
259        true
260    }
261
262    pub async fn drain(&self) -> Vec<TimestampedEvent> {
263        let mut events = self.events.lock().await;
264        std::mem::take(&mut *events)
265    }
266
267    pub async fn len(&self) -> usize {
268        self.events.lock().await.len()
269    }
270
271    pub fn dropped_count(&self) -> u64 {
272        self.dropped.load(Ordering::SeqCst)
273    }
274
275    pub fn notified(&self) -> impl std::future::Future<Output = ()> + '_ {
276        self.notify.notified()
277    }
278}
279
280impl Default for TelemetryBuffer {
281    fn default() -> Self {
282        Self::new(10_000)
283    }
284}
285
286/// Telemetry statistics.
287#[derive(Debug, Default)]
288pub struct TelemetryStats {
289    pub events_sent: AtomicU64,
290    pub batches_sent: AtomicU64,
291    pub send_failures: AtomicU64,
292    pub retries: AtomicU64,
293}
294
295impl TelemetryStats {
296    pub fn snapshot(&self) -> TelemetryStatsSnapshot {
297        TelemetryStatsSnapshot {
298            events_sent: self.events_sent.load(Ordering::SeqCst),
299            batches_sent: self.batches_sent.load(Ordering::SeqCst),
300            send_failures: self.send_failures.load(Ordering::SeqCst),
301            retries: self.retries.load(Ordering::SeqCst),
302        }
303    }
304}
305
306#[derive(Debug, Clone, Serialize, Deserialize)]
307pub struct TelemetryStatsSnapshot {
308    pub events_sent: u64,
309    pub batches_sent: u64,
310    pub send_failures: u64,
311    pub retries: u64,
312}
313
314/// Configuration for telemetry client.
315#[derive(Debug, Clone, Serialize, Deserialize)]
316pub struct TelemetryConfig {
317    pub enabled: bool,
318    pub endpoint: String,
319    pub api_key: Option<String>,
320    pub batch_size: usize,
321    pub flush_interval: Duration,
322    pub max_retries: u32,
323    pub initial_backoff: Duration,
324    pub max_backoff: Duration,
325    pub max_buffer_size: usize,
326    pub circuit_breaker_threshold: u64,
327    pub circuit_breaker_timeout: Duration,
328    #[serde(default)]
329    pub enabled_events: HashSet<EventType>,
330    pub instance_id: Option<String>,
331    /// When true, skips actual HTTP sending (for testing)
332    #[serde(default)]
333    pub dry_run: bool,
334}
335
336impl Default for TelemetryConfig {
337    fn default() -> Self {
338        Self {
339            enabled: true,
340            endpoint: "http://localhost:3100/telemetry".to_string(),
341            api_key: None,
342            batch_size: 100,
343            flush_interval: Duration::from_secs(10),
344            max_retries: 3,
345            initial_backoff: Duration::from_millis(100),
346            max_backoff: Duration::from_secs(30),
347            max_buffer_size: 10_000,
348            circuit_breaker_threshold: 5,
349            circuit_breaker_timeout: Duration::from_secs(60),
350            enabled_events: HashSet::new(),
351            instance_id: None,
352            dry_run: false,
353        }
354    }
355}
356
357impl TelemetryConfig {
358    pub fn new(endpoint: impl Into<String>) -> Self {
359        Self {
360            endpoint: endpoint.into(),
361            ..Default::default()
362        }
363    }
364
365    pub fn with_api_key(mut self, key: impl Into<String>) -> Self {
366        self.api_key = Some(key.into());
367        self
368    }
369
370    pub fn with_batch_size(mut self, size: usize) -> Self {
371        self.batch_size = size;
372        self
373    }
374
375    pub fn with_flush_interval(mut self, interval: Duration) -> Self {
376        self.flush_interval = interval;
377        self
378    }
379
380    pub fn with_instance_id(mut self, id: impl Into<String>) -> Self {
381        self.instance_id = Some(id.into());
382        self
383    }
384
385    pub fn with_enabled_events(mut self, events: HashSet<EventType>) -> Self {
386        self.enabled_events = events;
387        self
388    }
389
390    pub fn is_event_enabled(&self, event_type: &EventType) -> bool {
391        self.enabled_events.is_empty() || self.enabled_events.contains(event_type)
392    }
393}
394
395/// Telemetry client for sending events to Signal Horizon.
396#[derive(Clone)] // Derived Clone to support Fire-and-Forget emitting
397pub struct TelemetryClient {
398    config: TelemetryConfig,
399    buffer: Arc<TelemetryBuffer>,
400    circuit_breaker: Arc<CircuitBreaker>,
401    stats: Arc<TelemetryStats>,
402    shutdown: broadcast::Sender<()>,
403}
404
405impl TelemetryClient {
406    pub fn new(config: TelemetryConfig) -> Self {
407        let buffer = Arc::new(TelemetryBuffer::new(config.max_buffer_size));
408        let circuit_breaker = Arc::new(CircuitBreaker::new(
409            config.circuit_breaker_threshold,
410            config.circuit_breaker_timeout,
411        ));
412        let (shutdown, _) = broadcast::channel(1);
413
414        Self {
415            config,
416            buffer,
417            circuit_breaker,
418            stats: Arc::new(TelemetryStats::default()),
419            shutdown,
420        }
421    }
422
423    fn apply_auth_headers(&self, request: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
424        if let Some(ref key) = self.config.api_key {
425            request
426                .bearer_auth(key)
427                .header("X-API-Key", key)
428                .header("X-Admin-Key", key)
429        } else {
430            request
431        }
432    }
433
434    /// Records a telemetry event.
435    pub async fn record(&self, event: TelemetryEvent) -> TelemetryResult<()> {
436        if !self.config.enabled {
437            return Ok(());
438        }
439
440        if !self.config.is_event_enabled(&event.event_type()) {
441            return Ok(());
442        }
443
444        let timestamped = TimestampedEvent::new(event, self.config.instance_id.clone());
445        if !self.buffer.push(timestamped).await {
446            return Err(TelemetryError::BufferOverflow { dropped: 1 });
447        }
448
449        // Auto-flush if batch size reached
450        if self.buffer.len().await >= self.config.batch_size {
451            self.flush().await?;
452        }
453
454        Ok(())
455    }
456
457    /// Flushes buffered events.
458    pub async fn flush(&self) -> TelemetryResult<()> {
459        if !self.config.enabled {
460            return Ok(());
461        }
462
463        if !self.circuit_breaker.allow_request().await {
464            return Err(TelemetryError::CircuitBreakerOpen);
465        }
466
467        let events = self.buffer.drain().await;
468        if events.is_empty() {
469            return Ok(());
470        }
471
472        let batch = TelemetryBatch::new(events);
473        self.send_batch_with_retry(&batch).await
474    }
475
476    /// Reports a single event immediately (bypassing batching).
477    /// Used for critical security alerts like blocks.
478    pub async fn report(&self, event: TelemetryEvent) -> TelemetryResult<()> {
479        if !self.config.enabled {
480            return Ok(());
481        }
482
483        // Map TelemetryEvent::SensorReport to the flat format expected by Risk Server
484        let payload = match event {
485            TelemetryEvent::SensorReport {
486                sensor_id,
487                actor,
488                signal,
489                request,
490            } => {
491                serde_json::json!({
492                    "sensorId": sensor_id,
493                    "actor": actor,
494                    "signal": signal,
495                    "request": request,
496                    "timestamp": TimestampedEvent::new(TelemetryEvent::ServiceHealth {
497                        uptime_secs: 0, memory_mb: 0, active_connections: 0, requests_per_sec: 0.0
498                    }, None).timestamp_ms
499                })
500            }
501            _ => serde_json::to_value(&event)
502                .map_err(|e| TelemetryError::SerializationError(e.to_string()))?,
503        };
504
505        let client = reqwest::Client::new();
506        let request = client
507            .post(&self.config.endpoint)
508            .json(&payload)
509            .timeout(Duration::from_secs(2));
510        let response = self.apply_auth_headers(request).send().await.map_err(|e| {
511            TelemetryError::EndpointUnreachable {
512                message: e.to_string(),
513            }
514        })?;
515
516        if !response.status().is_success() {
517            return Err(TelemetryError::EndpointUnreachable {
518                message: format!("HTTP {}", response.status()),
519            });
520        }
521
522        Ok(())
523    }
524
525    async fn send_batch_with_retry(&self, batch: &TelemetryBatch) -> TelemetryResult<()> {
526        // In dry_run mode, skip actual HTTP sending but still update stats
527        if self.config.dry_run {
528            self.stats
529                .events_sent
530                .fetch_add(batch.len() as u64, Ordering::SeqCst);
531            self.stats.batches_sent.fetch_add(1, Ordering::SeqCst);
532            return Ok(());
533        }
534
535        let mut backoff = self.config.initial_backoff;
536
537        for attempt in 0..=self.config.max_retries {
538            match self.send_batch(batch).await {
539                Ok(()) => {
540                    self.circuit_breaker.record_success().await;
541                    self.stats
542                        .events_sent
543                        .fetch_add(batch.len() as u64, Ordering::SeqCst);
544                    self.stats.batches_sent.fetch_add(1, Ordering::SeqCst);
545                    return Ok(());
546                }
547                Err(e) => {
548                    if attempt < self.config.max_retries {
549                        self.stats.retries.fetch_add(1, Ordering::SeqCst);
550                        debug!(
551                            "Telemetry send failed (attempt {}), retrying: {}",
552                            attempt + 1,
553                            e
554                        );
555                        tokio::time::sleep(backoff).await;
556                        backoff = backoff
557                            .checked_mul(2)
558                            .unwrap_or(self.config.max_backoff)
559                            .min(self.config.max_backoff);
560                    } else {
561                        self.circuit_breaker.record_failure().await;
562                        self.stats.send_failures.fetch_add(1, Ordering::SeqCst);
563                        warn!(
564                            "Telemetry send failed after {} retries: {}",
565                            self.config.max_retries, e
566                        );
567                        return Err(e);
568                    }
569                }
570            }
571        }
572
573        Ok(())
574    }
575
576    async fn send_batch(&self, batch: &TelemetryBatch) -> TelemetryResult<()> {
577        debug!(
578            batch_id = %batch.batch_id,
579            event_count = batch.len(),
580            "Sending telemetry batch to {}",
581            self.config.endpoint
582        );
583
584        // Build the batch payload
585        let payload = serde_json::json!({
586            "batch_id": batch.batch_id.to_string(),
587            "events": batch.events,
588            "timestamp_ms": SystemTime::now()
589                .duration_since(UNIX_EPOCH)
590                .map(|d| d.as_millis() as u64)
591                .unwrap_or(0),
592        });
593
594        // Send HTTP POST to telemetry endpoint
595        let client = reqwest::Client::builder()
596            .timeout(Duration::from_secs(5))
597            .build()
598            .map_err(|e| TelemetryError::EndpointUnreachable {
599                message: e.to_string(),
600            })?;
601
602        let response = client.post(&self.config.endpoint).json(&payload);
603        let response = self
604            .apply_auth_headers(response)
605            .send()
606            .await
607            .map_err(|e| {
608                warn!(error = %e, "Telemetry batch send failed");
609                TelemetryError::EndpointUnreachable {
610                    message: e.to_string(),
611                }
612            })?;
613
614        if !response.status().is_success() {
615            let status = response.status();
616            warn!(status = %status, "Telemetry endpoint returned error status");
617            return Err(TelemetryError::EndpointUnreachable {
618                message: format!("HTTP {}", status),
619            });
620        }
621
622        debug!(batch_id = %batch.batch_id, "Telemetry batch sent successfully");
623        Ok(())
624    }
625
626    /// Starts the background flush task.
627    pub fn start_background_flush(&self) -> tokio::task::JoinHandle<()> {
628        let client = self.clone();
629        let mut shutdown = client.shutdown.subscribe();
630
631        tokio::spawn(async move {
632            let mut interval = tokio::time::interval(client.config.flush_interval);
633
634            loop {
635                tokio::select! {
636                    _ = interval.tick() => {
637                        if let Err(err) = client.flush().await {
638                            debug!("Background telemetry flush failed: {}", err);
639                        }
640                    }
641                    _ = shutdown.recv() => {
642                        if let Err(err) = client.flush().await {
643                            debug!("Final telemetry flush failed: {}", err);
644                        }
645                        break;
646                    }
647                }
648            }
649        })
650    }
651
652    /// Triggers shutdown of background tasks.
653    pub fn shutdown(&self) {
654        let _ = self.shutdown.send(());
655    }
656
657    /// Returns telemetry statistics.
658    pub fn stats(&self) -> TelemetryStatsSnapshot {
659        self.stats.snapshot()
660    }
661
662    /// Returns the dropped event count.
663    pub fn dropped_count(&self) -> u64 {
664        self.buffer.dropped_count()
665    }
666
667    /// Returns whether telemetry is enabled.
668    pub fn is_enabled(&self) -> bool {
669        self.config.enabled
670    }
671}
672
673/// Trait for emitting arbitrary signals.
674#[async_trait]
675pub trait SignalEmitter: Send + Sync {
676    async fn emit(&self, signal_type: &str, payload: serde_json::Value);
677}
678
679#[async_trait]
680impl SignalEmitter for TelemetryClient {
681    async fn emit(&self, signal_type: &str, payload: serde_json::Value) {
682        if signal_type == "auth_coverage_summary" {
683            if let Ok(summary) = serde_json::from_value::<AuthCoverageSummary>(payload) {
684                // Clone self is cheap because it just increments ref counts of Arcs
685                // Wait, TelemetryClient struct fields are Arc except config.
686                // So cloning TelemetryClient is cheap.
687                let _ = self.record(TelemetryEvent::AuthCoverage(summary)).await;
688            }
689        }
690    }
691}
692
693// Convenience functions for creating events
694pub fn request_processed(
695    latency_ms: u64,
696    status_code: u16,
697    waf_action: Option<String>,
698    site: String,
699    method: String,
700    path: String,
701) -> TelemetryEvent {
702    TelemetryEvent::RequestProcessed {
703        request_id: None,
704        latency_ms,
705        status_code,
706        waf_action,
707        site,
708        method,
709        path,
710    }
711}
712
713pub fn waf_block(
714    rule_id: String,
715    severity: String,
716    client_ip: String,
717    site: String,
718    path: String,
719) -> TelemetryEvent {
720    TelemetryEvent::WafBlock {
721        request_id: None,
722        rule_id,
723        severity,
724        client_ip,
725        site,
726        path,
727    }
728}
729
730pub fn rate_limit_hit(
731    request_id: Option<String>,
732    client_ip: String,
733    limit: u32,
734    window_secs: u32,
735    site: String,
736) -> TelemetryEvent {
737    TelemetryEvent::RateLimitHit {
738        request_id,
739        client_ip,
740        limit,
741        window_secs,
742        site,
743    }
744}
745
746pub fn config_reload(
747    sites_loaded: usize,
748    duration_ms: u64,
749    success: bool,
750    error: Option<String>,
751) -> TelemetryEvent {
752    TelemetryEvent::ConfigReload {
753        sites_loaded,
754        duration_ms,
755        success,
756        error,
757    }
758}
759
760pub fn service_health(
761    uptime_secs: u64,
762    memory_mb: u64,
763    active_connections: u64,
764    requests_per_sec: f64,
765) -> TelemetryEvent {
766    TelemetryEvent::ServiceHealth {
767        uptime_secs,
768        memory_mb,
769        active_connections,
770        requests_per_sec,
771    }
772}
773
774#[cfg(test)]
775mod tests {
776    use super::*;
777    use crate::utils::circuit_breaker::CircuitState;
778
779    fn test_config() -> TelemetryConfig {
780        TelemetryConfig {
781            enabled: true,
782            endpoint: "http://test:8080/telemetry".to_string(),
783            batch_size: 10,
784            flush_interval: Duration::from_millis(100),
785            max_buffer_size: 100,
786            dry_run: true, // Skip actual HTTP for tests
787            ..Default::default()
788        }
789    }
790
791    #[test]
792    fn test_config_defaults() {
793        let config = TelemetryConfig::default();
794        assert!(config.enabled);
795        assert_eq!(config.batch_size, 100);
796        assert_eq!(config.max_retries, 3);
797    }
798
799    #[test]
800    fn test_config_builder() {
801        let config = TelemetryConfig::new("http://custom:9000")
802            .with_api_key("secret")
803            .with_batch_size(50)
804            .with_instance_id("node-1");
805
806        assert_eq!(config.endpoint, "http://custom:9000");
807        assert_eq!(config.api_key, Some("secret".to_string()));
808        assert_eq!(config.batch_size, 50);
809        assert_eq!(config.instance_id, Some("node-1".to_string()));
810    }
811
812    #[test]
813    fn test_event_type_classification() {
814        let event = request_processed(100, 200, None, "site".into(), "GET".into(), "/".into());
815        assert_eq!(event.event_type(), EventType::RequestProcessed);
816
817        let event = waf_block(
818            "rule-1".into(),
819            "high".into(),
820            "1.2.3.4".into(),
821            "site".into(),
822            "/".into(),
823        );
824        assert_eq!(event.event_type(), EventType::WafBlock);
825
826        let event = TelemetryEvent::LogEntry {
827            request_id: None,
828            id: "log-1".into(),
829            source: "access".into(),
830            level: "info".into(),
831            message: "GET /".into(),
832            log_timestamp_ms: 1234,
833            fields: None,
834            method: Some("GET".into()),
835            path: Some("/".into()),
836            status_code: Some(200),
837            latency_ms: Some(12.5),
838            client_ip: Some("203.0.113.1".into()),
839            rule_id: None,
840        };
841        assert_eq!(event.event_type(), EventType::LogEntry);
842
843        let event = rate_limit_hit(None, "1.2.3.4".into(), 100, 60, "site".into());
844        assert_eq!(event.event_type(), EventType::RateLimitHit);
845
846        let event = config_reload(5, 100, true, None);
847        assert_eq!(event.event_type(), EventType::ConfigReload);
848
849        let event = service_health(3600, 512, 100, 1000.0);
850        assert_eq!(event.event_type(), EventType::ServiceHealth);
851    }
852
853    #[test]
854    fn test_event_serialization() {
855        let event = request_processed(
856            100,
857            200,
858            Some("pass".into()),
859            "site".into(),
860            "GET".into(),
861            "/api".into(),
862        );
863        let json = serde_json::to_string(&event).unwrap();
864        assert!(json.contains("request_processed"));
865        assert!(json.contains("100"));
866    }
867
868    #[test]
869    fn test_request_id_serialization() {
870        let event = TelemetryEvent::RequestProcessed {
871            request_id: Some("req_123".into()),
872            latency_ms: 100,
873            status_code: 200,
874            waf_action: None,
875            site: "site".into(),
876            method: "GET".into(),
877            path: "/".into(),
878        };
879        let value = serde_json::to_value(&event).unwrap();
880        assert_eq!(value["event_type"], "request_processed");
881        assert_eq!(value["data"]["request_id"], "req_123");
882    }
883
884    #[test]
885    fn test_log_entry_request_id_serialization() {
886        let event = TelemetryEvent::LogEntry {
887            request_id: Some("req_456".into()),
888            id: "log-1".into(),
889            source: "access".into(),
890            level: "info".into(),
891            message: "GET /".into(),
892            log_timestamp_ms: 1234,
893            fields: None,
894            method: Some("GET".into()),
895            path: Some("/".into()),
896            status_code: Some(200),
897            latency_ms: Some(12.5),
898            client_ip: Some("203.0.113.1".into()),
899            rule_id: None,
900        };
901        let value = serde_json::to_value(&event).unwrap();
902        assert_eq!(value["event_type"], "log_entry");
903        assert_eq!(value["data"]["request_id"], "req_456");
904    }
905
906    #[test]
907    fn test_timestamped_event() {
908        let event = request_processed(100, 200, None, "site".into(), "GET".into(), "/".into());
909        let timestamped = TimestampedEvent::new(event, Some("node-1".to_string()));
910
911        assert!(timestamped.timestamp_ms > 0);
912        assert_eq!(timestamped.instance_id, Some("node-1".to_string()));
913    }
914
915    #[test]
916    fn test_batch_creation() {
917        let events: Vec<TimestampedEvent> = (0..5)
918            .map(|i| {
919                let event =
920                    request_processed(i * 10, 200, None, "site".into(), "GET".into(), "/".into());
921                TimestampedEvent::new(event, None)
922            })
923            .collect();
924
925        let batch = TelemetryBatch::new(events);
926        assert_eq!(batch.len(), 5);
927        assert!(!batch.is_empty());
928        assert!(batch.batch_id.starts_with("batch-"));
929    }
930
931    #[tokio::test]
932    async fn test_circuit_breaker_closed() {
933        let cb = CircuitBreaker::default();
934        assert_eq!(cb.state().await, CircuitState::Closed);
935        assert!(cb.allow_request().await);
936    }
937
938    #[tokio::test]
939    async fn test_circuit_breaker_opens_on_failures() {
940        let cb = CircuitBreaker::new(3, Duration::from_secs(60));
941
942        for _ in 0..3 {
943            cb.record_failure().await;
944        }
945
946        assert_eq!(cb.state().await, CircuitState::Open);
947        assert!(!cb.allow_request().await);
948    }
949
950    #[tokio::test]
951    async fn test_circuit_breaker_success_resets() {
952        let cb = CircuitBreaker::new(3, Duration::from_secs(60));
953
954        cb.record_failure().await;
955        cb.record_failure().await;
956        cb.record_success().await;
957
958        assert_eq!(cb.state().await, CircuitState::Closed);
959        assert!(cb.allow_request().await);
960    }
961
962    #[tokio::test]
963    async fn test_buffer_push_and_drain() {
964        let buffer = TelemetryBuffer::new(10);
965        let event = request_processed(100, 200, None, "site".into(), "GET".into(), "/".into());
966        let timestamped = TimestampedEvent::new(event, None);
967
968        assert!(buffer.push(timestamped).await);
969        assert_eq!(buffer.len().await, 1);
970
971        let drained = buffer.drain().await;
972        assert_eq!(drained.len(), 1);
973        assert_eq!(buffer.len().await, 0);
974    }
975
976    #[tokio::test]
977    async fn test_buffer_overflow() {
978        let buffer = TelemetryBuffer::new(2);
979
980        for _ in 0..3 {
981            let event = request_processed(100, 200, None, "site".into(), "GET".into(), "/".into());
982            let timestamped = TimestampedEvent::new(event, None);
983            buffer.push(timestamped).await;
984        }
985
986        assert_eq!(buffer.len().await, 2);
987        assert_eq!(buffer.dropped_count(), 1);
988    }
989
990    #[tokio::test]
991    async fn test_client_record_event() {
992        let client = TelemetryClient::new(test_config());
993        let event = request_processed(100, 200, None, "site".into(), "GET".into(), "/".into());
994
995        let result = client.record(event).await;
996        assert!(result.is_ok());
997    }
998
999    #[tokio::test]
1000    async fn test_client_flush() {
1001        let client = TelemetryClient::new(test_config());
1002
1003        for _ in 0..5 {
1004            let event = request_processed(100, 200, None, "site".into(), "GET".into(), "/".into());
1005            client.record(event).await.unwrap();
1006        }
1007
1008        let result = client.flush().await;
1009        assert!(result.is_ok());
1010
1011        let stats = client.stats();
1012        assert_eq!(stats.events_sent, 5);
1013        assert_eq!(stats.batches_sent, 1);
1014    }
1015
1016    #[tokio::test]
1017    async fn test_client_auto_flush_on_batch_size() {
1018        let mut config = test_config();
1019        config.batch_size = 3;
1020        let client = TelemetryClient::new(config);
1021
1022        for _ in 0..3 {
1023            let event = request_processed(100, 200, None, "site".into(), "GET".into(), "/".into());
1024            client.record(event).await.unwrap();
1025        }
1026
1027        // Should have auto-flushed
1028        let stats = client.stats();
1029        assert_eq!(stats.events_sent, 3);
1030    }
1031
1032    #[tokio::test]
1033    async fn test_client_disabled() {
1034        let mut config = test_config();
1035        config.enabled = false;
1036        let client = TelemetryClient::new(config);
1037
1038        let event = request_processed(100, 200, None, "site".into(), "GET".into(), "/".into());
1039        let result = client.record(event).await;
1040        assert!(result.is_ok());
1041
1042        let stats = client.stats();
1043        assert_eq!(stats.events_sent, 0);
1044    }
1045
1046    #[tokio::test]
1047    async fn test_client_event_filtering() {
1048        let mut config = test_config();
1049        config.enabled_events = [EventType::WafBlock].into_iter().collect();
1050        let client = TelemetryClient::new(config);
1051
1052        // This event type is not enabled
1053        let event = request_processed(100, 200, None, "site".into(), "GET".into(), "/".into());
1054        client.record(event).await.unwrap();
1055
1056        // This event type is enabled
1057        let event = waf_block(
1058            "rule-1".into(),
1059            "high".into(),
1060            "1.2.3.4".into(),
1061            "site".into(),
1062            "/".into(),
1063        );
1064        client.record(event).await.unwrap();
1065
1066        client.flush().await.unwrap();
1067        let stats = client.stats();
1068        assert_eq!(stats.events_sent, 1);
1069    }
1070
1071    #[test]
1072    fn test_stats_snapshot() {
1073        let stats = TelemetryStats::default();
1074        stats.events_sent.store(100, Ordering::SeqCst);
1075        stats.batches_sent.store(10, Ordering::SeqCst);
1076
1077        let snapshot = stats.snapshot();
1078        assert_eq!(snapshot.events_sent, 100);
1079        assert_eq!(snapshot.batches_sent, 10);
1080    }
1081
1082    #[test]
1083    fn test_config_event_enabled() {
1084        let mut config = TelemetryConfig::default();
1085        // Empty means all enabled
1086        assert!(config.is_event_enabled(&EventType::WafBlock));
1087
1088        config.enabled_events = [EventType::WafBlock].into_iter().collect();
1089        assert!(config.is_event_enabled(&EventType::WafBlock));
1090        assert!(!config.is_event_enabled(&EventType::RequestProcessed));
1091    }
1092}