1pub mod stream;
7pub mod processor;
8pub mod consumer;
9pub mod producer;
10pub mod config;
11
12pub use stream::{StreamConfig, StreamType, AbstractStream, StreamMessage, StreamError};
13pub use processor::{StreamProcessor, EventStreamProcessor, EventSender, StreamConsumer, StreamProducer};
14pub use consumer::*;
15pub use producer::*;
16pub use config::*;
17
18#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
20pub enum StreamingEvent {
21 SecurityEvent {
23 event: fukurow_core::model::CyberEvent,
24 timestamp: chrono::DateTime<chrono::Utc>,
25 source: String,
26 },
27
28 ReasoningResult {
30 actions: Vec<fukurow_core::model::SecurityAction>,
31 execution_time_ms: u64,
32 event_count: usize,
33 timestamp: chrono::DateTime<chrono::Utc>,
34 },
35
36 AnomalyDetected {
38 score: f64,
39 threshold: f64,
40 metric: String,
41 timestamp: chrono::DateTime<chrono::Utc>,
42 },
43
44 SystemMetrics {
46 cpu_usage: f64,
47 memory_usage: f64,
48 active_connections: u32,
49 timestamp: chrono::DateTime<chrono::Utc>,
50 },
51}
52
53impl StreamingEvent {
54 pub fn event_type(&self) -> &'static str {
56 match self {
57 StreamingEvent::SecurityEvent { .. } => "security_event",
58 StreamingEvent::ReasoningResult { .. } => "reasoning_result",
59 StreamingEvent::AnomalyDetected { .. } => "anomaly_detected",
60 StreamingEvent::SystemMetrics { .. } => "system_metrics",
61 }
62 }
63
64 pub fn timestamp(&self) -> chrono::DateTime<chrono::Utc> {
66 match self {
67 StreamingEvent::SecurityEvent { timestamp, .. } => *timestamp,
68 StreamingEvent::ReasoningResult { timestamp, .. } => *timestamp,
69 StreamingEvent::AnomalyDetected { timestamp, .. } => *timestamp,
70 StreamingEvent::SystemMetrics { timestamp, .. } => *timestamp,
71 }
72 }
73}
74
75#[cfg(test)]
76mod tests {
77 use super::*;
78 use fukurow_core::model::CyberEvent;
79
80 #[test]
81 fn test_streaming_event_types() {
82 let security_event = StreamingEvent::SecurityEvent {
83 event: CyberEvent::NetworkConnection {
84 source_ip: "192.168.1.1".to_string(),
85 dest_ip: "10.0.0.1".to_string(),
86 port: 443,
87 protocol: "tcp".to_string(),
88 timestamp: 1640995200,
89 },
90 timestamp: chrono::Utc::now(),
91 source: "sensor1".to_string(),
92 };
93
94 assert_eq!(security_event.event_type(), "security_event");
95 assert!(security_event.timestamp() <= chrono::Utc::now());
96 }
97
98 #[test]
99 fn test_system_metrics_event() {
100 let metrics_event = StreamingEvent::SystemMetrics {
101 cpu_usage: 45.5,
102 memory_usage: 67.8,
103 active_connections: 150,
104 timestamp: chrono::Utc::now(),
105 };
106
107 assert_eq!(metrics_event.event_type(), "system_metrics");
108 assert_eq!(metrics_event.timestamp() <= chrono::Utc::now(), true);
109 }
110}