fukurow_streaming/
lib.rs

//! # Fukurow Streaming
//!
//! Real-time stream processing for the Fukurow reasoning engine.
//! Supports Kafka, NATS, Redis Streams, and RabbitMQ.
5
6pub mod stream;
7pub mod processor;
8pub mod consumer;
9pub mod producer;
10pub mod config;
11
12pub use stream::{StreamConfig, StreamType, AbstractStream, StreamMessage, StreamError};
13pub use processor::{StreamProcessor, EventStreamProcessor, EventSender, StreamConsumer, StreamProducer};
14pub use consumer::*;
15pub use producer::*;
16pub use config::*;
17
18/// Streaming event types
19#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
20pub enum StreamingEvent {
21    /// Security event from sensors
22    SecurityEvent {
23        event: fukurow_core::model::CyberEvent,
24        timestamp: chrono::DateTime<chrono::Utc>,
25        source: String,
26    },
27
28    /// Reasoning result
29    ReasoningResult {
30        actions: Vec<fukurow_core::model::SecurityAction>,
31        execution_time_ms: u64,
32        event_count: usize,
33        timestamp: chrono::DateTime<chrono::Utc>,
34    },
35
36    /// Anomaly detection result
37    AnomalyDetected {
38        score: f64,
39        threshold: f64,
40        metric: String,
41        timestamp: chrono::DateTime<chrono::Utc>,
42    },
43
44    /// System metrics
45    SystemMetrics {
46        cpu_usage: f64,
47        memory_usage: f64,
48        active_connections: u32,
49        timestamp: chrono::DateTime<chrono::Utc>,
50    },
51}
52
53impl StreamingEvent {
54    /// Get event type as string
55    pub fn event_type(&self) -> &'static str {
56        match self {
57            StreamingEvent::SecurityEvent { .. } => "security_event",
58            StreamingEvent::ReasoningResult { .. } => "reasoning_result",
59            StreamingEvent::AnomalyDetected { .. } => "anomaly_detected",
60            StreamingEvent::SystemMetrics { .. } => "system_metrics",
61        }
62    }
63
64    /// Get event timestamp
65    pub fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
66        match self {
67            StreamingEvent::SecurityEvent { timestamp, .. } => *timestamp,
68            StreamingEvent::ReasoningResult { timestamp, .. } => *timestamp,
69            StreamingEvent::AnomalyDetected { timestamp, .. } => *timestamp,
70            StreamingEvent::SystemMetrics { timestamp, .. } => *timestamp,
71        }
72    }
73}
74
#[cfg(test)]
mod tests {
    use super::*;
    use fukurow_core::model::CyberEvent;

    /// A `SecurityEvent` reports the `"security_event"` label and returns
    /// exactly the timestamp it was constructed with.
    #[test]
    fn test_streaming_event_types() {
        // Capture the timestamp once so the accessor can be checked for
        // equality instead of being compared against a second clock read.
        let created_at = chrono::Utc::now();

        let security_event = StreamingEvent::SecurityEvent {
            event: CyberEvent::NetworkConnection {
                source_ip: "192.168.1.1".to_string(),
                dest_ip: "10.0.0.1".to_string(),
                port: 443,
                protocol: "tcp".to_string(),
                timestamp: 1640995200,
            },
            timestamp: created_at,
            source: "sensor1".to_string(),
        };

        assert_eq!(security_event.event_type(), "security_event");
        assert_eq!(security_event.timestamp(), created_at);
    }

    /// A `SystemMetrics` event reports the `"system_metrics"` label and
    /// returns exactly the timestamp it was constructed with.
    #[test]
    fn test_system_metrics_event() {
        let created_at = chrono::Utc::now();

        let metrics_event = StreamingEvent::SystemMetrics {
            cpu_usage: 45.5,
            memory_usage: 67.8,
            active_connections: 150,
            timestamp: created_at,
        };

        assert_eq!(metrics_event.event_type(), "system_metrics");
        // Direct equality replaces the former `assert_eq!(.. <= .., true)`.
        assert_eq!(metrics_event.timestamp(), created_at);
    }
}