// oxirs_vec/real_time_embedding_pipeline/streaming.rs
use anyhow::Result;
4use serde::{Deserialize, Serialize};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::time::Duration;
7
8use crate::real_time_embedding_pipeline::{
9 traits::{ContentItem, HealthStatus},
10 types::{StreamState, StreamStatus},
11};
12
/// Per-stream configuration for a single embedding stream processor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
    /// Unique identifier for the stream; used as the key in the multiplexer.
    pub stream_id: String,
    /// Item buffer capacity (count of items, not bytes) — TODO confirm how
    /// consumers apply this; it is not read within this module.
    pub buffer_size: usize,
    /// Timeout applied to stream operations.
    pub timeout: Duration,
    /// Maximum retry attempts for failed stream operations.
    pub max_retries: usize,
    /// Whether stream payloads are compressed — presumably on the wire;
    /// not consulted within this module.
    pub enable_compression: bool,
}
27
28impl Default for StreamConfig {
29 fn default() -> Self {
30 Self {
31 stream_id: "default".to_string(),
32 buffer_size: 1000,
33 timeout: Duration::from_secs(30),
34 max_retries: 3,
35 enable_compression: false,
36 }
37 }
38}
39
/// Top-level configuration shared by the stream multiplexer and its processors.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamProcessorConfig {
    /// Upper bound on the number of simultaneously registered streams.
    pub max_concurrent_streams: usize,
    /// Connection/read/write/idle timeout settings.
    pub timeout_config: TimeoutConfig,
    /// Buffer sizing and growth settings.
    pub buffer_config: BufferConfig,
    /// Retry/backoff/circuit-breaker settings.
    pub error_config: ErrorConfig,
}
52
53impl Default for StreamProcessorConfig {
54 fn default() -> Self {
55 Self {
56 max_concurrent_streams: 10,
57 timeout_config: TimeoutConfig::default(),
58 buffer_config: BufferConfig::default(),
59 error_config: ErrorConfig::default(),
60 }
61 }
62}
63
/// Timeout settings for stream I/O phases.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeoutConfig {
    /// Maximum time allowed to establish a connection.
    pub connection_timeout: Duration,
    /// Maximum time allowed for a single read.
    pub read_timeout: Duration,
    /// Maximum time allowed for a single write.
    pub write_timeout: Duration,
    /// Maximum idle time before the connection is considered stale.
    pub idle_timeout: Duration,
}
76
77impl Default for TimeoutConfig {
78 fn default() -> Self {
79 Self {
80 connection_timeout: Duration::from_secs(10),
81 read_timeout: Duration::from_secs(30),
82 write_timeout: Duration::from_secs(30),
83 idle_timeout: Duration::from_secs(300),
84 }
85 }
86}
87
/// Buffer sizing policy for stream processors.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BufferConfig {
    /// Initial buffer capacity (item count).
    pub initial_size: usize,
    /// Hard cap on buffer capacity.
    pub max_size: usize,
    /// Multiplier applied when the buffer grows (e.g. 1.5 = grow by 50%).
    pub growth_factor: f64,
    /// Whether buffer capacity may be adjusted dynamically — presumably by
    /// the consumer of this config; not read within this module.
    pub adaptive_sizing: bool,
}
100
101impl Default for BufferConfig {
102 fn default() -> Self {
103 Self {
104 initial_size: 1000,
105 max_size: 100000,
106 growth_factor: 1.5,
107 adaptive_sizing: true,
108 }
109 }
110}
111
/// Retry and failure-handling policy for stream operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorConfig {
    /// Maximum retry attempts before an operation is abandoned.
    pub max_retries: usize,
    /// Base delay before the first retry.
    pub retry_delay: Duration,
    /// Exponential backoff multiplier applied to successive retry delays.
    pub backoff_factor: f64,
    /// Ceiling on the backoff delay regardless of attempt count.
    pub max_retry_delay: Duration,
    /// Whether a circuit breaker should trip on repeated failures —
    /// presumably enforced by the consumer; not read within this module.
    pub enable_circuit_breaker: bool,
}
126
127impl Default for ErrorConfig {
128 fn default() -> Self {
129 Self {
130 max_retries: 3,
131 retry_delay: Duration::from_millis(100),
132 backoff_factor: 2.0,
133 max_retry_delay: Duration::from_secs(30),
134 enable_circuit_breaker: true,
135 }
136 }
137}
138
/// Processes content items for a single stream.
///
/// The running flag is atomic so `start`/`stop`/`process_item` can be
/// called from shared references across tasks.
pub struct StreamProcessor {
    /// Identifier of the stream this processor serves.
    stream_id: String,
    /// Per-stream configuration captured at construction.
    config: StreamConfig,
    /// True while the processor is started; gates `process_item`.
    is_running: AtomicBool,
    /// Progress/status snapshot. NOTE(review): never mutated after
    /// construction in this module — offset/status/error_count stay at
    /// their initial values; confirm whether updates are still pending.
    state: StreamState,
}
150
151impl StreamProcessor {
152 pub fn new(stream_id: String, config: StreamConfig) -> Result<Self> {
154 let state = StreamState {
155 stream_id: stream_id.clone(),
156 offset: 0,
157 last_processed: std::time::SystemTime::now(),
158 status: StreamStatus::Initializing,
159 error_count: 0,
160 last_error: None,
161 };
162
163 Ok(Self {
164 stream_id,
165 config,
166 is_running: AtomicBool::new(false),
167 state,
168 })
169 }
170
171 pub async fn start(&self) -> Result<()> {
173 if self.is_running.load(Ordering::Acquire) {
174 return Err(anyhow::anyhow!("Stream processor is already running"));
175 }
176
177 self.is_running.store(true, Ordering::Release);
178
179 self.initialize_stream().await?;
181
182 Ok(())
183 }
184
185 pub async fn stop(&self) -> Result<()> {
187 self.is_running.store(false, Ordering::Release);
188
189 self.cleanup_stream().await?;
191
192 Ok(())
193 }
194
195 pub async fn process_item(&self, item: ContentItem) -> Result<()> {
197 if !self.is_running.load(Ordering::Acquire) {
198 return Err(anyhow::anyhow!("Stream processor is not running"));
199 }
200
201 self.handle_content_item(item).await?;
203
204 Ok(())
205 }
206
207 pub fn get_state(&self) -> &StreamState {
209 &self.state
210 }
211
212 pub fn get_config(&self) -> &StreamConfig {
214 &self.config
215 }
216
217 pub async fn health_check(&self) -> Result<HealthStatus> {
219 if !self.is_running.load(Ordering::Acquire) {
220 return Ok(HealthStatus::Unhealthy {
221 message: "Stream processor is not running".to_string(),
222 });
223 }
224
225 if self.state.error_count > 10 {
227 return Ok(HealthStatus::Warning {
228 message: format!("High error count: {}", self.state.error_count),
229 });
230 }
231
232 Ok(HealthStatus::Healthy)
233 }
234
235 pub fn is_running(&self) -> bool {
237 self.is_running.load(Ordering::Acquire)
238 }
239
240 async fn initialize_stream(&self) -> Result<()> {
243 tokio::time::sleep(Duration::from_millis(10)).await;
245 Ok(())
246 }
247
248 async fn cleanup_stream(&self) -> Result<()> {
249 tokio::time::sleep(Duration::from_millis(10)).await;
251 Ok(())
252 }
253
254 async fn handle_content_item(&self, _item: ContentItem) -> Result<()> {
255 tokio::time::sleep(Duration::from_millis(1)).await;
262 Ok(())
263 }
264}
265
/// Owns a set of stream processors keyed by stream id.
///
/// A `std::sync::RwLock` guards the map; guards must not be held across
/// `.await` points.
pub struct StreamMultiplexer {
    /// Registered processors, keyed by `StreamProcessor::stream_id`.
    processors: std::sync::RwLock<std::collections::HashMap<String, StreamProcessor>>,
    /// Multiplexer-wide configuration. NOTE(review): stored but not
    /// consulted anywhere in this module (e.g. `max_concurrent_streams`
    /// is never enforced) — confirm intended usage.
    config: StreamProcessorConfig,
}
273
274impl StreamMultiplexer {
275 pub fn new(config: StreamProcessorConfig) -> Self {
277 Self {
278 processors: std::sync::RwLock::new(std::collections::HashMap::new()),
279 config,
280 }
281 }
282
283 pub fn add_processor(&self, processor: StreamProcessor) -> Result<()> {
285 let mut processors = self
286 .processors
287 .write()
288 .map_err(|_| anyhow::anyhow!("Failed to acquire processors lock"))?;
289
290 let stream_id = processor.stream_id.clone();
291 processors.insert(stream_id, processor);
292 Ok(())
293 }
294
295 pub async fn remove_processor(&self, stream_id: &str) -> Result<()> {
297 let processor = {
298 let mut processors = self
299 .processors
300 .write()
301 .map_err(|_| anyhow::anyhow!("Failed to acquire processors lock"))?;
302 processors.remove(stream_id)
303 };
304
305 if let Some(processor) = processor {
306 processor.stop().await?;
307 }
308
309 Ok(())
310 }
311
312 pub fn processor_count(&self) -> usize {
314 self.processors.read().map(|p| p.len()).unwrap_or(0)
315 }
316
317 pub async fn health_check(&self) -> Result<HealthStatus> {
319 let processor_count = {
320 let processors = self
321 .processors
322 .read()
323 .map_err(|_| anyhow::anyhow!("Failed to acquire processors lock"))?;
324 processors.len()
325 };
326
327 let unhealthy_count = {
328 0
331 };
332
333 if unhealthy_count == 0 {
334 Ok(HealthStatus::Healthy)
335 } else if unhealthy_count < processor_count {
336 Ok(HealthStatus::Warning {
337 message: format!("{unhealthy_count} processors are unhealthy"),
338 })
339 } else {
340 Ok(HealthStatus::Unhealthy {
341 message: "All processors are unhealthy".to_string(),
342 })
343 }
344 }
345}
346
#[cfg(test)]
mod tests {
    use super::*;

    /// Default config carries the documented buffer size and retry count.
    #[test]
    fn test_stream_config_default() {
        let StreamConfig {
            buffer_size,
            max_retries,
            ..
        } = StreamConfig::default();
        assert_eq!(buffer_size, 1000);
        assert_eq!(max_retries, 3);
    }

    /// Constructing a processor with defaults succeeds.
    #[tokio::test]
    async fn test_stream_processor_creation() {
        assert!(StreamProcessor::new("test_stream".into(), StreamConfig::default()).is_ok());
    }

    /// The running flag follows the start/stop lifecycle.
    #[tokio::test]
    async fn test_stream_processor_start_stop() {
        let processor =
            StreamProcessor::new("test_stream".into(), StreamConfig::default()).unwrap();

        assert!(!processor.is_running());

        processor.start().await.expect("start should succeed");
        assert!(processor.is_running());

        processor.stop().await.expect("stop should succeed");
    }

    /// A fresh multiplexer has no registered processors.
    #[test]
    fn test_stream_multiplexer() {
        let multiplexer = StreamMultiplexer::new(StreamProcessorConfig::default());
        assert_eq!(multiplexer.processor_count(), 0);
    }
}