1use std::sync::Arc;
43use std::time::{Duration as StdDuration, Instant};
44use chrono::{DateTime, Duration as ChronoDuration, Utc};
45use futures::{Stream, StreamExt};
46use serde::{Deserialize, Serialize};
47use tokio::sync::{RwLock, Semaphore};
48
49use crate::optimized::{OptimizedConfig, OptimizedDiscoveryEngine, SignificantPattern};
50use crate::ruvector_native::SemanticVector;
51use crate::Result;
52
/// Configuration for windowed, streaming pattern discovery.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingConfig {
    /// Configuration forwarded to the underlying `OptimizedDiscoveryEngine`.
    pub discovery_config: OptimizedConfig,

    /// Length of each time window.
    pub window_size: StdDuration,

    /// Interval between window starts; `None` means tumbling (non-overlapping) windows.
    pub slide_interval: Option<StdDuration>,

    /// Maximum number of in-flight vectors; sizes the backpressure semaphore.
    pub max_buffer_size: usize,

    /// Optional processing timeout.
    // NOTE(review): not referenced by the visible code — presumably consumed elsewhere; confirm.
    pub processing_timeout: Option<StdDuration>,

    /// Number of vectors accumulated before a batch is flushed to the engine.
    pub batch_size: usize,

    /// Whether pattern detection runs automatically during ingestion.
    pub auto_detect_patterns: bool,

    /// Run auto-detection every N ingested vectors.
    pub detection_interval: usize,

    /// Maximum concurrent batch-processing tasks.
    pub max_concurrency: usize,
}
83
84impl Default for StreamingConfig {
85 fn default() -> Self {
86 Self {
87 discovery_config: OptimizedConfig::default(),
88 window_size: StdDuration::from_secs(60),
89 slide_interval: Some(StdDuration::from_secs(30)),
90 max_buffer_size: 10000,
91 processing_timeout: Some(StdDuration::from_secs(5)),
92 batch_size: 100,
93 auto_detect_patterns: true,
94 detection_interval: 100,
95 max_concurrency: 4,
96 }
97 }
98}
99
/// Runtime counters and gauges reported by the streaming engine.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct StreamingMetrics {
    /// Total vectors ingested so far.
    pub vectors_processed: u64,

    /// Total patterns returned by detection runs (significant or not).
    pub patterns_detected: u64,

    /// Rolling average per-vector processing latency, in milliseconds.
    pub avg_latency_ms: f64,

    /// Vectors processed per second of uptime (see `calculate_throughput`).
    pub throughput_per_sec: f64,

    /// Number of completed time windows that have been processed.
    pub windows_processed: u64,

    /// Total bytes processed.
    // NOTE(review): never updated by the visible code — presumably maintained elsewhere; confirm.
    pub bytes_processed: u64,

    /// Count of backpressure occurrences.
    // NOTE(review): never updated by the visible code — presumably maintained elsewhere; confirm.
    pub backpressure_events: u64,

    /// Number of failed batch-processing tasks.
    pub errors: u64,

    /// Largest buffer size observed.
    // NOTE(review): never updated by the visible code — presumably maintained elsewhere; confirm.
    pub peak_buffer_size: usize,

    /// When the engine started (set at construction and on metrics reset).
    pub start_time: Option<DateTime<Utc>>,

    /// Timestamp of the most recent metrics update.
    pub last_update: Option<DateTime<Utc>>,
}
136
137impl StreamingMetrics {
138 pub fn uptime_secs(&self) -> f64 {
140 if let (Some(start), Some(last)) = (self.start_time, self.last_update) {
141 (last - start).num_milliseconds() as f64 / 1000.0
142 } else {
143 0.0
144 }
145 }
146
147 pub fn calculate_throughput(&mut self) {
149 let uptime = self.uptime_secs();
150 if uptime > 0.0 {
151 self.throughput_per_sec = self.vectors_processed as f64 / uptime;
152 }
153 }
154}
155
/// A half-open time interval `[start, end)` buffering the vectors whose
/// timestamps fall inside it.
#[derive(Debug, Clone)]
struct TimeWindow {
    // Inclusive window start.
    start: DateTime<Utc>,
    // Exclusive window end (`start + duration`).
    end: DateTime<Utc>,
    // Vectors collected into this window.
    vectors: Vec<SemanticVector>,
}
163
164impl TimeWindow {
165 fn new(start: DateTime<Utc>, duration: ChronoDuration) -> Self {
166 Self {
167 start,
168 end: start + duration,
169 vectors: Vec::new(),
170 }
171 }
172
173 fn contains(&self, timestamp: DateTime<Utc>) -> bool {
174 timestamp >= self.start && timestamp < self.end
175 }
176
177 fn add_vector(&mut self, vector: SemanticVector) {
178 self.vectors.push(vector);
179 }
180
181 fn is_complete(&self, now: DateTime<Utc>) -> bool {
182 now >= self.end
183 }
184}
185
/// Streaming front-end around an `OptimizedDiscoveryEngine`: ingests a
/// vector stream, maintains sliding/tumbling time windows, batches work
/// into the engine, and fires a callback for significant patterns.
pub struct StreamingEngine {
    // Streaming configuration (windowing, batching, concurrency).
    config: StreamingConfig,

    // Shared discovery engine that accumulates vectors and detects patterns.
    engine: Arc<RwLock<OptimizedDiscoveryEngine>>,

    // Optional callback invoked for each significant pattern.
    on_pattern: Arc<RwLock<Option<Box<dyn Fn(SignificantPattern) + Send + Sync>>>>,

    // Shared runtime metrics.
    metrics: Arc<RwLock<StreamingMetrics>>,

    // Currently open time windows (may overlap when sliding).
    windows: Arc<RwLock<Vec<TimeWindow>>>,

    // Backpressure semaphore sized by `config.max_buffer_size`.
    semaphore: Arc<Semaphore>,

    // Recent per-vector latency samples (ms), capped at 1000 entries.
    latencies: Arc<RwLock<Vec<f64>>>,
}
209
210impl StreamingEngine {
211 pub fn new(config: StreamingConfig) -> Self {
213 let discovery_config = config.discovery_config.clone();
214 let max_buffer = config.max_buffer_size;
215
216 let mut metrics = StreamingMetrics::default();
217 metrics.start_time = Some(Utc::now());
218
219 Self {
220 config,
221 engine: Arc::new(RwLock::new(OptimizedDiscoveryEngine::new(discovery_config))),
222 on_pattern: Arc::new(RwLock::new(None)),
223 metrics: Arc::new(RwLock::new(metrics)),
224 windows: Arc::new(RwLock::new(Vec::new())),
225 semaphore: Arc::new(Semaphore::new(max_buffer)),
226 latencies: Arc::new(RwLock::new(Vec::with_capacity(1000))),
227 }
228 }
229
230 pub async fn set_pattern_callback<F>(&mut self, callback: F)
232 where
233 F: Fn(SignificantPattern) + Send + Sync + 'static,
234 {
235 let mut on_pattern = self.on_pattern.write().await;
236 *on_pattern = Some(Box::new(callback));
237 }
238
239 pub async fn ingest_stream<S>(&mut self, stream: S) -> Result<()>
241 where
242 S: Stream<Item = SemanticVector> + Send,
243 {
244 let mut stream = Box::pin(stream);
245 let mut vector_count = 0_u64;
246 let mut current_batch = Vec::with_capacity(self.config.batch_size);
247
248 let window_duration = ChronoDuration::from_std(self.config.window_size)
250 .map_err(|e| crate::FrameworkError::Config(format!("Invalid window size: {}", e)))?;
251
252 let mut last_window_start = Utc::now();
253 self.create_window(last_window_start, window_duration).await;
254
255 while let Some(vector) = stream.next().await {
256 let _permit = self.semaphore.acquire().await.map_err(|e| {
258 crate::FrameworkError::Ingestion(format!("Backpressure semaphore error: {}", e))
259 })?;
260
261 let start = Instant::now();
262
263 if let Some(slide_interval) = self.config.slide_interval {
265 let slide_duration = ChronoDuration::from_std(slide_interval)
266 .map_err(|e| crate::FrameworkError::Config(format!("Invalid slide interval: {}", e)))?;
267
268 let now = Utc::now();
269 if (now - last_window_start) >= slide_duration {
270 self.create_window(now, window_duration).await;
271 last_window_start = now;
272 }
273 }
274
275 self.add_to_windows(vector.clone()).await;
277 current_batch.push(vector);
278 vector_count += 1;
279
280 if current_batch.len() >= self.config.batch_size {
282 self.process_batch(¤t_batch).await?;
283 current_batch.clear();
284 }
285
286 if self.config.auto_detect_patterns && vector_count % self.config.detection_interval as u64 == 0 {
288 self.detect_patterns().await?;
289 }
290
291 self.close_completed_windows().await?;
293
294 let latency_ms = start.elapsed().as_micros() as f64 / 1000.0;
296 self.record_latency(latency_ms).await;
297
298 let mut metrics = self.metrics.write().await;
300 metrics.vectors_processed = vector_count;
301 metrics.last_update = Some(Utc::now());
302 }
303
304 if !current_batch.is_empty() {
306 self.process_batch(¤t_batch).await?;
307 }
308
309 if self.config.auto_detect_patterns {
311 self.detect_patterns().await?;
312 }
313
314 self.close_all_windows().await?;
316
317 let mut metrics = self.metrics.write().await;
319 metrics.calculate_throughput();
320
321 Ok(())
322 }
323
324 async fn process_batch(&self, vectors: &[SemanticVector]) -> Result<()> {
326 let batch_size = self.config.batch_size;
327 let chunks: Vec<_> = vectors.chunks(batch_size).collect();
328
329 let semaphore = Arc::new(Semaphore::new(self.config.max_concurrency));
331 let mut tasks = Vec::new();
332
333 for chunk in chunks {
334 let chunk_vec = chunk.to_vec();
335 let engine = self.engine.clone();
336 let sem = semaphore.clone();
337
338 let task = tokio::spawn(async move {
339 let _permit = sem.acquire().await.ok()?;
340 let mut engine_guard = engine.write().await;
341
342 #[cfg(feature = "parallel")]
343 {
344 engine_guard.add_vectors_batch(chunk_vec);
345 }
346
347 #[cfg(not(feature = "parallel"))]
348 {
349 for vector in chunk_vec {
350 engine_guard.add_vector(vector);
351 }
352 }
353
354 Some(())
355 });
356
357 tasks.push(task);
358 }
359
360 for task in tasks {
362 if let Err(e) = task.await {
363 tracing::warn!("Batch processing task failed: {}", e);
364 let mut metrics = self.metrics.write().await;
365 metrics.errors += 1;
366 }
367 }
368
369 Ok(())
370 }
371
372 async fn create_window(&self, start: DateTime<Utc>, duration: ChronoDuration) {
374 let window = TimeWindow::new(start, duration);
375 let mut windows = self.windows.write().await;
376 windows.push(window);
377 }
378
379 async fn add_to_windows(&self, vector: SemanticVector) {
381 let timestamp = vector.timestamp;
382 let mut windows = self.windows.write().await;
383
384 for window in windows.iter_mut() {
385 if window.contains(timestamp) {
386 window.add_vector(vector.clone());
387 }
388 }
389 }
390
391 async fn close_completed_windows(&self) -> Result<()> {
393 let now = Utc::now();
394 let mut windows = self.windows.write().await;
395
396 let (completed, active): (Vec<_>, Vec<_>) = windows
398 .drain(..)
399 .partition(|w| w.is_complete(now));
400
401 *windows = active;
402 drop(windows); for window in completed {
406 self.process_window(window).await?;
407
408 let mut metrics = self.metrics.write().await;
409 metrics.windows_processed += 1;
410 }
411
412 Ok(())
413 }
414
415 async fn close_all_windows(&self) -> Result<()> {
417 let mut windows = self.windows.write().await;
418 let all_windows: Vec<_> = windows.drain(..).collect();
419 drop(windows);
420
421 for window in all_windows {
422 self.process_window(window).await?;
423 }
424
425 Ok(())
426 }
427
428 async fn process_window(&self, window: TimeWindow) -> Result<()> {
430 if window.vectors.is_empty() {
431 return Ok(());
432 }
433
434 tracing::debug!(
435 "Processing window: {} vectors from {} to {}",
436 window.vectors.len(),
437 window.start,
438 window.end
439 );
440
441 self.process_batch(&window.vectors).await?;
443
444 if self.config.auto_detect_patterns {
446 self.detect_patterns().await?;
447 }
448
449 Ok(())
450 }
451
452 async fn detect_patterns(&self) -> Result<()> {
454 let patterns = {
455 let mut engine = self.engine.write().await;
456 engine.detect_patterns_with_significance()
457 };
458
459 let pattern_count = patterns.len();
460
461 let on_pattern = self.on_pattern.read().await;
463 if let Some(callback) = on_pattern.as_ref() {
464 for pattern in patterns {
465 if pattern.is_significant {
466 callback(pattern);
467 }
468 }
469 }
470
471 let mut metrics = self.metrics.write().await;
473 metrics.patterns_detected += pattern_count as u64;
474
475 Ok(())
476 }
477
478 async fn record_latency(&self, latency_ms: f64) {
480 let mut latencies = self.latencies.write().await;
481 latencies.push(latency_ms);
482
483 let len = latencies.len();
485 if len > 1000 {
486 latencies.drain(0..len - 1000);
487 }
488
489 let avg = latencies.iter().sum::<f64>() / latencies.len() as f64;
491 let mut metrics = self.metrics.write().await;
492 metrics.avg_latency_ms = avg;
493 }
494
495 pub async fn metrics(&self) -> StreamingMetrics {
497 let mut metrics = self.metrics.read().await.clone();
498 metrics.calculate_throughput();
499 metrics
500 }
501
502 pub async fn engine_stats(&self) -> crate::optimized::OptimizedStats {
504 let engine = self.engine.read().await;
505 engine.stats()
506 }
507
508 pub async fn reset_metrics(&self) {
510 let mut metrics = self.metrics.write().await;
511 *metrics = StreamingMetrics::default();
512 metrics.start_time = Some(Utc::now());
513
514 let mut latencies = self.latencies.write().await;
515 latencies.clear();
516 }
517}
518
/// Fluent builder for [`StreamingEngine`].
pub struct StreamingEngineBuilder {
    // Accumulated configuration; starts from `StreamingConfig::default()`.
    config: StreamingConfig,
}
523
524impl StreamingEngineBuilder {
525 pub fn new() -> Self {
527 Self {
528 config: StreamingConfig::default(),
529 }
530 }
531
532 pub fn window_size(mut self, duration: StdDuration) -> Self {
534 self.config.window_size = duration;
535 self
536 }
537
538 pub fn slide_interval(mut self, duration: StdDuration) -> Self {
540 self.config.slide_interval = Some(duration);
541 self
542 }
543
544 pub fn tumbling_windows(mut self) -> Self {
546 self.config.slide_interval = None;
547 self
548 }
549
550 pub fn max_buffer_size(mut self, size: usize) -> Self {
552 self.config.max_buffer_size = size;
553 self
554 }
555
556 pub fn batch_size(mut self, size: usize) -> Self {
558 self.config.batch_size = size;
559 self
560 }
561
562 pub fn max_concurrency(mut self, concurrency: usize) -> Self {
564 self.config.max_concurrency = concurrency;
565 self
566 }
567
568 pub fn detection_interval(mut self, interval: usize) -> Self {
570 self.config.detection_interval = interval;
571 self
572 }
573
574 pub fn discovery_config(mut self, config: OptimizedConfig) -> Self {
576 self.config.discovery_config = config;
577 self
578 }
579
580 pub fn build(self) -> StreamingEngine {
582 StreamingEngine::new(self.config)
583 }
584}
585
586impl Default for StreamingEngineBuilder {
587 fn default() -> Self {
588 Self::new()
589 }
590}
591
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream;
    use crate::ruvector_native::Domain;
    use std::collections::HashMap;

    /// Builds a minimal 4-dimensional test vector stamped with "now".
    fn create_test_vector(id: &str, domain: Domain) -> SemanticVector {
        SemanticVector {
            id: id.to_string(),
            embedding: vec![0.1, 0.2, 0.3, 0.4],
            domain,
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        }
    }

    // A freshly built engine must start with zeroed counters.
    #[tokio::test]
    async fn test_streaming_engine_creation() {
        let config = StreamingConfig::default();
        let engine = StreamingEngine::new(config);
        let metrics = engine.metrics().await;

        assert_eq!(metrics.vectors_processed, 0);
        assert_eq!(metrics.patterns_detected, 0);
    }

    // Ingestion with a registered callback must still count every vector.
    #[tokio::test]
    async fn test_pattern_callback() {
        let config = StreamingConfig {
            auto_detect_patterns: true,
            detection_interval: 2,
            ..Default::default()
        };

        let mut engine = StreamingEngine::new(config);

        let pattern_count = Arc::new(RwLock::new(0_u64));
        let pc = pattern_count.clone();

        // The callback is synchronous, so the async counter increment is
        // spawned onto the runtime rather than awaited inline.
        engine.set_pattern_callback(move |_pattern| {
            let pc = pc.clone();
            tokio::spawn(async move {
                let mut count = pc.write().await;
                *count += 1;
            });
        }).await;

        let vectors = vec![
            create_test_vector("v1", Domain::Climate),
            create_test_vector("v2", Domain::Climate),
            create_test_vector("v3", Domain::Finance),
        ];

        let vector_stream = stream::iter(vectors);
        engine.ingest_stream(vector_stream).await.unwrap();

        let metrics = engine.metrics().await;
        assert!(metrics.vectors_processed >= 3);
    }

    // Short, overlapping windows must not lose or double-count vectors.
    #[tokio::test]
    async fn test_windowed_processing() {
        let config = StreamingConfig {
            window_size: StdDuration::from_millis(100),
            slide_interval: Some(StdDuration::from_millis(50)),
            auto_detect_patterns: false,
            ..Default::default()
        };

        let mut engine = StreamingEngine::new(config);

        let vectors = vec![
            create_test_vector("v1", Domain::Climate),
            create_test_vector("v2", Domain::Climate),
        ];

        let vector_stream = stream::iter(vectors);
        engine.ingest_stream(vector_stream).await.unwrap();

        let metrics = engine.metrics().await;
        assert_eq!(metrics.vectors_processed, 2);
    }

    // The fluent builder must produce a working engine with fresh metrics.
    #[tokio::test]
    async fn test_builder() {
        let engine = StreamingEngineBuilder::new()
            .window_size(StdDuration::from_secs(30))
            .slide_interval(StdDuration::from_secs(15))
            .max_buffer_size(5000)
            .batch_size(50)
            .build();

        let metrics = engine.metrics().await;
        assert_eq!(metrics.vectors_processed, 0);
    }

    // Throughput and uptime follow directly from the two timestamps:
    // 1000 vectors over ~10 s should give ~100/s and ~10 s uptime.
    #[tokio::test]
    async fn test_metrics_calculation() {
        let mut metrics = StreamingMetrics {
            vectors_processed: 1000,
            start_time: Some(Utc::now() - ChronoDuration::seconds(10)),
            last_update: Some(Utc::now()),
            ..Default::default()
        };

        metrics.calculate_throughput();
        assert!(metrics.throughput_per_sec > 0.0);
        assert!(metrics.uptime_secs() >= 9.0 && metrics.uptime_secs() <= 11.0);
    }
}