// torsh_vision/streaming.rs

1//! Real-Time Video Stream Processing
2//!
3//! This module provides efficient stream processing capabilities for real-time computer vision,
4//! integrated with scirs2-vision 0.1.5's streaming features.
5//!
6//! # Features
7//! - Video frame buffering and preprocessing pipelines
8//! - Real-time performance monitoring and adaptation
9//! - Async/parallel processing for low latency
10//! - GPU-accelerated stream processing
11//! - Frame dropping and quality adaptation strategies
12//!
13//! # Examples
14//!
15//! ```rust,ignore
16//! use torsh_vision::streaming::*;
17//!
18//! // Create a video stream processor
19//! let processor = StreamProcessor::new(config)?;
20//! processor.process_stream(video_source, |frame| {
21//!     // Process each frame
22//!     detect_objects(frame)
23//! })?;
24//! ```
25
26use crate::{Result, VisionError};
27use std::collections::VecDeque;
28use std::sync::{Arc, Mutex};
29use std::time::{Duration, Instant};
30use torsh_tensor::Tensor;
31
/// Stream processing configuration
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Maximum frames to buffer
    pub buffer_size: usize,
    /// Target frames per second
    pub target_fps: f32,
    /// Enable frame dropping if processing too slow
    pub enable_frame_drop: bool,
    /// Enable GPU acceleration
    pub use_gpu: bool,
    /// Number of parallel processing threads
    pub num_threads: usize,
    /// Quality adaptation strategy
    pub quality_adaptation: QualityAdaptation,
}

impl Default for StreamConfig {
    /// Defaults: a 30-frame buffer targeting 30 FPS on CPU, frame dropping
    /// enabled, 4 worker threads, and no quality adaptation.
    fn default() -> Self {
        Self {
            quality_adaptation: QualityAdaptation::None,
            num_threads: 4,
            use_gpu: false,
            enable_frame_drop: true,
            target_fps: 30.0,
            buffer_size: 30,
        }
    }
}

/// Quality adaptation strategies for maintaining real-time performance
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QualityAdaptation {
    /// No adaptation
    None,
    /// Reduce resolution when falling behind
    ResolutionScaling,
    /// Skip non-keyframe processing
    KeyframeOnly,
    /// Adaptive quality based on complexity
    Dynamic,
}
74
/// Frame metadata for processing pipeline
///
/// Carried alongside the pixel data so the processor can make scheduling
/// decisions (dropping, quality adaptation) without inspecting the tensor.
#[derive(Debug, Clone)]
pub struct FrameMetadata {
    /// Frame number in stream
    pub frame_number: u64,
    /// Timestamp when frame was captured
    pub timestamp: Instant,
    /// Frame width in pixels
    pub width: usize,
    /// Frame height in pixels
    pub height: usize,
    /// Is this a keyframe (keyframes are evicted last when the buffer is full)
    pub is_keyframe: bool,
    /// Processing priority (higher = more important)
    pub priority: u8,
}
91
/// Frame with metadata
///
/// Pairs a tensor of pixel data with its [`FrameMetadata`].
#[derive(Debug, Clone)]
pub struct Frame {
    /// Frame data as tensor
    pub data: Tensor,
    /// Frame metadata
    pub metadata: FrameMetadata,
}
100
/// Performance statistics for stream processing
///
/// All counters start at zero (and all floats at 0.0) via the derived
/// `Default`, which replaces a hand-written impl that returned exactly
/// those values (clippy `derivable_impls`).
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Average processing time per frame (ms)
    pub avg_processing_time: f32,
    /// Current frames per second
    pub current_fps: f32,
    /// Number of dropped frames
    pub dropped_frames: u64,
    /// Total frames processed
    pub total_frames: u64,
    /// Buffer utilization (0.0 to 1.0)
    pub buffer_utilization: f32,
    /// Number of quality adaptations
    pub num_adaptations: u64,
}
130
/// Real-time stream processor
///
/// All mutable state lives behind `Arc<Mutex<...>>`, so a processor can be
/// shared across threads; each method locks only the state it needs.
pub struct StreamProcessor {
    /// Processing configuration (set at construction)
    config: StreamConfig,
    /// Aggregated performance statistics
    stats: Arc<Mutex<StreamStats>>,
    /// FIFO of frames awaiting processing
    frame_buffer: Arc<Mutex<VecDeque<Frame>>>,
    /// Rolling window of recent per-frame processing durations
    processing_times: Arc<Mutex<VecDeque<Duration>>>,
}
138
139impl StreamProcessor {
140    /// Create a new stream processor
141    pub fn new(config: StreamConfig) -> Result<Self> {
142        let buffer_size = config.buffer_size;
143        Ok(Self {
144            config,
145            stats: Arc::new(Mutex::new(StreamStats::default())),
146            frame_buffer: Arc::new(Mutex::new(VecDeque::with_capacity(buffer_size))),
147            processing_times: Arc::new(Mutex::new(VecDeque::with_capacity(100))),
148        })
149    }
150
151    /// Get current performance statistics
152    pub fn stats(&self) -> StreamStats {
153        self.stats.lock().map(|s| s.clone()).unwrap_or_default()
154    }
155
156    /// Reset statistics
157    pub fn reset_stats(&self) {
158        if let Ok(mut stats) = self.stats.lock() {
159            *stats = StreamStats::default();
160        }
161        if let Ok(mut times) = self.processing_times.lock() {
162            times.clear();
163        }
164    }
165
166    /// Add a frame to the processing buffer
167    pub fn push_frame(&self, frame: Frame) -> Result<()> {
168        let mut buffer = self.frame_buffer.lock().map_err(|_| {
169            VisionError::InvalidParameter("Failed to lock frame buffer".to_string())
170        })?;
171
172        // Check buffer capacity
173        if buffer.len() >= self.config.buffer_size {
174            if self.config.enable_frame_drop {
175                // Drop oldest non-keyframe
176                let mut dropped = false;
177                for i in 0..buffer.len() {
178                    if !buffer[i].metadata.is_keyframe {
179                        buffer.remove(i);
180                        dropped = true;
181
182                        // Update stats
183                        if let Ok(mut stats) = self.stats.lock() {
184                            stats.dropped_frames += 1;
185                        }
186                        break;
187                    }
188                }
189
190                if !dropped {
191                    // All are keyframes, drop oldest
192                    buffer.pop_front();
193                    if let Ok(mut stats) = self.stats.lock() {
194                        stats.dropped_frames += 1;
195                    }
196                }
197            } else {
198                return Err(VisionError::InvalidParameter(
199                    "Frame buffer full and dropping disabled".to_string(),
200                ));
201            }
202        }
203
204        buffer.push_back(frame);
205
206        // Update buffer utilization
207        if let Ok(mut stats) = self.stats.lock() {
208            stats.buffer_utilization = buffer.len() as f32 / self.config.buffer_size as f32;
209        }
210
211        Ok(())
212    }
213
214    /// Get next frame from buffer
215    pub fn pop_frame(&self) -> Result<Option<Frame>> {
216        let mut buffer = self.frame_buffer.lock().map_err(|_| {
217            VisionError::InvalidParameter("Failed to lock frame buffer".to_string())
218        })?;
219
220        Ok(buffer.pop_front())
221    }
222
223    /// Record processing time for a frame
224    pub fn record_processing_time(&self, duration: Duration) {
225        if let Ok(mut times) = self.processing_times.lock() {
226            times.push_back(duration);
227
228            // Keep only recent times (last 100 frames)
229            while times.len() > 100 {
230                times.pop_front();
231            }
232
233            // Update stats
234            if let Ok(mut stats) = self.stats.lock() {
235                // Calculate average processing time
236                let sum: Duration = times.iter().sum();
237                stats.avg_processing_time = sum.as_secs_f32() * 1000.0 / times.len() as f32;
238
239                // Calculate FPS
240                if stats.avg_processing_time > 0.0 {
241                    stats.current_fps = 1000.0 / stats.avg_processing_time;
242                }
243            }
244        }
245    }
246
247    /// Process a frame and update statistics
248    pub fn process_frame<F, T>(&self, frame: Frame, process_fn: F) -> Result<T>
249    where
250        F: FnOnce(&Frame) -> Result<T>,
251    {
252        let start = Instant::now();
253
254        // Apply quality adaptation if needed
255        let adapted_frame = self.adapt_frame_quality(&frame)?;
256
257        // Process the frame
258        let result = process_fn(&adapted_frame)?;
259
260        // Record processing time
261        let elapsed = start.elapsed();
262        self.record_processing_time(elapsed);
263
264        // Update total frames
265        if let Ok(mut stats) = self.stats.lock() {
266            stats.total_frames += 1;
267        }
268
269        Ok(result)
270    }
271
272    /// Adapt frame quality based on performance
273    fn adapt_frame_quality(&self, frame: &Frame) -> Result<Frame> {
274        match self.config.quality_adaptation {
275            QualityAdaptation::None => Ok(frame.clone()),
276
277            QualityAdaptation::ResolutionScaling => {
278                // Check if we're falling behind
279                let stats = self.stats();
280                if stats.current_fps < self.config.target_fps * 0.8 {
281                    // Reduce resolution by 25%
282                    let _new_width = (frame.metadata.width as f32 * 0.75) as usize;
283                    let _new_height = (frame.metadata.height as f32 * 0.75) as usize;
284
285                    // TODO: Implement actual downscaling
286                    // For now, just return original frame
287                    if let Ok(mut stats) = self.stats.lock() {
288                        stats.num_adaptations += 1;
289                    }
290                }
291                Ok(frame.clone())
292            }
293
294            QualityAdaptation::KeyframeOnly => {
295                // Skip non-keyframes if falling behind
296                let stats = self.stats();
297                if !frame.metadata.is_keyframe && stats.current_fps < self.config.target_fps * 0.9 {
298                    // Mark for skipping (caller should handle)
299                    if let Ok(mut stats_lock) = self.stats.lock() {
300                        stats_lock.num_adaptations += 1;
301                    }
302                }
303                Ok(frame.clone())
304            }
305
306            QualityAdaptation::Dynamic => {
307                // Combine strategies based on performance
308                let stats = self.stats();
309                let performance_ratio = stats.current_fps / self.config.target_fps;
310
311                if performance_ratio < 0.7 {
312                    // Severe degradation: reduce resolution
313                    // TODO: Implement downscaling
314                    if let Ok(mut stats_lock) = self.stats.lock() {
315                        stats_lock.num_adaptations += 1;
316                    }
317                } else if performance_ratio < 0.9 && !frame.metadata.is_keyframe {
318                    // Moderate degradation: skip non-keyframes
319                    if let Ok(mut stats_lock) = self.stats.lock() {
320                        stats_lock.num_adaptations += 1;
321                    }
322                }
323
324                Ok(frame.clone())
325            }
326        }
327    }
328
329    /// Check if processing is keeping up with target FPS
330    pub fn is_realtime(&self) -> bool {
331        let stats = self.stats();
332        stats.current_fps >= self.config.target_fps * 0.95
333    }
334
335    /// Get recommended adjustments for configuration
336    pub fn recommend_config_adjustments(&self) -> Vec<String> {
337        let stats = self.stats();
338        let mut recommendations = Vec::new();
339
340        // Check FPS performance
341        if stats.current_fps < self.config.target_fps * 0.8 {
342            recommendations
343                .push("Consider reducing target_fps or enabling quality adaptation".to_string());
344        }
345
346        // Check buffer utilization
347        if stats.buffer_utilization > 0.9 {
348            recommendations.push("Buffer is frequently full - consider increasing buffer_size or enabling frame dropping".to_string());
349        }
350
351        // Check dropped frames
352        if stats.total_frames > 0 {
353            let drop_rate = stats.dropped_frames as f32 / stats.total_frames as f32;
354            if drop_rate > 0.1 {
355                recommendations.push(format!(
356                    "High frame drop rate ({:.1}%) - consider reducing input rate or optimizing processing",
357                    drop_rate * 100.0
358                ));
359            }
360        }
361
362        recommendations
363    }
364}
365
366/// Frame preprocessor for common vision tasks
367pub struct FramePreprocessor {
368    /// Target size for resizing (width, height)
369    pub target_size: Option<(usize, usize)>,
370    /// Normalization mean values
371    pub normalize_mean: Option<Vec<f32>>,
372    /// Normalization std values
373    pub normalize_std: Option<Vec<f32>>,
374    /// Convert to grayscale
375    pub to_grayscale: bool,
376}
377
378impl Default for FramePreprocessor {
379    fn default() -> Self {
380        Self {
381            target_size: None,
382            normalize_mean: None,
383            normalize_std: None,
384            to_grayscale: false,
385        }
386    }
387}
388
389impl FramePreprocessor {
390    /// Create a new preprocessor with default settings
391    pub fn new() -> Self {
392        Self::default()
393    }
394
395    /// Set target resize dimensions
396    pub fn with_resize(mut self, width: usize, height: usize) -> Self {
397        self.target_size = Some((width, height));
398        self
399    }
400
401    /// Set normalization parameters
402    pub fn with_normalize(mut self, mean: Vec<f32>, std: Vec<f32>) -> Self {
403        self.normalize_mean = Some(mean);
404        self.normalize_std = Some(std);
405        self
406    }
407
408    /// Enable grayscale conversion
409    pub fn with_grayscale(mut self) -> Self {
410        self.to_grayscale = true;
411        self
412    }
413
414    /// Preprocess a frame
415    pub fn preprocess(&self, frame: &Frame) -> Result<Frame> {
416        let processed = frame.clone();
417
418        // TODO: Implement actual preprocessing operations
419        // - Resize if target_size is set
420        // - Normalize if mean/std are set
421        // - Convert to grayscale if enabled
422
423        Ok(processed)
424    }
425}
426
427/// Batch processor for efficient multi-frame processing
428pub struct BatchProcessor {
429    batch_size: usize,
430    frames: Vec<Frame>,
431}
432
433impl BatchProcessor {
434    /// Create a new batch processor
435    pub fn new(batch_size: usize) -> Self {
436        Self {
437            batch_size,
438            frames: Vec::with_capacity(batch_size),
439        }
440    }
441
442    /// Add a frame to the batch
443    pub fn add_frame(&mut self, frame: Frame) -> Option<Vec<Frame>> {
444        self.frames.push(frame);
445
446        if self.frames.len() >= self.batch_size {
447            Some(self.flush())
448        } else {
449            None
450        }
451    }
452
453    /// Get the current batch without clearing
454    pub fn current_batch(&self) -> &[Frame] {
455        &self.frames
456    }
457
458    /// Flush the current batch
459    pub fn flush(&mut self) -> Vec<Frame> {
460        std::mem::replace(&mut self.frames, Vec::with_capacity(self.batch_size))
461    }
462
463    /// Check if batch is full
464    pub fn is_full(&self) -> bool {
465        self.frames.len() >= self.batch_size
466    }
467
468    /// Get number of frames in current batch
469    pub fn len(&self) -> usize {
470        self.frames.len()
471    }
472
473    /// Check if batch is empty
474    pub fn is_empty(&self) -> bool {
475        self.frames.is_empty()
476    }
477}
478
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal 224x224x3 zero frame; every 10th frame is a keyframe.
    fn create_dummy_frame(frame_number: u64) -> Frame {
        use torsh_core::DeviceType;
        let data =
            Tensor::zeros(&[224, 224, 3], DeviceType::Cpu).expect("Failed to create tensor");
        let metadata = FrameMetadata {
            frame_number,
            timestamp: Instant::now(),
            width: 224,
            height: 224,
            is_keyframe: frame_number % 10 == 0,
            priority: 1,
        };
        Frame { data, metadata }
    }

    #[test]
    fn test_stream_config_default() {
        let config = StreamConfig::default();
        assert_eq!(config.buffer_size, 30);
        assert_eq!(config.target_fps, 30.0);
        assert!(config.enable_frame_drop);
    }

    #[test]
    fn test_stream_processor_creation() {
        assert!(StreamProcessor::new(StreamConfig::default()).is_ok());
    }

    #[test]
    fn test_push_pop_frame() {
        let processor =
            StreamProcessor::new(StreamConfig::default()).expect("Failed to create processor");

        processor
            .push_frame(create_dummy_frame(1))
            .expect("Failed to push frame");

        match processor.pop_frame().expect("Failed to pop frame") {
            Some(frame) => assert_eq!(frame.metadata.frame_number, 1),
            None => panic!("expected a buffered frame"),
        }
    }

    #[test]
    fn test_frame_dropping() {
        let config = StreamConfig {
            buffer_size: 2,
            enable_frame_drop: true,
            ..StreamConfig::default()
        };
        let processor = StreamProcessor::new(config).expect("Failed to create processor");

        // Push more frames than the buffer can hold.
        for i in 0..5 {
            processor
                .push_frame(create_dummy_frame(i))
                .expect("Failed to push frame");
        }

        assert!(processor.stats().dropped_frames > 0);
    }

    #[test]
    fn test_processing_time_recording() {
        let processor =
            StreamProcessor::new(StreamConfig::default()).expect("Failed to create processor");

        for millis in [10u64, 20] {
            processor.record_processing_time(Duration::from_millis(millis));
        }

        assert!(processor.stats().avg_processing_time > 0.0);
    }

    #[test]
    fn test_batch_processor() {
        let mut batch = BatchProcessor::new(3);

        assert!(batch.is_empty());
        assert!(!batch.is_full());

        assert!(batch.add_frame(create_dummy_frame(1)).is_none());
        assert!(batch.add_frame(create_dummy_frame(2)).is_none());

        assert_eq!(batch.len(), 2);
        assert!(!batch.is_full());

        let completed = batch
            .add_frame(create_dummy_frame(3))
            .expect("third frame should complete the batch");
        assert_eq!(completed.len(), 3);
        assert!(batch.is_empty());
    }

    #[test]
    fn test_frame_preprocessor() {
        let preprocessor = FramePreprocessor::new()
            .with_resize(224, 224)
            .with_grayscale();

        assert!(preprocessor.preprocess(&create_dummy_frame(1)).is_ok());
    }

    #[test]
    fn test_quality_adaptation_variants() {
        assert_eq!(QualityAdaptation::None, QualityAdaptation::None);
        assert_ne!(
            QualityAdaptation::None,
            QualityAdaptation::ResolutionScaling
        );
    }

    #[test]
    fn test_is_realtime() {
        let processor =
            StreamProcessor::new(StreamConfig::default()).expect("Failed to create processor");

        // A 10 ms average frame time implies ~100 FPS, which comfortably
        // exceeds the default 30 FPS target.
        processor.record_processing_time(Duration::from_millis(10));
        assert!(processor.is_realtime());
    }

    #[test]
    fn test_stream_stats_default() {
        let stats = StreamStats::default();
        assert_eq!(stats.total_frames, 0);
        assert_eq!(stats.dropped_frames, 0);
        assert_eq!(stats.current_fps, 0.0);
    }
}