1use crate::errors::Result;
37use crate::providers::VisionProvider;
38use crate::types::OcrResult;
39use serde::{Deserialize, Serialize};
40use std::collections::VecDeque;
41use std::sync::{Arc, Mutex};
42use std::time::{Duration, Instant};
43use thiserror::Error;
44
45#[derive(Debug, Error)]
47pub enum StreamError {
48 #[error("Buffer overflow: {0}")]
49 BufferOverflow(String),
50
51 #[error("Invalid frame rate: {0}")]
52 InvalidFrameRate(String),
53
54 #[error("Processing failed: {0}")]
55 ProcessingFailed(String),
56
57 #[error("Stream closed")]
58 StreamClosed,
59}
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
63pub enum SamplingStrategy {
64 All,
66
67 EveryNth(u32),
69
70 TimeInterval(u64),
72
73 ChangeDetection,
75
76 Adaptive,
78}
79
80impl Default for SamplingStrategy {
81 fn default() -> Self {
82 Self::TimeInterval(100) }
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct StreamConfig {
89 pub max_fps: f64,
91
92 pub buffer_size: usize,
94
95 pub sampling_strategy: SamplingStrategy,
97
98 pub enable_smoothing: bool,
100
101 pub smoothing_window: usize,
103
104 pub change_threshold: f64,
106
107 pub enable_preprocessing: bool,
109
110 pub frame_timeout_ms: u64,
112}
113
114impl Default for StreamConfig {
115 fn default() -> Self {
116 Self {
117 max_fps: 10.0,
118 buffer_size: 30,
119 sampling_strategy: SamplingStrategy::default(),
120 enable_smoothing: true,
121 smoothing_window: 3,
122 change_threshold: 0.15,
123 enable_preprocessing: true,
124 frame_timeout_ms: 5000,
125 }
126 }
127}
128
129impl StreamConfig {
130 pub fn new() -> Self {
132 Self::default()
133 }
134
135 pub fn with_max_fps(mut self, fps: f64) -> Self {
137 self.max_fps = fps.max(0.1);
138 self
139 }
140
141 pub fn with_buffer_size(mut self, size: usize) -> Self {
143 self.buffer_size = size.max(1);
144 self
145 }
146
147 pub fn with_sampling_strategy(mut self, strategy: SamplingStrategy) -> Self {
149 self.sampling_strategy = strategy;
150 self
151 }
152
153 pub fn with_smoothing(mut self, enabled: bool) -> Self {
155 self.enable_smoothing = enabled;
156 self
157 }
158
159 pub fn with_change_threshold(mut self, threshold: f64) -> Self {
161 self.change_threshold = threshold.clamp(0.0, 1.0);
162 self
163 }
164}
165
166#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct FrameMetadata {
169 pub frame_number: u64,
171
172 pub timestamp: std::time::SystemTime,
174
175 pub size: usize,
177
178 pub dimensions: Option<(u32, u32)>,
180
181 pub processed: bool,
183
184 pub processing_time: Option<Duration>,
186}
187
188#[derive(Debug, Clone)]
190struct BufferedFrame {
191 data: Vec<u8>,
193
194 metadata: FrameMetadata,
196
197 hash: u64,
199}
200
201impl BufferedFrame {
202 fn new(data: Vec<u8>, frame_number: u64) -> Self {
204 let hash = Self::compute_hash(&data);
205 let size = data.len();
206
207 Self {
208 data,
209 metadata: FrameMetadata {
210 frame_number,
211 timestamp: std::time::SystemTime::now(),
212 size,
213 dimensions: None,
214 processed: false,
215 processing_time: None,
216 },
217 hash,
218 }
219 }
220
221 fn compute_hash(data: &[u8]) -> u64 {
223 use std::collections::hash_map::DefaultHasher;
224 use std::hash::{Hash, Hasher};
225
226 let mut hasher = DefaultHasher::new();
227
228 let step = (data.len() / 100).max(1);
230 for i in (0..data.len()).step_by(step) {
231 data[i].hash(&mut hasher);
232 }
233
234 hasher.finish()
235 }
236
237 fn is_different_from(&self, other: &BufferedFrame, threshold: f64) -> bool {
239 if self.data.len() != other.data.len() {
240 return true;
241 }
242
243 let hash_diff = (self.hash as i64 - other.hash as i64).abs() as f64;
245 let max_hash = u64::MAX as f64;
246
247 (hash_diff / max_hash) > threshold
248 }
249}
250
251#[derive(Debug, Clone, Default, Serialize, Deserialize)]
253pub struct StreamStats {
254 pub frames_received: u64,
256
257 pub frames_processed: u64,
259
260 pub frames_skipped: u64,
262
263 pub total_processing_time: Duration,
265
266 pub avg_processing_time: Duration,
268
269 pub current_fps: f64,
271
272 pub buffer_overflows: u64,
274
275 pub processing_errors: u64,
277}
278
279impl StreamStats {
280 fn update_avg_processing_time(&mut self, new_time: Duration) {
282 let total_ms = self.total_processing_time.as_millis() as u64 + new_time.as_millis() as u64;
283 self.total_processing_time = Duration::from_millis(total_ms);
284
285 if self.frames_processed > 0 {
286 self.avg_processing_time = self.total_processing_time / self.frames_processed as u32;
287 }
288 }
289}
290
291pub struct StreamProcessor<P: VisionProvider> {
293 provider: Arc<P>,
295
296 config: StreamConfig,
298
299 buffer: Arc<Mutex<VecDeque<BufferedFrame>>>,
301
302 stats: Arc<Mutex<StreamStats>>,
304
305 last_processed: Arc<Mutex<Option<BufferedFrame>>>,
307
308 last_process_time: Arc<Mutex<Option<Instant>>>,
310
311 smoothing_buffer: Arc<Mutex<VecDeque<OcrResult>>>,
313
314 frame_counter: Arc<Mutex<u64>>,
316}
317
318impl<P: VisionProvider> StreamProcessor<P> {
319 pub fn new(provider: P, config: StreamConfig) -> Self {
321 Self {
322 provider: Arc::new(provider),
323 config,
324 buffer: Arc::new(Mutex::new(VecDeque::new())),
325 stats: Arc::new(Mutex::new(StreamStats::default())),
326 last_processed: Arc::new(Mutex::new(None)),
327 last_process_time: Arc::new(Mutex::new(None)),
328 smoothing_buffer: Arc::new(Mutex::new(VecDeque::new())),
329 frame_counter: Arc::new(Mutex::new(0)),
330 }
331 }
332
333 pub async fn process_frame(&self, frame_data: &[u8]) -> Result<Option<OcrResult>> {
335 let frame_number = {
337 let mut counter = self.frame_counter.lock().unwrap();
338 *counter += 1;
339 *counter
340 };
341
342 {
344 let mut stats = self.stats.lock().unwrap();
345 stats.frames_received += 1;
346 }
347
348 let frame = BufferedFrame::new(frame_data.to_vec(), frame_number);
350
351 if !self.should_process_frame(&frame)? {
353 let mut stats = self.stats.lock().unwrap();
354 stats.frames_skipped += 1;
355 return Ok(None);
356 }
357
358 self.add_to_buffer(frame.clone())?;
360
361 let start_time = Instant::now();
363
364 match self.provider.process_image(&frame.data).await {
365 Ok(mut result) => {
366 let processing_time = start_time.elapsed();
367
368 {
370 let mut stats = self.stats.lock().unwrap();
371 stats.frames_processed += 1;
372 stats.update_avg_processing_time(processing_time);
373
374 if let Some(last_time) = *self.last_process_time.lock().unwrap() {
376 let elapsed = start_time.duration_since(last_time);
377 if elapsed.as_secs_f64() > 0.0 {
378 stats.current_fps = 1.0 / elapsed.as_secs_f64();
379 }
380 }
381 }
382
383 *self.last_process_time.lock().unwrap() = Some(start_time);
385
386 *self.last_processed.lock().unwrap() = Some(frame);
388
389 if self.config.enable_smoothing {
391 result = self.apply_smoothing(result)?;
392 }
393
394 Ok(Some(result))
395 }
396 Err(e) => {
397 let mut stats = self.stats.lock().unwrap();
398 stats.processing_errors += 1;
399 Err(e)
400 }
401 }
402 }
403
404 fn should_process_frame(&self, frame: &BufferedFrame) -> Result<bool> {
406 match self.config.sampling_strategy {
407 SamplingStrategy::All => Ok(true),
408
409 SamplingStrategy::EveryNth(n) => {
410 Ok(frame.metadata.frame_number.is_multiple_of(n as u64))
411 }
412
413 SamplingStrategy::TimeInterval(interval_ms) => {
414 if let Some(last_time) = *self.last_process_time.lock().unwrap() {
415 let elapsed = last_time.elapsed();
416 Ok(elapsed.as_millis() >= interval_ms as u128)
417 } else {
418 Ok(true) }
420 }
421
422 SamplingStrategy::ChangeDetection => {
423 if let Some(last_frame) = self.last_processed.lock().unwrap().as_ref() {
424 Ok(frame.is_different_from(last_frame, self.config.change_threshold))
425 } else {
426 Ok(true) }
428 }
429
430 SamplingStrategy::Adaptive => {
431 let stats = self.stats.lock().unwrap();
433
434 if stats.avg_processing_time.as_secs_f64() > 0.0 {
435 let target_interval = 1.0 / self.config.max_fps;
436 let can_process = stats.avg_processing_time.as_secs_f64() <= target_interval;
437 Ok(can_process)
438 } else {
439 Ok(true)
440 }
441 }
442 }
443 }
444
445 fn add_to_buffer(&self, frame: BufferedFrame) -> Result<()> {
447 let mut buffer = self.buffer.lock().unwrap();
448
449 if buffer.len() >= self.config.buffer_size {
450 buffer.pop_front();
452
453 let mut stats = self.stats.lock().unwrap();
454 stats.buffer_overflows += 1;
455 }
456
457 buffer.push_back(frame);
458 Ok(())
459 }
460
461 fn apply_smoothing(&self, result: OcrResult) -> Result<OcrResult> {
463 let mut smoothing_buffer = self.smoothing_buffer.lock().unwrap();
464
465 smoothing_buffer.push_back(result.clone());
466
467 if smoothing_buffer.len() > self.config.smoothing_window {
468 smoothing_buffer.pop_front();
469 }
470
471 Ok(result)
477 }
478
479 pub fn get_stats(&self) -> StreamStats {
481 self.stats.lock().unwrap().clone()
482 }
483
484 pub fn reset_stats(&self) {
486 let mut stats = self.stats.lock().unwrap();
487 *stats = StreamStats::default();
488 }
489
490 pub fn buffer_size(&self) -> usize {
492 self.buffer.lock().unwrap().len()
493 }
494
495 pub fn clear_buffer(&self) {
497 self.buffer.lock().unwrap().clear();
498 }
499
500 pub fn config(&self) -> &StreamConfig {
502 &self.config
503 }
504
505 pub fn provider(&self) -> &P {
507 &self.provider
508 }
509}
510
511pub struct AsyncFrameStream<P: VisionProvider> {
513 processor: Arc<StreamProcessor<P>>,
515}
516
517impl<P: VisionProvider> AsyncFrameStream<P> {
518 pub fn new(provider: P, config: StreamConfig) -> Self {
520 Self {
521 processor: Arc::new(StreamProcessor::new(provider, config)),
522 }
523 }
524
525 pub async fn process_batch(&self, frames: Vec<Vec<u8>>) -> Result<Vec<Option<OcrResult>>> {
527 let mut results = Vec::new();
528
529 for frame in frames {
530 let result = self.processor.process_frame(&frame).await?;
531 results.push(result);
532 }
533
534 Ok(results)
535 }
536
537 pub fn stats(&self) -> StreamStats {
539 self.processor.get_stats()
540 }
541
542 pub fn processor(&self) -> &StreamProcessor<P> {
544 &self.processor
545 }
546}
547
548#[cfg(test)]
549mod tests {
550 use super::*;
551 use crate::providers::MockVisionProvider;
552
553 #[test]
554 fn test_stream_config_default() {
555 let config = StreamConfig::default();
556 assert_eq!(config.max_fps, 10.0);
557 assert_eq!(config.buffer_size, 30);
558 assert!(config.enable_smoothing);
559 }
560
561 #[test]
562 fn test_stream_config_builder() {
563 let config = StreamConfig::new()
564 .with_max_fps(30.0)
565 .with_buffer_size(50)
566 .with_smoothing(false)
567 .with_change_threshold(0.2);
568
569 assert_eq!(config.max_fps, 30.0);
570 assert_eq!(config.buffer_size, 50);
571 assert!(!config.enable_smoothing);
572 assert_eq!(config.change_threshold, 0.2);
573 }
574
575 #[test]
576 fn test_sampling_strategy() {
577 assert_eq!(
578 SamplingStrategy::default(),
579 SamplingStrategy::TimeInterval(100)
580 );
581 }
582
583 #[tokio::test]
584 async fn test_stream_processor_creation() {
585 let provider = MockVisionProvider::new();
586 let config = StreamConfig::default();
587 let processor = StreamProcessor::new(provider, config);
588
589 assert_eq!(processor.buffer_size(), 0);
590 let stats = processor.get_stats();
591 assert_eq!(stats.frames_received, 0);
592 }
593
594 #[tokio::test]
595 async fn test_process_single_frame() {
596 let provider = MockVisionProvider::new();
597 let config = StreamConfig::default();
598 let processor = StreamProcessor::new(provider, config);
599
600 let frame = vec![0u8; 100];
601 let result = processor.process_frame(&frame).await;
602
603 assert!(result.is_ok());
604
605 let stats = processor.get_stats();
606 assert_eq!(stats.frames_received, 1);
607 }
608
609 #[tokio::test]
610 async fn test_process_multiple_frames() {
611 let provider = MockVisionProvider::new();
612 let config = StreamConfig::default();
613 let processor = StreamProcessor::new(provider, config);
614
615 for i in 0..10 {
616 let frame = vec![i as u8; 100];
617 let _result = processor.process_frame(&frame).await;
618 }
619
620 let stats = processor.get_stats();
621 assert_eq!(stats.frames_received, 10);
622 }
623
624 #[tokio::test]
625 async fn test_buffer_overflow() {
626 let provider = MockVisionProvider::new();
627 let config = StreamConfig::default()
628 .with_buffer_size(5)
629 .with_sampling_strategy(SamplingStrategy::All); let processor = StreamProcessor::new(provider, config);
631
632 for i in 0..10 {
634 let frame = vec![i as u8; 100];
635 let _result = processor.process_frame(&frame).await;
636 }
637
638 let stats = processor.get_stats();
639 assert!(stats.buffer_overflows > 0);
640 assert_eq!(processor.buffer_size(), 5);
641 }
642
643 #[tokio::test]
644 async fn test_sampling_every_nth() {
645 let provider = MockVisionProvider::new();
646 let config = StreamConfig::default().with_sampling_strategy(SamplingStrategy::EveryNth(2));
647 let processor = StreamProcessor::new(provider, config);
648
649 for i in 0..10 {
650 let frame = vec![i as u8; 100];
651 let _result = processor.process_frame(&frame).await;
652 }
653
654 let stats = processor.get_stats();
655 assert_eq!(stats.frames_received, 10);
656 assert!(stats.frames_processed <= 5); }
658
659 #[tokio::test]
660 async fn test_sampling_all() {
661 let provider = MockVisionProvider::new();
662 let config = StreamConfig::default().with_sampling_strategy(SamplingStrategy::All);
663 let processor = StreamProcessor::new(provider, config);
664
665 for i in 0..5 {
666 let frame = vec![i as u8; 100];
667 let _result = processor.process_frame(&frame).await;
668 }
669
670 let stats = processor.get_stats();
671 assert_eq!(stats.frames_received, 5);
672 assert_eq!(stats.frames_processed, 5);
673 }
674
675 #[tokio::test]
676 async fn test_change_detection() {
677 let provider = MockVisionProvider::new();
678 let config = StreamConfig::default()
679 .with_sampling_strategy(SamplingStrategy::ChangeDetection)
680 .with_change_threshold(0.1);
681 let processor = StreamProcessor::new(provider, config);
682
683 let frame = vec![42u8; 100];
685 for _ in 0..5 {
686 let _result = processor.process_frame(&frame).await;
687 }
688
689 let stats = processor.get_stats();
690 assert!(stats.frames_skipped > 0);
692 }
693
694 #[tokio::test]
695 async fn test_stats_reset() {
696 let provider = MockVisionProvider::new();
697 let config = StreamConfig::default();
698 let processor = StreamProcessor::new(provider, config);
699
700 let frame = vec![0u8; 100];
701 let _result = processor.process_frame(&frame).await;
702
703 processor.reset_stats();
704 let stats = processor.get_stats();
705 assert_eq!(stats.frames_received, 0);
706 }
707
708 #[tokio::test]
709 async fn test_clear_buffer() {
710 let provider = MockVisionProvider::new();
711 let config = StreamConfig::default();
712 let processor = StreamProcessor::new(provider, config);
713
714 for i in 0..5 {
715 let frame = vec![i as u8; 100];
716 let _result = processor.process_frame(&frame).await;
717 }
718
719 assert!(processor.buffer_size() > 0);
720 processor.clear_buffer();
721 assert_eq!(processor.buffer_size(), 0);
722 }
723
724 #[tokio::test]
725 async fn test_async_frame_stream() {
726 let provider = MockVisionProvider::new();
727 let config = StreamConfig::default();
728 let stream = AsyncFrameStream::new(provider, config);
729
730 let frames = vec![vec![1u8; 100], vec![2u8; 100], vec![3u8; 100]];
731
732 let results = stream.process_batch(frames).await.unwrap();
733 assert_eq!(results.len(), 3);
734
735 let stats = stream.stats();
736 assert_eq!(stats.frames_received, 3);
737 }
738
739 #[tokio::test]
740 async fn test_buffered_frame_hash() {
741 let frame1 = BufferedFrame::new(vec![1, 2, 3, 4, 5], 1);
742 let frame2 = BufferedFrame::new(vec![1, 2, 3, 4, 5], 2);
743 let frame3 = BufferedFrame::new(vec![5, 4, 3, 2, 1], 3);
744
745 assert_eq!(frame1.hash, frame2.hash);
747
748 assert_ne!(frame1.hash, frame3.hash);
750 }
751
752 #[tokio::test]
753 async fn test_buffered_frame_difference() {
754 let frame1 = BufferedFrame::new(vec![1u8; 100], 1);
755 let frame2 = BufferedFrame::new(vec![1u8; 100], 2);
756 let frame3 = BufferedFrame::new(vec![2u8; 100], 3);
757
758 assert!(!frame1.is_different_from(&frame2, 0.1));
760
761 assert!(frame1.is_different_from(&frame3, 0.1));
763 }
764
765 #[test]
766 fn test_frame_metadata() {
767 let metadata = FrameMetadata {
768 frame_number: 42,
769 timestamp: std::time::SystemTime::now(),
770 size: 1024,
771 dimensions: Some((640, 480)),
772 processed: true,
773 processing_time: Some(Duration::from_millis(50)),
774 };
775
776 assert_eq!(metadata.frame_number, 42);
777 assert_eq!(metadata.size, 1024);
778 assert_eq!(metadata.dimensions, Some((640, 480)));
779 assert!(metadata.processed);
780 }
781
782 #[test]
783 fn test_stream_stats_update() {
784 let mut stats = StreamStats::default();
785
786 stats.update_avg_processing_time(Duration::from_millis(100));
787 stats.frames_processed = 1;
788
789 assert_eq!(stats.total_processing_time.as_millis(), 100);
790 }
791}