oxify_connect_vision/
streaming.rs

//! Streaming Processing for Video Frames
//!
//! This module provides real-time OCR processing capabilities for video streams
//! and frame sequences. It supports frame buffering, rate control, and async
//! processing for efficient video analysis.
//!
//! # Features
//!
//! - Real-time video frame processing
//! - Configurable frame buffering and rate limiting
//! - Async stream processing with backpressure
//! - Frame skipping and sampling strategies
//! - Temporal smoothing for text stabilization
//! - Change detection to avoid redundant processing
//! - Performance metrics and statistics
//!
//! # Example
//!
//! ```rust,ignore
//! use oxify_connect_vision::streaming::{StreamProcessor, StreamConfig};
//!
//! let config = StreamConfig::default()
//!     .with_max_fps(30.0)
//!     .with_buffer_size(10);
//!
//! let processor = StreamProcessor::new(provider, config);
//!
//! // Process frames from a video stream
//! for frame in video_frames {
//!     if let Some(result) = processor.process_frame(&frame).await? {
//!         println!("Text: {}", result.text);
//!     }
//! }
//! ```
35
36use crate::errors::Result;
37use crate::providers::VisionProvider;
38use crate::types::OcrResult;
39use serde::{Deserialize, Serialize};
40use std::collections::VecDeque;
41use std::sync::{Arc, Mutex};
42use std::time::{Duration, Instant};
43use thiserror::Error;
44
45/// Streaming errors
46#[derive(Debug, Error)]
47pub enum StreamError {
48    #[error("Buffer overflow: {0}")]
49    BufferOverflow(String),
50
51    #[error("Invalid frame rate: {0}")]
52    InvalidFrameRate(String),
53
54    #[error("Processing failed: {0}")]
55    ProcessingFailed(String),
56
57    #[error("Stream closed")]
58    StreamClosed,
59}
60
61/// Frame sampling strategy
62#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
63pub enum SamplingStrategy {
64    /// Process every frame
65    All,
66
67    /// Process every Nth frame
68    EveryNth(u32),
69
70    /// Process based on time interval (milliseconds)
71    TimeInterval(u64),
72
73    /// Process only when significant changes detected
74    ChangeDetection,
75
76    /// Adaptive based on processing speed
77    Adaptive,
78}
79
80impl Default for SamplingStrategy {
81    fn default() -> Self {
82        Self::TimeInterval(100) // 10 FPS default
83    }
84}
85
86/// Stream processing configuration
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct StreamConfig {
89    /// Maximum frames per second to process
90    pub max_fps: f64,
91
92    /// Frame buffer size
93    pub buffer_size: usize,
94
95    /// Sampling strategy
96    pub sampling_strategy: SamplingStrategy,
97
98    /// Enable temporal smoothing
99    pub enable_smoothing: bool,
100
101    /// Smoothing window size (frames)
102    pub smoothing_window: usize,
103
104    /// Change detection threshold (0.0 to 1.0)
105    pub change_threshold: f64,
106
107    /// Enable frame preprocessing
108    pub enable_preprocessing: bool,
109
110    /// Timeout for processing a single frame (milliseconds)
111    pub frame_timeout_ms: u64,
112}
113
114impl Default for StreamConfig {
115    fn default() -> Self {
116        Self {
117            max_fps: 10.0,
118            buffer_size: 30,
119            sampling_strategy: SamplingStrategy::default(),
120            enable_smoothing: true,
121            smoothing_window: 3,
122            change_threshold: 0.15,
123            enable_preprocessing: true,
124            frame_timeout_ms: 5000,
125        }
126    }
127}
128
129impl StreamConfig {
130    /// Create a new configuration
131    pub fn new() -> Self {
132        Self::default()
133    }
134
135    /// Set maximum FPS
136    pub fn with_max_fps(mut self, fps: f64) -> Self {
137        self.max_fps = fps.max(0.1);
138        self
139    }
140
141    /// Set buffer size
142    pub fn with_buffer_size(mut self, size: usize) -> Self {
143        self.buffer_size = size.max(1);
144        self
145    }
146
147    /// Set sampling strategy
148    pub fn with_sampling_strategy(mut self, strategy: SamplingStrategy) -> Self {
149        self.sampling_strategy = strategy;
150        self
151    }
152
153    /// Enable/disable smoothing
154    pub fn with_smoothing(mut self, enabled: bool) -> Self {
155        self.enable_smoothing = enabled;
156        self
157    }
158
159    /// Set change detection threshold
160    pub fn with_change_threshold(mut self, threshold: f64) -> Self {
161        self.change_threshold = threshold.clamp(0.0, 1.0);
162        self
163    }
164}
165
166/// Frame metadata
167#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct FrameMetadata {
169    /// Frame number in sequence
170    pub frame_number: u64,
171
172    /// Timestamp when frame was received
173    pub timestamp: std::time::SystemTime,
174
175    /// Frame size in bytes
176    pub size: usize,
177
178    /// Frame dimensions (width, height)
179    pub dimensions: Option<(u32, u32)>,
180
181    /// Whether this frame was processed or skipped
182    pub processed: bool,
183
184    /// Processing duration (if processed)
185    pub processing_time: Option<Duration>,
186}
187
188/// Buffered frame
189#[derive(Debug, Clone)]
190struct BufferedFrame {
191    /// Frame data
192    data: Vec<u8>,
193
194    /// Frame metadata
195    metadata: FrameMetadata,
196
197    /// Hash for change detection
198    hash: u64,
199}
200
201impl BufferedFrame {
202    /// Create a new buffered frame
203    fn new(data: Vec<u8>, frame_number: u64) -> Self {
204        let hash = Self::compute_hash(&data);
205        let size = data.len();
206
207        Self {
208            data,
209            metadata: FrameMetadata {
210                frame_number,
211                timestamp: std::time::SystemTime::now(),
212                size,
213                dimensions: None,
214                processed: false,
215                processing_time: None,
216            },
217            hash,
218        }
219    }
220
221    /// Compute a simple hash for change detection
222    fn compute_hash(data: &[u8]) -> u64 {
223        use std::collections::hash_map::DefaultHasher;
224        use std::hash::{Hash, Hasher};
225
226        let mut hasher = DefaultHasher::new();
227
228        // Sample bytes for faster hashing
229        let step = (data.len() / 100).max(1);
230        for i in (0..data.len()).step_by(step) {
231            data[i].hash(&mut hasher);
232        }
233
234        hasher.finish()
235    }
236
237    /// Check if frame is significantly different from another
238    fn is_different_from(&self, other: &BufferedFrame, threshold: f64) -> bool {
239        if self.data.len() != other.data.len() {
240            return true;
241        }
242
243        // Simple hash-based comparison
244        let hash_diff = (self.hash as i64 - other.hash as i64).abs() as f64;
245        let max_hash = u64::MAX as f64;
246
247        (hash_diff / max_hash) > threshold
248    }
249}
250
251/// Stream processing statistics
252#[derive(Debug, Clone, Default, Serialize, Deserialize)]
253pub struct StreamStats {
254    /// Total frames received
255    pub frames_received: u64,
256
257    /// Frames processed
258    pub frames_processed: u64,
259
260    /// Frames skipped
261    pub frames_skipped: u64,
262
263    /// Total processing time
264    pub total_processing_time: Duration,
265
266    /// Average processing time per frame
267    pub avg_processing_time: Duration,
268
269    /// Current FPS
270    pub current_fps: f64,
271
272    /// Buffer overflows
273    pub buffer_overflows: u64,
274
275    /// Processing errors
276    pub processing_errors: u64,
277}
278
279impl StreamStats {
280    /// Update average processing time
281    fn update_avg_processing_time(&mut self, new_time: Duration) {
282        let total_ms = self.total_processing_time.as_millis() as u64 + new_time.as_millis() as u64;
283        self.total_processing_time = Duration::from_millis(total_ms);
284
285        if self.frames_processed > 0 {
286            self.avg_processing_time = self.total_processing_time / self.frames_processed as u32;
287        }
288    }
289}
290
291/// Stream processor for real-time OCR
292pub struct StreamProcessor<P: VisionProvider> {
293    /// OCR provider
294    provider: Arc<P>,
295
296    /// Configuration
297    config: StreamConfig,
298
299    /// Frame buffer
300    buffer: Arc<Mutex<VecDeque<BufferedFrame>>>,
301
302    /// Processing statistics
303    stats: Arc<Mutex<StreamStats>>,
304
305    /// Last processed frame
306    last_processed: Arc<Mutex<Option<BufferedFrame>>>,
307
308    /// Last processing time
309    last_process_time: Arc<Mutex<Option<Instant>>>,
310
311    /// Result smoothing buffer
312    smoothing_buffer: Arc<Mutex<VecDeque<OcrResult>>>,
313
314    /// Frame counter
315    frame_counter: Arc<Mutex<u64>>,
316}
317
318impl<P: VisionProvider> StreamProcessor<P> {
319    /// Create a new stream processor
320    pub fn new(provider: P, config: StreamConfig) -> Self {
321        Self {
322            provider: Arc::new(provider),
323            config,
324            buffer: Arc::new(Mutex::new(VecDeque::new())),
325            stats: Arc::new(Mutex::new(StreamStats::default())),
326            last_processed: Arc::new(Mutex::new(None)),
327            last_process_time: Arc::new(Mutex::new(None)),
328            smoothing_buffer: Arc::new(Mutex::new(VecDeque::new())),
329            frame_counter: Arc::new(Mutex::new(0)),
330        }
331    }
332
333    /// Process a single frame
334    pub async fn process_frame(&self, frame_data: &[u8]) -> Result<Option<OcrResult>> {
335        // Update frame counter
336        let frame_number = {
337            let mut counter = self.frame_counter.lock().unwrap();
338            *counter += 1;
339            *counter
340        };
341
342        // Update stats
343        {
344            let mut stats = self.stats.lock().unwrap();
345            stats.frames_received += 1;
346        }
347
348        // Create buffered frame
349        let frame = BufferedFrame::new(frame_data.to_vec(), frame_number);
350
351        // Check if we should process this frame
352        if !self.should_process_frame(&frame)? {
353            let mut stats = self.stats.lock().unwrap();
354            stats.frames_skipped += 1;
355            return Ok(None);
356        }
357
358        // Add to buffer
359        self.add_to_buffer(frame.clone())?;
360
361        // Process the frame
362        let start_time = Instant::now();
363
364        match self.provider.process_image(&frame.data).await {
365            Ok(mut result) => {
366                let processing_time = start_time.elapsed();
367
368                // Update stats
369                {
370                    let mut stats = self.stats.lock().unwrap();
371                    stats.frames_processed += 1;
372                    stats.update_avg_processing_time(processing_time);
373
374                    // Calculate current FPS
375                    if let Some(last_time) = *self.last_process_time.lock().unwrap() {
376                        let elapsed = start_time.duration_since(last_time);
377                        if elapsed.as_secs_f64() > 0.0 {
378                            stats.current_fps = 1.0 / elapsed.as_secs_f64();
379                        }
380                    }
381                }
382
383                // Update last process time
384                *self.last_process_time.lock().unwrap() = Some(start_time);
385
386                // Update last processed frame
387                *self.last_processed.lock().unwrap() = Some(frame);
388
389                // Apply smoothing if enabled
390                if self.config.enable_smoothing {
391                    result = self.apply_smoothing(result)?;
392                }
393
394                Ok(Some(result))
395            }
396            Err(e) => {
397                let mut stats = self.stats.lock().unwrap();
398                stats.processing_errors += 1;
399                Err(e)
400            }
401        }
402    }
403
404    /// Check if frame should be processed based on sampling strategy
405    fn should_process_frame(&self, frame: &BufferedFrame) -> Result<bool> {
406        match self.config.sampling_strategy {
407            SamplingStrategy::All => Ok(true),
408
409            SamplingStrategy::EveryNth(n) => {
410                Ok(frame.metadata.frame_number.is_multiple_of(n as u64))
411            }
412
413            SamplingStrategy::TimeInterval(interval_ms) => {
414                if let Some(last_time) = *self.last_process_time.lock().unwrap() {
415                    let elapsed = last_time.elapsed();
416                    Ok(elapsed.as_millis() >= interval_ms as u128)
417                } else {
418                    Ok(true) // Process first frame
419                }
420            }
421
422            SamplingStrategy::ChangeDetection => {
423                if let Some(last_frame) = self.last_processed.lock().unwrap().as_ref() {
424                    Ok(frame.is_different_from(last_frame, self.config.change_threshold))
425                } else {
426                    Ok(true) // Process first frame
427                }
428            }
429
430            SamplingStrategy::Adaptive => {
431                // Adapt based on current processing speed
432                let stats = self.stats.lock().unwrap();
433
434                if stats.avg_processing_time.as_secs_f64() > 0.0 {
435                    let target_interval = 1.0 / self.config.max_fps;
436                    let can_process = stats.avg_processing_time.as_secs_f64() <= target_interval;
437                    Ok(can_process)
438                } else {
439                    Ok(true)
440                }
441            }
442        }
443    }
444
445    /// Add frame to buffer
446    fn add_to_buffer(&self, frame: BufferedFrame) -> Result<()> {
447        let mut buffer = self.buffer.lock().unwrap();
448
449        if buffer.len() >= self.config.buffer_size {
450            // Buffer is full, remove oldest frame
451            buffer.pop_front();
452
453            let mut stats = self.stats.lock().unwrap();
454            stats.buffer_overflows += 1;
455        }
456
457        buffer.push_back(frame);
458        Ok(())
459    }
460
461    /// Apply temporal smoothing to results
462    fn apply_smoothing(&self, result: OcrResult) -> Result<OcrResult> {
463        let mut smoothing_buffer = self.smoothing_buffer.lock().unwrap();
464
465        smoothing_buffer.push_back(result.clone());
466
467        if smoothing_buffer.len() > self.config.smoothing_window {
468            smoothing_buffer.pop_front();
469        }
470
471        // For now, just return the latest result
472        // In a more advanced implementation, we could:
473        // - Merge text from multiple frames
474        // - Average confidence scores
475        // - Use majority voting for text detection
476        Ok(result)
477    }
478
479    /// Get current statistics
480    pub fn get_stats(&self) -> StreamStats {
481        self.stats.lock().unwrap().clone()
482    }
483
484    /// Reset statistics
485    pub fn reset_stats(&self) {
486        let mut stats = self.stats.lock().unwrap();
487        *stats = StreamStats::default();
488    }
489
490    /// Get buffer size
491    pub fn buffer_size(&self) -> usize {
492        self.buffer.lock().unwrap().len()
493    }
494
495    /// Clear buffer
496    pub fn clear_buffer(&self) {
497        self.buffer.lock().unwrap().clear();
498    }
499
500    /// Get configuration
501    pub fn config(&self) -> &StreamConfig {
502        &self.config
503    }
504
505    /// Get provider reference
506    pub fn provider(&self) -> &P {
507        &self.provider
508    }
509}
510
511/// Async frame stream processor
512pub struct AsyncFrameStream<P: VisionProvider> {
513    /// Stream processor
514    processor: Arc<StreamProcessor<P>>,
515}
516
517impl<P: VisionProvider> AsyncFrameStream<P> {
518    /// Create a new async frame stream
519    pub fn new(provider: P, config: StreamConfig) -> Self {
520        Self {
521            processor: Arc::new(StreamProcessor::new(provider, config)),
522        }
523    }
524
525    /// Process a batch of frames
526    pub async fn process_batch(&self, frames: Vec<Vec<u8>>) -> Result<Vec<Option<OcrResult>>> {
527        let mut results = Vec::new();
528
529        for frame in frames {
530            let result = self.processor.process_frame(&frame).await?;
531            results.push(result);
532        }
533
534        Ok(results)
535    }
536
537    /// Get processor statistics
538    pub fn stats(&self) -> StreamStats {
539        self.processor.get_stats()
540    }
541
542    /// Get processor
543    pub fn processor(&self) -> &StreamProcessor<P> {
544        &self.processor
545    }
546}
547
#[cfg(test)]
mod tests {
    use super::*;
    use crate::providers::MockVisionProvider;

    /// Builds a processor around the mock provider with the given config.
    fn mock_processor(config: StreamConfig) -> StreamProcessor<MockVisionProvider> {
        StreamProcessor::new(MockVisionProvider::new(), config)
    }

    #[test]
    fn test_stream_config_default() {
        let config = StreamConfig::default();
        assert_eq!(config.max_fps, 10.0);
        assert_eq!(config.buffer_size, 30);
        assert!(config.enable_smoothing);
    }

    #[test]
    fn test_stream_config_builder() {
        let config = StreamConfig::new()
            .with_max_fps(30.0)
            .with_buffer_size(50)
            .with_smoothing(false)
            .with_change_threshold(0.2);

        assert_eq!(config.max_fps, 30.0);
        assert_eq!(config.buffer_size, 50);
        assert!(!config.enable_smoothing);
        assert_eq!(config.change_threshold, 0.2);
    }

    #[test]
    fn test_sampling_strategy() {
        assert_eq!(
            SamplingStrategy::default(),
            SamplingStrategy::TimeInterval(100)
        );
    }

    // Construction touches no async code, so a plain test is sufficient.
    #[test]
    fn test_stream_processor_creation() {
        let processor = mock_processor(StreamConfig::default());

        assert_eq!(processor.buffer_size(), 0);
        let stats = processor.get_stats();
        assert_eq!(stats.frames_received, 0);
    }

    #[tokio::test]
    async fn test_process_single_frame() {
        let processor = mock_processor(StreamConfig::default());

        let frame = vec![0u8; 100];
        let result = processor.process_frame(&frame).await;

        assert!(result.is_ok());
        // The first frame is always selected by the default time-interval strategy.
        assert!(result.unwrap().is_some());

        let stats = processor.get_stats();
        assert_eq!(stats.frames_received, 1);
        assert_eq!(stats.frames_processed, 1);
    }

    #[tokio::test]
    async fn test_process_multiple_frames() {
        let processor = mock_processor(StreamConfig::default());

        for i in 0..10 {
            let frame = vec![i as u8; 100];
            let _result = processor.process_frame(&frame).await;
        }

        let stats = processor.get_stats();
        assert_eq!(stats.frames_received, 10);
        // Every received frame ends up in exactly one of these counters.
        assert_eq!(
            stats.frames_processed + stats.frames_skipped + stats.processing_errors,
            stats.frames_received
        );
    }

    #[tokio::test]
    async fn test_buffer_overflow() {
        let config = StreamConfig::default()
            .with_buffer_size(5)
            .with_sampling_strategy(SamplingStrategy::All); // Process all frames
        let processor = mock_processor(config);

        // Process more frames than the buffer can hold.
        for i in 0..10 {
            let frame = vec![i as u8; 100];
            let _result = processor.process_frame(&frame).await;
        }

        let stats = processor.get_stats();
        // Frames 6 through 10 each evict one older frame.
        assert_eq!(stats.buffer_overflows, 5);
        assert_eq!(processor.buffer_size(), 5);
    }

    #[tokio::test]
    async fn test_sampling_every_nth() {
        let config = StreamConfig::default().with_sampling_strategy(SamplingStrategy::EveryNth(2));
        let processor = mock_processor(config);

        for i in 0..10 {
            let frame = vec![i as u8; 100];
            let _result = processor.process_frame(&frame).await;
        }

        let stats = processor.get_stats();
        assert_eq!(stats.frames_received, 10);
        // Odd-numbered frames (1, 3, 5, 7, 9) are skipped.
        assert_eq!(stats.frames_skipped, 5);
        assert!(stats.frames_processed <= 5);
    }

    #[tokio::test]
    async fn test_sampling_all() {
        let config = StreamConfig::default().with_sampling_strategy(SamplingStrategy::All);
        let processor = mock_processor(config);

        for i in 0..5 {
            let frame = vec![i as u8; 100];
            let _result = processor.process_frame(&frame).await;
        }

        let stats = processor.get_stats();
        assert_eq!(stats.frames_received, 5);
        assert_eq!(stats.frames_processed, 5);
        assert_eq!(stats.frames_skipped, 0);
    }

    #[tokio::test]
    async fn test_change_detection() {
        let config = StreamConfig::default()
            .with_sampling_strategy(SamplingStrategy::ChangeDetection)
            .with_change_threshold(0.1);
        let processor = mock_processor(config);

        // Submit the same frame several times.
        let frame = vec![42u8; 100];
        for _ in 0..5 {
            let _result = processor.process_frame(&frame).await;
        }

        let stats = processor.get_stats();
        // Only the first frame is processed; the four identical repeats are skipped.
        assert_eq!(stats.frames_processed, 1);
        assert_eq!(stats.frames_skipped, 4);
    }

    #[tokio::test]
    async fn test_stats_reset() {
        let processor = mock_processor(StreamConfig::default());

        let frame = vec![0u8; 100];
        let _result = processor.process_frame(&frame).await;
        assert_eq!(processor.get_stats().frames_received, 1);

        processor.reset_stats();
        let stats = processor.get_stats();
        assert_eq!(stats.frames_received, 0);
    }

    #[tokio::test]
    async fn test_clear_buffer() {
        let processor = mock_processor(StreamConfig::default());

        for i in 0..5 {
            let frame = vec![i as u8; 100];
            let _result = processor.process_frame(&frame).await;
        }

        assert!(processor.buffer_size() > 0);
        processor.clear_buffer();
        assert_eq!(processor.buffer_size(), 0);
    }

    #[tokio::test]
    async fn test_async_frame_stream() {
        let provider = MockVisionProvider::new();
        let config = StreamConfig::default();
        let stream = AsyncFrameStream::new(provider, config);

        let frames = vec![vec![1u8; 100], vec![2u8; 100], vec![3u8; 100]];

        let results = stream.process_batch(frames).await.unwrap();
        assert_eq!(results.len(), 3);

        let stats = stream.stats();
        assert_eq!(stats.frames_received, 3);
    }

    // Hashing is synchronous; no async runtime is required.
    #[test]
    fn test_buffered_frame_hash() {
        let frame1 = BufferedFrame::new(vec![1, 2, 3, 4, 5], 1);
        let frame2 = BufferedFrame::new(vec![1, 2, 3, 4, 5], 2);
        let frame3 = BufferedFrame::new(vec![5, 4, 3, 2, 1], 3);

        // Same data should have the same hash.
        assert_eq!(frame1.hash, frame2.hash);

        // Different data should have a different hash.
        assert_ne!(frame1.hash, frame3.hash);
    }

    #[test]
    fn test_buffered_frame_difference() {
        let frame1 = BufferedFrame::new(vec![1u8; 100], 1);
        let frame2 = BufferedFrame::new(vec![1u8; 100], 2);
        let frame3 = BufferedFrame::new(vec![2u8; 100], 3);
        let shorter = BufferedFrame::new(vec![1u8; 50], 4);

        // Same frames should not be different.
        assert!(!frame1.is_different_from(&frame2, 0.1));

        // Different frames should be detected.
        assert!(frame1.is_different_from(&frame3, 0.1));

        // A length mismatch always counts as a change.
        assert!(frame1.is_different_from(&shorter, 1.0));
    }

    #[test]
    fn test_frame_metadata() {
        let metadata = FrameMetadata {
            frame_number: 42,
            timestamp: std::time::SystemTime::now(),
            size: 1024,
            dimensions: Some((640, 480)),
            processed: true,
            processing_time: Some(Duration::from_millis(50)),
        };

        assert_eq!(metadata.frame_number, 42);
        assert_eq!(metadata.size, 1024);
        assert_eq!(metadata.dimensions, Some((640, 480)));
        assert!(metadata.processed);
        assert_eq!(metadata.processing_time, Some(Duration::from_millis(50)));
    }

    #[test]
    fn test_stream_stats_update() {
        let mut stats = StreamStats::default();

        // The counter must be set first: the average divides by it.
        stats.frames_processed = 1;
        stats.update_avg_processing_time(Duration::from_millis(100));

        assert_eq!(stats.total_processing_time.as_millis(), 100);
        assert_eq!(stats.avg_processing_time.as_millis(), 100);
    }
}