f1_nexus_telemetry/
lib.rs

1//! F1 Nexus Telemetry Processing Engine
2//!
3//! Real-time telemetry data ingestion, processing, and anomaly detection
4//! with sub-millisecond latency using SIMD optimization and neural inference.
5
6pub mod processor;
7pub mod stream;
8pub mod anomaly;
9pub mod buffer;
10pub mod predictor;
11
12pub use processor::*;
13pub use stream::*;
14pub use anomaly::*;
15pub use buffer::*;
16pub use predictor::*;
17
18use f1_nexus_core::TelemetrySnapshot;
19use std::sync::Arc;
20use tokio::sync::broadcast;
21
22/// Telemetry processing engine
23pub struct TelemetryEngine {
24    processor: Arc<TelemetryProcessor>,
25    anomaly_detector: Arc<AnomalyDetector>,
26    tx: broadcast::Sender<TelemetryEvent>,
27}
28
29/// Telemetry events
30#[derive(Debug, Clone)]
31pub enum TelemetryEvent {
32    Snapshot(TelemetrySnapshot),
33    Anomaly(AnomalyInfo),
34    LegacyAnomaly(AnomalyAlert), // For backward compatibility
35    StreamStart { session_id: String },
36    StreamEnd { session_id: String },
37}
38
39impl TelemetryEngine {
40    /// Create new telemetry engine
41    pub fn new(config: TelemetryConfig) -> Self {
42        let (tx, _) = broadcast::channel(10_000);
43
44        TelemetryEngine {
45            processor: Arc::new(TelemetryProcessor::new(config.clone())),
46            anomaly_detector: Arc::new(AnomalyDetector::new(config)),
47            tx,
48        }
49    }
50
51    /// Process incoming telemetry snapshot
52    pub async fn process(&self, snapshot: TelemetrySnapshot) -> Result<(), TelemetryError> {
53        // Process telemetry (validation, normalization, etc.)
54        self.processor.process(&snapshot)?;
55
56        // Run anomaly detection (new statistical detector)
57        let anomalies = self.anomaly_detector.detect(&snapshot);
58        for anomaly in anomalies {
59            let _ = self.tx.send(TelemetryEvent::Anomaly(anomaly));
60        }
61
62        // Broadcast processed snapshot
63        let _ = self.tx.send(TelemetryEvent::Snapshot(snapshot));
64
65        Ok(())
66    }
67
68    /// Subscribe to telemetry events
69    pub fn subscribe(&self) -> broadcast::Receiver<TelemetryEvent> {
70        self.tx.subscribe()
71    }
72
73    /// Get processing statistics
74    pub fn stats(&self) -> ProcessingStats {
75        self.processor.stats()
76    }
77}
78
/// Settings passed to [`TelemetryEngine::new`].
#[derive(Clone, Debug)]
pub struct TelemetryConfig {
    /// Whether anomaly detection is enabled.
    pub enable_anomaly_detection: bool,

    /// Threshold used by anomaly detection.
    // NOTE(review): the scale and meaning of this value are defined by the
    // detector, which is not visible from this file — confirm there.
    pub anomaly_threshold: f32,

    /// Size of the buffer used for sliding-window analysis.
    pub buffer_size: usize,

    /// Whether SIMD optimizations are enabled.
    pub enable_simd: bool,
}
94
95impl Default for TelemetryConfig {
96    fn default() -> Self {
97        TelemetryConfig {
98            enable_anomaly_detection: true,
99            anomaly_threshold: 0.95,
100            buffer_size: 1000,
101            enable_simd: true,
102        }
103    }
104}
105
106/// Telemetry processing errors
107#[derive(Debug, thiserror::Error)]
108pub enum TelemetryError {
109    #[error("Invalid telemetry data: {0}")]
110    InvalidData(String),
111
112    #[error("Processing error: {0}")]
113    ProcessingError(String),
114
115    #[error("Anomaly detection error: {0}")]
116    AnomalyDetectionError(String),
117}
118
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: an engine can be built from the default configuration.
    #[test]
    fn test_telemetry_engine_creation() {
        // Reaching the end of this function without a panic is the whole check.
        let _engine = TelemetryEngine::new(TelemetryConfig::default());
    }
}
129}