1use crate::{StreamingEvent, StreamingConfig, StreamError};
6use async_trait::async_trait;
7use futures::stream::{Stream, StreamExt};
8use std::pin::Pin;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11use tracing::{error, info, warn};
12
13#[async_trait]
15pub trait StreamProcessor: Send + Sync {
16 async fn process_event(&self, event: StreamingEvent) -> Result<(), StreamError>;
18
19 async fn process_batch(&self, events: Vec<StreamingEvent>) -> Result<(), StreamError>;
21
22 fn name(&self) -> &'static str;
24
25 async fn health_check(&self) -> Result<(), StreamError>;
27}
28
29pub struct EventStreamProcessor<P: StreamProcessor> {
31 processor: Arc<P>,
32 config: StreamingConfig,
33 event_tx: mpsc::UnboundedSender<StreamingEvent>,
34 event_rx: mpsc::UnboundedReceiver<StreamingEvent>,
35}
36
37impl<P: StreamProcessor + 'static> EventStreamProcessor<P> {
38 pub fn new(processor: P, config: StreamingConfig) -> Self {
40 let (event_tx, event_rx) = mpsc::unbounded_channel();
41
42 Self {
43 processor: Arc::new(processor),
44 config,
45 event_tx,
46 event_rx,
47 }
48 }
49
50 pub async fn start_processing(mut self) -> Result<(), StreamError> {
52 info!("Starting event stream processor: {}", self.processor.name());
53
54 let processor = Arc::clone(&self.processor);
55 let batch_size = self.config.processing.batch_size;
56 let processing_timeout = std::time::Duration::from_secs(
57 self.config.processing.processing_timeout_seconds
58 );
59
60 tokio::spawn(async move {
61 let mut batch = Vec::with_capacity(batch_size);
62 let mut last_process_time = std::time::Instant::now();
63
64 while let Some(event) = self.event_rx.recv().await {
65 batch.push(event);
66
67 let should_process = batch.len() >= batch_size ||
69 last_process_time.elapsed() >= processing_timeout;
70
71 if should_process {
72 if let Err(e) = processor.process_batch(batch).await {
73 error!("Failed to process batch: {}", e);
74 }
75 batch = Vec::with_capacity(batch_size);
76 last_process_time = std::time::Instant::now();
77 }
78 }
79
80 if !batch.is_empty() {
82 if let Err(e) = processor.process_batch(batch).await {
83 error!("Failed to process final batch: {}", e);
84 }
85 }
86 });
87
88 Ok(())
89 }
90
91 pub fn send_event(&self, event: StreamingEvent) -> Result<(), StreamError> {
93 self.event_tx.send(event)
94 .map_err(|_| StreamError::ChannelClosed)
95 }
96
97 pub fn event_sender(&self) -> EventSender {
99 EventSender {
100 sender: self.event_tx.clone(),
101 }
102 }
103
104 pub async fn health_check(&self) -> Result<(), StreamError> {
106 self.processor.health_check().await
107 }
108}
109
110#[derive(Clone)]
112pub struct EventSender {
113 sender: mpsc::UnboundedSender<StreamingEvent>,
114}
115
116impl EventSender {
117 pub fn send(&self, event: StreamingEvent) -> Result<(), StreamError> {
119 self.sender.send(event)
120 .map_err(|_| StreamError::ChannelClosed)
121 }
122
123 pub fn send_security_event(&self, event: fukurow_core::model::CyberEvent, source: String) -> Result<(), StreamError> {
125 let streaming_event = StreamingEvent::SecurityEvent {
126 event,
127 timestamp: chrono::Utc::now(),
128 source,
129 };
130 self.send(streaming_event)
131 }
132
133 pub fn send_reasoning_result(&self, actions: Vec<fukurow_core::model::SecurityAction>, execution_time_ms: u64, event_count: usize) -> Result<(), StreamError> {
135 let streaming_event = StreamingEvent::ReasoningResult {
136 actions,
137 execution_time_ms,
138 event_count,
139 timestamp: chrono::Utc::now(),
140 };
141 self.send(streaming_event)
142 }
143
144 pub fn send_anomaly(&self, score: f64, threshold: f64, metric: String) -> Result<(), StreamError> {
146 let streaming_event = StreamingEvent::AnomalyDetected {
147 score,
148 threshold,
149 metric,
150 timestamp: chrono::Utc::now(),
151 };
152 self.send(streaming_event)
153 }
154
155 pub fn send_metrics(&self, cpu_usage: f64, memory_usage: f64, active_connections: u32) -> Result<(), StreamError> {
157 let streaming_event = StreamingEvent::SystemMetrics {
158 cpu_usage,
159 memory_usage,
160 active_connections,
161 timestamp: chrono::Utc::now(),
162 };
163 self.send(streaming_event)
164 }
165}
166
167#[async_trait]
169pub trait StreamConsumer: Send + Sync {
170 async fn consume(&self) -> Pin<Box<dyn Stream<Item = Result<StreamingEvent, StreamError>> + Send>>;
172
173 fn name(&self) -> &'static str;
175
176 async fn health_check(&self) -> Result<(), StreamError>;
178}
179
180#[async_trait]
182pub trait StreamProducer: Send + Sync {
183 async fn produce(&self, event: StreamingEvent) -> Result<(), StreamError>;
185
186 async fn produce_batch(&self, events: Vec<StreamingEvent>) -> Result<(), StreamError>;
188
189 fn name(&self) -> &'static str;
191
192 async fn health_check(&self) -> Result<(), StreamError>;
194}
195
196
197#[cfg(test)]
198mod tests {
199 use super::*;
200 use fukurow_core::model::CyberEvent;
201
202 struct MockProcessor;
203
204 #[async_trait]
205 impl StreamProcessor for MockProcessor {
206 async fn process_event(&self, _event: StreamingEvent) -> Result<(), StreamError> {
207 Ok(())
208 }
209
210 async fn process_batch(&self, events: Vec<StreamingEvent>) -> Result<(), StreamError> {
211 info!("Processed batch of {} events", events.len());
212 Ok(())
213 }
214
215 fn name(&self) -> &'static str {
216 "mock_processor"
217 }
218
219 async fn health_check(&self) -> Result<(), StreamError> {
220 Ok(())
221 }
222 }
223
224 #[tokio::test]
225 async fn test_event_sender() {
226 let processor = MockProcessor;
227 let config = StreamingConfig::default();
228 let stream_processor = EventStreamProcessor::new(processor, config);
229
230 let sender = stream_processor.event_sender();
231
232 let cyber_event = CyberEvent::NetworkConnection {
234 source_ip: "192.168.1.1".to_string(),
235 dest_ip: "10.0.0.1".to_string(),
236 port: 443,
237 protocol: "tcp".to_string(),
238 timestamp: 1640995200,
239 };
240
241 sender.send_security_event(cyber_event, "test_sensor".to_string()).unwrap();
242
243 let actions = vec![];
245 sender.send_reasoning_result(actions, 150, 5).unwrap();
246
247 sender.send_anomaly(2.5, 2.0, "login_attempts".to_string()).unwrap();
249
250 sender.send_metrics(45.5, 67.8, 150).unwrap();
252
253 assert!(stream_processor.health_check().await.is_ok());
255 }
256
257 #[test]
258 fn test_stream_error_display() {
259 let err = StreamError::ChannelClosed;
260 assert_eq!(err.to_string(), "Channel closed");
261
262 let err = StreamError::ConnectionError("connection failed".to_string());
263 assert_eq!(err.to_string(), "Connection error: connection failed");
264 }
265}