oxirs_vec/real_time_embedding_pipeline/streaming.rs

//! Stream processing components for the real-time embedding pipeline

use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use crate::real_time_embedding_pipeline::{
    traits::{ContentItem, HealthStatus},
    types::{StreamState, StreamStatus},
};

/// Configuration for stream processors
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
    /// Stream identifier
    pub stream_id: String,
    /// Buffer size for the stream
    pub buffer_size: usize,
    /// Processing timeout
    pub timeout: Duration,
    /// Maximum retry attempts
    pub max_retries: usize,
    /// Enable compression
    pub enable_compression: bool,
}

impl Default for StreamConfig {
    fn default() -> Self {
        Self {
            stream_id: "default".to_string(),
            buffer_size: 1000,
            timeout: Duration::from_secs(30),
            max_retries: 3,
            enable_compression: false,
        }
    }
}
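
// Illustrative sketch (not part of the original module): overriding selected
// `StreamConfig` fields while keeping the remaining defaults. The stream id
// "telemetry" and the buffer size are arbitrary example values.
#[allow(dead_code)]
fn example_stream_config() -> StreamConfig {
    StreamConfig {
        stream_id: "telemetry".to_string(),
        buffer_size: 5000,
        ..StreamConfig::default()
    }
}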

/// Stream processor configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamProcessorConfig {
    /// Maximum concurrent streams
    pub max_concurrent_streams: usize,
    /// Stream timeout settings
    pub timeout_config: TimeoutConfig,
    /// Buffer management settings
    pub buffer_config: BufferConfig,
    /// Error handling settings
    pub error_config: ErrorConfig,
}

impl Default for StreamProcessorConfig {
    fn default() -> Self {
        Self {
            max_concurrent_streams: 10,
            timeout_config: TimeoutConfig::default(),
            buffer_config: BufferConfig::default(),
            error_config: ErrorConfig::default(),
        }
    }
}

/// Timeout configuration for stream processing
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeoutConfig {
    /// Connection timeout
    pub connection_timeout: Duration,
    /// Read timeout
    pub read_timeout: Duration,
    /// Write timeout
    pub write_timeout: Duration,
    /// Idle timeout
    pub idle_timeout: Duration,
}

impl Default for TimeoutConfig {
    fn default() -> Self {
        Self {
            connection_timeout: Duration::from_secs(10),
            read_timeout: Duration::from_secs(30),
            write_timeout: Duration::from_secs(30),
            idle_timeout: Duration::from_secs(300),
        }
    }
}
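
// Illustrative sketch (not part of the original module): applying
// `TimeoutConfig::read_timeout` to an arbitrary receive future with
// `tokio::time::timeout`. `recv_next` is a hypothetical stand-in for whatever
// source feeds the stream; an elapsed timeout is surfaced as an ordinary error.
#[allow(dead_code)]
async fn read_with_timeout<F, T>(timeouts: &TimeoutConfig, recv_next: F) -> Result<T>
where
    F: std::future::Future<Output = Result<T>>,
{
    tokio::time::timeout(timeouts.read_timeout, recv_next)
        .await
        .map_err(|_| anyhow::anyhow!("read timed out"))?
}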

/// Buffer configuration for stream processing
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BufferConfig {
    /// Initial buffer size
    pub initial_size: usize,
    /// Maximum buffer size
    pub max_size: usize,
    /// Growth factor when resizing
    pub growth_factor: f64,
    /// Enable adaptive sizing
    pub adaptive_sizing: bool,
}

impl Default for BufferConfig {
    fn default() -> Self {
        Self {
            initial_size: 1000,
            max_size: 100000,
            growth_factor: 1.5,
            adaptive_sizing: true,
        }
    }
}
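
// Illustrative sketch (not part of the original module): one way `BufferConfig`
// could resize a full buffer, multiplying the current capacity by
// `growth_factor` and clamping the result between `initial_size` and `max_size`.
// The helper name `next_buffer_size` is hypothetical.
#[allow(dead_code)]
fn next_buffer_size(config: &BufferConfig, current: usize) -> usize {
    let grown = (current as f64 * config.growth_factor).ceil() as usize;
    grown.clamp(config.initial_size, config.max_size)
}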

/// Error handling configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorConfig {
    /// Maximum retry attempts
    pub max_retries: usize,
    /// Retry delay
    pub retry_delay: Duration,
    /// Exponential backoff factor
    pub backoff_factor: f64,
    /// Maximum retry delay
    pub max_retry_delay: Duration,
    /// Enable circuit breaker
    pub enable_circuit_breaker: bool,
}

impl Default for ErrorConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            retry_delay: Duration::from_millis(100),
            backoff_factor: 2.0,
            max_retry_delay: Duration::from_secs(30),
            enable_circuit_breaker: true,
        }
    }
}
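
// Illustrative sketch (not part of the original module): how `ErrorConfig` could
// map a retry attempt number to an exponential-backoff delay, capped at
// `max_retry_delay`. The helper name `retry_delay_for_attempt` is hypothetical.
#[allow(dead_code)]
fn retry_delay_for_attempt(config: &ErrorConfig, attempt: u32) -> Duration {
    let scaled = config.retry_delay.as_secs_f64() * config.backoff_factor.powi(attempt as i32);
    Duration::from_secs_f64(scaled).min(config.max_retry_delay)
}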

/// Stream processor for handling content streams
pub struct StreamProcessor {
    /// Stream identifier
    stream_id: String,
    /// Stream configuration
    config: StreamConfig,
    /// Running state
    is_running: AtomicBool,
    /// Current stream state
    state: StreamState,
}

impl StreamProcessor {
    /// Create a new stream processor
    pub fn new(stream_id: String, config: StreamConfig) -> Result<Self> {
        let state = StreamState {
            stream_id: stream_id.clone(),
            offset: 0,
            last_processed: std::time::SystemTime::now(),
            status: StreamStatus::Initializing,
            error_count: 0,
            last_error: None,
        };

        Ok(Self {
            stream_id,
            config,
            is_running: AtomicBool::new(false),
            state,
        })
    }

    /// Start the stream processor
    pub async fn start(&self) -> Result<()> {
        // Atomically flip the running flag so concurrent start() calls cannot
        // both succeed (a plain load-then-store check would race).
        if self
            .is_running
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            return Err(anyhow::anyhow!("Stream processor is already running"));
        }

        // Initialize stream processing
        self.initialize_stream().await?;

        Ok(())
    }

    /// Stop the stream processor
    pub async fn stop(&self) -> Result<()> {
        self.is_running.store(false, Ordering::Release);

        // Cleanup resources
        self.cleanup_stream().await?;

        Ok(())
    }

    /// Process a content item
    pub async fn process_item(&self, item: ContentItem) -> Result<()> {
        if !self.is_running.load(Ordering::Acquire) {
            return Err(anyhow::anyhow!("Stream processor is not running"));
        }

        // Process the content item
        self.handle_content_item(item).await?;

        Ok(())
    }

    /// Get stream state
    pub fn get_state(&self) -> &StreamState {
        &self.state
    }

    /// Get stream configuration
    pub fn get_config(&self) -> &StreamConfig {
        &self.config
    }

    /// Check stream health
    pub async fn health_check(&self) -> Result<HealthStatus> {
        if !self.is_running.load(Ordering::Acquire) {
            return Ok(HealthStatus::Unhealthy {
                message: "Stream processor is not running".to_string(),
            });
        }

        // Check various health metrics
        if self.state.error_count > 10 {
            return Ok(HealthStatus::Warning {
                message: format!("High error count: {}", self.state.error_count),
            });
        }

        Ok(HealthStatus::Healthy)
    }

    /// Check if the processor is running
    pub fn is_running(&self) -> bool {
        self.is_running.load(Ordering::Acquire)
    }

    // Private helper methods

    async fn initialize_stream(&self) -> Result<()> {
        // Initialize stream resources
        tokio::time::sleep(Duration::from_millis(10)).await;
        Ok(())
    }

    async fn cleanup_stream(&self) -> Result<()> {
        // Cleanup stream resources
        tokio::time::sleep(Duration::from_millis(10)).await;
        Ok(())
    }

    async fn handle_content_item(&self, _item: ContentItem) -> Result<()> {
        // Process content item
        // This would typically involve:
        // 1. Validation
        // 2. Embedding generation
        // 3. Index updates
        // 4. Quality checks
        tokio::time::sleep(Duration::from_millis(1)).await;
        Ok(())
    }
}
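
// Usage sketch (not part of the original module): the expected lifecycle of a
// `StreamProcessor`. `item` stands in for a `ContentItem` obtained elsewhere;
// how one is constructed depends on the `traits` module and is not shown here.
#[allow(dead_code)]
async fn run_processor_once(item: ContentItem) -> Result<()> {
    let processor = StreamProcessor::new("example_stream".to_string(), StreamConfig::default())?;
    processor.start().await?;
    processor.process_item(item).await?;
    processor.stop().await
}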

/// Stream multiplexer for handling multiple streams
pub struct StreamMultiplexer {
    /// Active stream processors
    processors: std::sync::RwLock<std::collections::HashMap<String, StreamProcessor>>,
    /// Configuration
    config: StreamProcessorConfig,
}

impl StreamMultiplexer {
    /// Create a new stream multiplexer
    pub fn new(config: StreamProcessorConfig) -> Self {
        Self {
            processors: std::sync::RwLock::new(std::collections::HashMap::new()),
            config,
        }
    }

    /// Add a stream processor
    pub fn add_processor(&self, processor: StreamProcessor) -> Result<()> {
        let mut processors = self
            .processors
            .write()
            .map_err(|_| anyhow::anyhow!("Failed to acquire processors lock"))?;

        // Enforce the configured limit on concurrently managed streams.
        if processors.len() >= self.config.max_concurrent_streams {
            return Err(anyhow::anyhow!(
                "Maximum concurrent streams ({}) reached",
                self.config.max_concurrent_streams
            ));
        }

        let stream_id = processor.stream_id.clone();
        processors.insert(stream_id, processor);
        Ok(())
    }

    /// Remove a stream processor
    pub async fn remove_processor(&self, stream_id: &str) -> Result<()> {
        let processor = {
            let mut processors = self
                .processors
                .write()
                .map_err(|_| anyhow::anyhow!("Failed to acquire processors lock"))?;
            processors.remove(stream_id)
        };

        if let Some(processor) = processor {
            processor.stop().await?;
        }

        Ok(())
    }

    /// Get processor count
    pub fn processor_count(&self) -> usize {
        self.processors.read().map(|p| p.len()).unwrap_or(0)
    }

    /// Check multiplexer health
    pub async fn health_check(&self) -> Result<HealthStatus> {
        let processor_count = {
            let processors = self
                .processors
                .read()
                .map_err(|_| anyhow::anyhow!("Failed to acquire processors lock"))?;
            processors.len()
        };

        let unhealthy_count = {
            // For now, report health without holding the read lock across an await point.
            // TODO: Implement a proper async health-checking mechanism for individual processors.
            0
        };

        if unhealthy_count == 0 {
            Ok(HealthStatus::Healthy)
        } else if unhealthy_count < processor_count {
            Ok(HealthStatus::Warning {
                message: format!("{unhealthy_count} processors are unhealthy"),
            })
        } else {
            Ok(HealthStatus::Unhealthy {
                message: "All processors are unhealthy".to_string(),
            })
        }
    }
}
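
// Usage sketch (not part of the original module): registering a processor with a
// `StreamMultiplexer` and checking aggregate health. The multiplexer takes
// ownership of the processor on `add_processor`, as in the implementation above.
#[allow(dead_code)]
async fn register_and_check(multiplexer: &StreamMultiplexer) -> Result<HealthStatus> {
    let processor = StreamProcessor::new("feed-1".to_string(), StreamConfig::default())?;
    processor.start().await?;
    multiplexer.add_processor(processor)?;
    multiplexer.health_check().await
}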

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_stream_config_default() {
        let config = StreamConfig::default();
        assert_eq!(config.buffer_size, 1000);
        assert_eq!(config.max_retries, 3);
    }

    #[tokio::test]
    async fn test_stream_processor_creation() {
        let config = StreamConfig::default();
        let processor = StreamProcessor::new("test_stream".to_string(), config);
        assert!(processor.is_ok());
    }

    #[tokio::test]
    async fn test_stream_processor_start_stop() {
        let config = StreamConfig::default();
        let processor = StreamProcessor::new("test_stream".to_string(), config).unwrap();

        assert!(!processor.is_running());

        let start_result = processor.start().await;
        assert!(start_result.is_ok());
        assert!(processor.is_running());

        let stop_result = processor.stop().await;
        assert!(stop_result.is_ok());
    }

    #[test]
    fn test_stream_multiplexer() {
        let config = StreamProcessorConfig::default();
        let multiplexer = StreamMultiplexer::new(config);
        assert_eq!(multiplexer.processor_count(), 0);
    }
}