fukurow_streaming/
processor.rs

1//! # Stream Processor
2//!
3//! Core streaming processor for handling events
4
5use crate::{StreamingEvent, StreamingConfig, StreamError};
6use async_trait::async_trait;
7use futures::stream::{Stream, StreamExt};
8use std::pin::Pin;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11use tracing::{error, info, warn};
12
13/// Stream processor trait
14#[async_trait]
15pub trait StreamProcessor: Send + Sync {
16    /// Process a single event
17    async fn process_event(&self, event: StreamingEvent) -> Result<(), StreamError>;
18
19    /// Process a batch of events
20    async fn process_batch(&self, events: Vec<StreamingEvent>) -> Result<(), StreamError>;
21
22    /// Get processor name
23    fn name(&self) -> &'static str;
24
25    /// Get processor health status
26    async fn health_check(&self) -> Result<(), StreamError>;
27}
28
29/// Event stream processor
30pub struct EventStreamProcessor<P: StreamProcessor> {
31    processor: Arc<P>,
32    config: StreamingConfig,
33    event_tx: mpsc::UnboundedSender<StreamingEvent>,
34    event_rx: mpsc::UnboundedReceiver<StreamingEvent>,
35}
36
37impl<P: StreamProcessor + 'static> EventStreamProcessor<P> {
38    /// Create a new event stream processor
39    pub fn new(processor: P, config: StreamingConfig) -> Self {
40        let (event_tx, event_rx) = mpsc::unbounded_channel();
41
42        Self {
43            processor: Arc::new(processor),
44            config,
45            event_tx,
46            event_rx,
47        }
48    }
49
50    /// Start processing events
51    pub async fn start_processing(mut self) -> Result<(), StreamError> {
52        info!("Starting event stream processor: {}", self.processor.name());
53
54        let processor = Arc::clone(&self.processor);
55        let batch_size = self.config.processing.batch_size;
56        let processing_timeout = std::time::Duration::from_secs(
57            self.config.processing.processing_timeout_seconds
58        );
59
60        tokio::spawn(async move {
61            let mut batch = Vec::with_capacity(batch_size);
62            let mut last_process_time = std::time::Instant::now();
63
64            while let Some(event) = self.event_rx.recv().await {
65                batch.push(event);
66
67                // Process batch if it's full or timeout has passed
68                let should_process = batch.len() >= batch_size ||
69                    last_process_time.elapsed() >= processing_timeout;
70
71                if should_process {
72                    if let Err(e) = processor.process_batch(batch).await {
73                        error!("Failed to process batch: {}", e);
74                    }
75                    batch = Vec::with_capacity(batch_size);
76                    last_process_time = std::time::Instant::now();
77                }
78            }
79
80            // Process remaining events
81            if !batch.is_empty() {
82                if let Err(e) = processor.process_batch(batch).await {
83                    error!("Failed to process final batch: {}", e);
84                }
85            }
86        });
87
88        Ok(())
89    }
90
91    /// Send event to processor
92    pub fn send_event(&self, event: StreamingEvent) -> Result<(), StreamError> {
93        self.event_tx.send(event)
94            .map_err(|_| StreamError::ChannelClosed)
95    }
96
97    /// Create event sender handle
98    pub fn event_sender(&self) -> EventSender {
99        EventSender {
100            sender: self.event_tx.clone(),
101        }
102    }
103
104    /// Get processor health
105    pub async fn health_check(&self) -> Result<(), StreamError> {
106        self.processor.health_check().await
107    }
108}
109
110/// Event sender handle for external components
111#[derive(Clone)]
112pub struct EventSender {
113    sender: mpsc::UnboundedSender<StreamingEvent>,
114}
115
116impl EventSender {
117    /// Send an event
118    pub fn send(&self, event: StreamingEvent) -> Result<(), StreamError> {
119        self.sender.send(event)
120            .map_err(|_| StreamError::ChannelClosed)
121    }
122
123    /// Send security event
124    pub fn send_security_event(&self, event: fukurow_core::model::CyberEvent, source: String) -> Result<(), StreamError> {
125        let streaming_event = StreamingEvent::SecurityEvent {
126            event,
127            timestamp: chrono::Utc::now(),
128            source,
129        };
130        self.send(streaming_event)
131    }
132
133    /// Send reasoning result
134    pub fn send_reasoning_result(&self, actions: Vec<fukurow_core::model::SecurityAction>, execution_time_ms: u64, event_count: usize) -> Result<(), StreamError> {
135        let streaming_event = StreamingEvent::ReasoningResult {
136            actions,
137            execution_time_ms,
138            event_count,
139            timestamp: chrono::Utc::now(),
140        };
141        self.send(streaming_event)
142    }
143
144    /// Send anomaly detection result
145    pub fn send_anomaly(&self, score: f64, threshold: f64, metric: String) -> Result<(), StreamError> {
146        let streaming_event = StreamingEvent::AnomalyDetected {
147            score,
148            threshold,
149            metric,
150            timestamp: chrono::Utc::now(),
151        };
152        self.send(streaming_event)
153    }
154
155    /// Send system metrics
156    pub fn send_metrics(&self, cpu_usage: f64, memory_usage: f64, active_connections: u32) -> Result<(), StreamError> {
157        let streaming_event = StreamingEvent::SystemMetrics {
158            cpu_usage,
159            memory_usage,
160            active_connections,
161            timestamp: chrono::Utc::now(),
162        };
163        self.send(streaming_event)
164    }
165}
166
167/// Stream consumer trait
168#[async_trait]
169pub trait StreamConsumer: Send + Sync {
170    /// Consume events from stream
171    async fn consume(&self) -> Pin<Box<dyn Stream<Item = Result<StreamingEvent, StreamError>> + Send>>;
172
173    /// Get consumer name
174    fn name(&self) -> &'static str;
175
176    /// Get consumer health status
177    async fn health_check(&self) -> Result<(), StreamError>;
178}
179
180/// Stream producer trait
181#[async_trait]
182pub trait StreamProducer: Send + Sync {
183    /// Produce event to stream
184    async fn produce(&self, event: StreamingEvent) -> Result<(), StreamError>;
185
186    /// Produce batch of events
187    async fn produce_batch(&self, events: Vec<StreamingEvent>) -> Result<(), StreamError>;
188
189    /// Get producer name
190    fn name(&self) -> &'static str;
191
192    /// Get producer health status
193    async fn health_check(&self) -> Result<(), StreamError>;
194}
195
196
197#[cfg(test)]
198mod tests {
199    use super::*;
200    use fukurow_core::model::CyberEvent;
201
202    struct MockProcessor;
203
204    #[async_trait]
205    impl StreamProcessor for MockProcessor {
206        async fn process_event(&self, _event: StreamingEvent) -> Result<(), StreamError> {
207            Ok(())
208        }
209
210        async fn process_batch(&self, events: Vec<StreamingEvent>) -> Result<(), StreamError> {
211            info!("Processed batch of {} events", events.len());
212            Ok(())
213        }
214
215        fn name(&self) -> &'static str {
216            "mock_processor"
217        }
218
219        async fn health_check(&self) -> Result<(), StreamError> {
220            Ok(())
221        }
222    }
223
224    #[tokio::test]
225    async fn test_event_sender() {
226        let processor = MockProcessor;
227        let config = StreamingConfig::default();
228        let stream_processor = EventStreamProcessor::new(processor, config);
229
230        let sender = stream_processor.event_sender();
231
232        // Send security event
233        let cyber_event = CyberEvent::NetworkConnection {
234            source_ip: "192.168.1.1".to_string(),
235            dest_ip: "10.0.0.1".to_string(),
236            port: 443,
237            protocol: "tcp".to_string(),
238            timestamp: 1640995200,
239        };
240
241        sender.send_security_event(cyber_event, "test_sensor".to_string()).unwrap();
242
243        // Send reasoning result
244        let actions = vec![];
245        sender.send_reasoning_result(actions, 150, 5).unwrap();
246
247        // Send anomaly
248        sender.send_anomaly(2.5, 2.0, "login_attempts".to_string()).unwrap();
249
250        // Send metrics
251        sender.send_metrics(45.5, 67.8, 150).unwrap();
252
253        // Health check
254        assert!(stream_processor.health_check().await.is_ok());
255    }
256
257    #[test]
258    fn test_stream_error_display() {
259        let err = StreamError::ChannelClosed;
260        assert_eq!(err.to_string(), "Channel closed");
261
262        let err = StreamError::ConnectionError("connection failed".to_string());
263        assert_eq!(err.to_string(), "Connection error: connection failed");
264    }
265}