Skip to main content

scirs2_vision/streaming_modules/
core.rs

1//! Core streaming infrastructure
2//!
3//! This module provides the fundamental building blocks for streaming video processing,
4//! including frame structures, processing pipeline architecture, and performance metrics.
5
6use crate::error::Result;
7use crossbeam_channel::{bounded, Receiver};
8use scirs2_core::ndarray::Array2;
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::time::{Duration, Instant};
12
13/// Frame type for streaming processing
14#[derive(Clone)]
15pub struct Frame {
16    /// Frame data as 2D array
17    pub data: Array2<f32>,
18    /// Frame timestamp
19    pub timestamp: Instant,
20    /// Frame index
21    pub index: usize,
22    /// Optional metadata
23    pub metadata: Option<FrameMetadata>,
24}
25
26/// Frame metadata
27#[derive(Clone, Debug)]
28pub struct FrameMetadata {
29    /// Frame width
30    pub width: u32,
31    /// Frame height
32    pub height: u32,
33    /// Frames per second
34    pub fps: f32,
35    /// Color channels
36    pub channels: u8,
37}
38
39/// Processing stage trait
40pub trait ProcessingStage: Send + 'static {
41    /// Process a single frame
42    fn process(&mut self, frame: Frame) -> Result<Frame>;
43
44    /// Get stage name for monitoring
45    fn name(&self) -> &str;
46}
47
48/// Stream processing pipeline
49pub struct StreamPipeline {
50    pub(crate) stages: Vec<Box<dyn ProcessingStage>>,
51    pub(crate) buffer_size: usize,
52    pub(crate) num_threads: usize,
53    pub(crate) metrics: Arc<Mutex<PipelineMetrics>>,
54}
55
56/// Pipeline performance metrics
57#[derive(Default, Clone)]
58pub struct PipelineMetrics {
59    /// Total frames processed
60    pub frames_processed: usize,
61    /// Average processing time per frame
62    pub avg_processing_time: Duration,
63    /// Peak processing time
64    pub peak_processing_time: Duration,
65    /// Frames per second
66    pub fps: f32,
67    /// Dropped frames
68    pub dropped_frames: usize,
69}
70
71impl Default for StreamPipeline {
72    fn default() -> Self {
73        Self::new()
74    }
75}
76
77impl StreamPipeline {
78    /// Create a new streaming pipeline
79    pub fn new() -> Self {
80        Self {
81            stages: Vec::new(),
82            buffer_size: 10,
83            num_threads: num_cpus::get(),
84            metrics: Arc::new(Mutex::new(PipelineMetrics::default())),
85        }
86    }
87
88    /// Set buffer size for inter-stage communication
89    pub fn with_buffer_size(mut self, size: usize) -> Self {
90        self.buffer_size = size;
91        self
92    }
93
94    /// Set number of worker threads
95    pub fn with_num_threads(mut self, threads: usize) -> Self {
96        self.num_threads = threads;
97        self
98    }
99
100    /// Add a processing stage to the pipeline
101    pub fn add_stage<S: ProcessingStage>(mut self, stage: S) -> Self {
102        self.stages.push(Box::new(stage));
103        self
104    }
105
106    /// Process a stream of frames
107    pub fn process_stream<I>(&mut self, input: I) -> StreamProcessor
108    where
109        I: Iterator<Item = Frame> + Send + 'static,
110    {
111        let (tx, rx) = bounded(self.buffer_size);
112        let metrics = Arc::clone(&self.metrics);
113
114        // Create pipeline stages with channels
115        let mut channels = vec![rx];
116
117        for stage in self.stages.drain(..) {
118            let (stage_tx, stage_rx) = bounded(self.buffer_size);
119            channels.push(stage_rx);
120
121            let stage_metrics = Arc::clone(&metrics);
122            let stagename = stage.name().to_string();
123            let prev_rx = channels[channels.len() - 2].clone();
124
125            // Spawn worker thread for this stage
126            thread::spawn(move || {
127                let mut stage = stage;
128                while let Ok(frame) = prev_rx.recv() {
129                    let start = Instant::now();
130
131                    match stage.process(frame) {
132                        Ok(processed) => {
133                            let duration = start.elapsed();
134
135                            // Update metrics
136                            if let Ok(mut m) = stage_metrics.lock() {
137                                m.frames_processed += 1;
138                                m.avg_processing_time = Duration::from_secs_f64(
139                                    (m.avg_processing_time.as_secs_f64()
140                                        * (m.frames_processed - 1) as f64
141                                        + duration.as_secs_f64())
142                                        / m.frames_processed as f64,
143                                );
144                                if duration > m.peak_processing_time {
145                                    m.peak_processing_time = duration;
146                                }
147                            }
148
149                            if stage_tx.send(processed).is_err() {
150                                break;
151                            }
152                        }
153                        Err(e) => {
154                            eprintln!("Stage {stagename} error: {e}");
155                            if let Ok(mut m) = stage_metrics.lock() {
156                                m.dropped_frames += 1;
157                            }
158                        }
159                    }
160                }
161            });
162        }
163
164        let output_rx = channels.pop().expect("Operation failed");
165
166        // Input thread
167        thread::spawn(move || {
168            for frame in input {
169                if tx.send(frame).is_err() {
170                    break;
171                }
172            }
173        });
174
175        // Return processor with output channel
176        StreamProcessor {
177            output: output_rx,
178            metrics,
179        }
180    }
181
182    /// Get current pipeline metrics
183    pub fn metrics(&self) -> PipelineMetrics {
184        self.metrics.lock().expect("Operation failed").clone()
185    }
186}
187
188/// Stream processor handle
189pub struct StreamProcessor {
190    pub(crate) output: Receiver<Frame>,
191    pub(crate) metrics: Arc<Mutex<PipelineMetrics>>,
192}
193
194impl StreamProcessor {
195    /// Get the next processed frame
196    pub fn next(&self) -> Option<Frame> {
197        self.output.recv().ok()
198    }
199
200    /// Try to get the next frame without blocking
201    pub fn try_next(&self) -> Option<Frame> {
202        self.output.try_recv().ok()
203    }
204
205    /// Get current metrics
206    pub fn metrics(&self) -> PipelineMetrics {
207        self.metrics.lock().expect("Operation failed").clone()
208    }
209}
210
211impl Iterator for StreamProcessor {
212    type Item = Frame;
213
214    fn next(&mut self) -> Option<Self::Item> {
215        self.output.recv().ok()
216    }
217}