scirs2_vision/streaming_modules/
core.rs1use crate::error::Result;
7use crossbeam_channel::{bounded, Receiver};
8use scirs2_core::ndarray::Array2;
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::time::{Duration, Instant};
12
13#[derive(Clone)]
15pub struct Frame {
16 pub data: Array2<f32>,
18 pub timestamp: Instant,
20 pub index: usize,
22 pub metadata: Option<FrameMetadata>,
24}
25
/// Descriptive information that may accompany a [`Frame`].
#[derive(Debug, Clone)]
pub struct FrameMetadata {
    /// Frame width.
    // NOTE(review): presumably in pixels; confirm with producers.
    pub width: u32,
    /// Frame height (same unit as `width`).
    pub height: u32,
    /// Frame rate reported for the stream, in frames per second.
    pub fps: f32,
    /// Number of channels per frame.
    pub channels: u8,
}
38
39pub trait ProcessingStage: Send + 'static {
41 fn process(&mut self, frame: Frame) -> Result<Frame>;
43
44 fn name(&self) -> &str;
46}
47
48pub struct StreamPipeline {
50 pub(crate) stages: Vec<Box<dyn ProcessingStage>>,
51 pub(crate) buffer_size: usize,
52 pub(crate) num_threads: usize,
53 pub(crate) metrics: Arc<Mutex<PipelineMetrics>>,
54}
55
/// Snapshot of pipeline performance counters.
///
/// All fields start at zero (`Default`). `Debug` is derived so snapshots can
/// be logged or printed directly.
#[derive(Default, Clone, Debug)]
pub struct PipelineMetrics {
    /// Number of successful stage invocations. Every stage increments this
    /// counter, so a frame passing through `n` stages contributes `n`.
    pub frames_processed: usize,
    /// Running mean of the time taken by successful stage invocations.
    pub avg_processing_time: Duration,
    /// Longest time taken by any single successful stage invocation.
    pub peak_processing_time: Duration,
    /// Throughput in frames per second.
    // NOTE(review): the pipeline worker code in this file never writes this
    // field; confirm whether another component is expected to fill it in.
    pub fps: f32,
    /// Number of frames discarded because a stage returned an error.
    pub dropped_frames: usize,
}
70
71impl Default for StreamPipeline {
72 fn default() -> Self {
73 Self::new()
74 }
75}
76
77impl StreamPipeline {
78 pub fn new() -> Self {
80 Self {
81 stages: Vec::new(),
82 buffer_size: 10,
83 num_threads: num_cpus::get(),
84 metrics: Arc::new(Mutex::new(PipelineMetrics::default())),
85 }
86 }
87
88 pub fn with_buffer_size(mut self, size: usize) -> Self {
90 self.buffer_size = size;
91 self
92 }
93
94 pub fn with_num_threads(mut self, threads: usize) -> Self {
96 self.num_threads = threads;
97 self
98 }
99
100 pub fn add_stage<S: ProcessingStage>(mut self, stage: S) -> Self {
102 self.stages.push(Box::new(stage));
103 self
104 }
105
106 pub fn process_stream<I>(&mut self, input: I) -> StreamProcessor
108 where
109 I: Iterator<Item = Frame> + Send + 'static,
110 {
111 let (tx, rx) = bounded(self.buffer_size);
112 let metrics = Arc::clone(&self.metrics);
113
114 let mut channels = vec![rx];
116
117 for stage in self.stages.drain(..) {
118 let (stage_tx, stage_rx) = bounded(self.buffer_size);
119 channels.push(stage_rx);
120
121 let stage_metrics = Arc::clone(&metrics);
122 let stagename = stage.name().to_string();
123 let prev_rx = channels[channels.len() - 2].clone();
124
125 thread::spawn(move || {
127 let mut stage = stage;
128 while let Ok(frame) = prev_rx.recv() {
129 let start = Instant::now();
130
131 match stage.process(frame) {
132 Ok(processed) => {
133 let duration = start.elapsed();
134
135 if let Ok(mut m) = stage_metrics.lock() {
137 m.frames_processed += 1;
138 m.avg_processing_time = Duration::from_secs_f64(
139 (m.avg_processing_time.as_secs_f64()
140 * (m.frames_processed - 1) as f64
141 + duration.as_secs_f64())
142 / m.frames_processed as f64,
143 );
144 if duration > m.peak_processing_time {
145 m.peak_processing_time = duration;
146 }
147 }
148
149 if stage_tx.send(processed).is_err() {
150 break;
151 }
152 }
153 Err(e) => {
154 eprintln!("Stage {stagename} error: {e}");
155 if let Ok(mut m) = stage_metrics.lock() {
156 m.dropped_frames += 1;
157 }
158 }
159 }
160 }
161 });
162 }
163
164 let output_rx = channels.pop().expect("Operation failed");
165
166 thread::spawn(move || {
168 for frame in input {
169 if tx.send(frame).is_err() {
170 break;
171 }
172 }
173 });
174
175 StreamProcessor {
177 output: output_rx,
178 metrics,
179 }
180 }
181
182 pub fn metrics(&self) -> PipelineMetrics {
184 self.metrics.lock().expect("Operation failed").clone()
185 }
186}
187
188pub struct StreamProcessor {
190 pub(crate) output: Receiver<Frame>,
191 pub(crate) metrics: Arc<Mutex<PipelineMetrics>>,
192}
193
194impl StreamProcessor {
195 pub fn next(&self) -> Option<Frame> {
197 self.output.recv().ok()
198 }
199
200 pub fn try_next(&self) -> Option<Frame> {
202 self.output.try_recv().ok()
203 }
204
205 pub fn metrics(&self) -> PipelineMetrics {
207 self.metrics.lock().expect("Operation failed").clone()
208 }
209}
210
211impl Iterator for StreamProcessor {
212 type Item = Frame;
213
214 fn next(&mut self) -> Option<Self::Item> {
215 self.output.recv().ok()
216 }
217}