scirs2_vision/streaming_modules/
memory.rs1use super::core::{Frame, FrameMetadata, PipelineMetrics, ProcessingStage};
7use crate::error::Result;
8use crossbeam_channel::{bounded, Receiver};
9use scirs2_core::ndarray::Array2;
10use std::sync::{Arc, Mutex};
11use std::thread;
12use std::time::Instant;
13
14pub struct AdvancedStreamPipeline {
16 pub(crate) stages: Vec<Box<dyn ProcessingStage>>,
17 pub(crate) buffer_size: usize,
18 pub(crate) num_threads: usize,
19 pub(crate) metrics: Arc<Mutex<PipelineMetrics>>,
20 pub(crate) frame_pool: Arc<Mutex<FramePool>>,
21 pub(crate) memory_profiler: Arc<Mutex<MemoryProfiler>>,
22}
23
24pub struct FramePool {
26 pub(crate) available_frames: Vec<Frame>,
27 pub(crate) max_pool_size: usize,
28 pub(crate) frame_dimensions: Option<(usize, usize)>,
29}
30
31impl FramePool {
32 pub fn new() -> Self {
38 Self {
39 available_frames: Vec::new(),
40 max_pool_size: 50,
41 frame_dimensions: None,
42 }
43 }
44
45 pub fn get_frame(&mut self, width: usize, height: usize) -> Frame {
56 if let Some(frame_dims) = self.frame_dimensions {
58 if frame_dims == (height, width) && !self.available_frames.is_empty() {
59 let mut frame = self.available_frames.pop().expect("Operation failed");
60 frame.data.fill(0.0);
62 frame.timestamp = Instant::now();
63 return frame;
64 }
65 }
66
67 Frame {
69 data: Array2::zeros((height, width)),
70 timestamp: Instant::now(),
71 index: 0,
72 metadata: Some(FrameMetadata {
73 width: width as u32,
74 height: height as u32,
75 fps: 30.0,
76 channels: 1,
77 }),
78 }
79 }
80
81 pub fn return_frame(&mut self, frame: Frame) {
87 if self.available_frames.len() < self.max_pool_size {
88 let (height, width) = frame.data.dim();
89 self.frame_dimensions = Some((height, width));
90 self.available_frames.push(frame);
91 }
92 }
93}
94
95impl Default for FramePool {
96 fn default() -> Self {
97 Self::new()
98 }
99}
100
101pub struct MemoryProfiler {
103 peak_memory: usize,
104 current_memory: usize,
105 allocation_count: usize,
106 memory_timeline: Vec<(Instant, usize)>,
107}
108
109impl MemoryProfiler {
110 pub fn new() -> Self {
116 Self {
117 peak_memory: 0,
118 current_memory: 0,
119 allocation_count: 0,
120 memory_timeline: Vec::new(),
121 }
122 }
123
124 pub fn record_allocation(&mut self, size: usize) {
130 self.current_memory += size;
131 self.allocation_count += 1;
132 if self.current_memory > self.peak_memory {
133 self.peak_memory = self.current_memory;
134 }
135 self.memory_timeline
136 .push((Instant::now(), self.current_memory));
137 }
138
139 pub fn record_deallocation(&mut self, size: usize) {
145 self.current_memory = self.current_memory.saturating_sub(size);
146 self.memory_timeline
147 .push((Instant::now(), self.current_memory));
148 }
149
150 pub fn get_stats(&self) -> MemoryStats {
156 MemoryStats {
157 peak_memory: self.peak_memory,
158 current_memory: self.current_memory,
159 allocation_count: self.allocation_count,
160 average_memory: if !self.memory_timeline.is_empty() {
161 self.memory_timeline
162 .iter()
163 .map(|(_, mem)| *mem)
164 .sum::<usize>()
165 / self.memory_timeline.len()
166 } else {
167 0
168 },
169 }
170 }
171
172 pub fn reset(&mut self) {
174 self.peak_memory = 0;
175 self.current_memory = 0;
176 self.allocation_count = 0;
177 self.memory_timeline.clear();
178 }
179}
180
181impl Default for MemoryProfiler {
182 fn default() -> Self {
183 Self::new()
184 }
185}
186
187#[derive(Debug, Clone)]
189pub struct MemoryStats {
190 pub peak_memory: usize,
192 pub current_memory: usize,
194 pub allocation_count: usize,
196 pub average_memory: usize,
198}
199
200impl Default for AdvancedStreamPipeline {
201 fn default() -> Self {
202 Self::new()
203 }
204}
205
206impl AdvancedStreamPipeline {
207 pub fn new() -> Self {
209 Self {
210 stages: Vec::new(),
211 buffer_size: 10,
212 num_threads: num_cpus::get(),
213 metrics: Arc::new(Mutex::new(PipelineMetrics::default())),
214 frame_pool: Arc::new(Mutex::new(FramePool::new())),
215 memory_profiler: Arc::new(Mutex::new(MemoryProfiler::new())),
216 }
217 }
218
219 pub fn with_buffer_size(mut self, size: usize) -> Self {
229 self.buffer_size = size;
230 self
231 }
232
233 pub fn with_num_threads(mut self, threads: usize) -> Self {
243 self.num_threads = threads;
244 self
245 }
246
247 pub fn with_zero_copy(self) -> Self {
249 {
251 let mut pool = self.frame_pool.lock().expect("Operation failed");
252
253 let common_sizes = [(480, 640), (720, 1280), (1080, 1920), (240, 320)];
255
256 for &(height, width) in &common_sizes {
257 for _ in 0..5 {
258 let frame = Frame {
259 data: Array2::zeros((height, width)),
260 timestamp: Instant::now(),
261 index: 0,
262 metadata: Some(FrameMetadata {
263 width: width as u32,
264 height: height as u32,
265 fps: 30.0,
266 channels: 1,
267 }),
268 };
269 pool.available_frames.push(frame);
270 }
271 }
272 } self
275 }
276
277 pub fn add_simd_stage<S: ProcessingStage>(mut self, stage: S) -> Self {
279 self.stages.push(Box::new(stage));
280 self
281 }
282
283 pub fn process_advanced_stream<I>(&mut self, input: I) -> AdvancedStreamProcessor
285 where
286 I: Iterator<Item = Frame> + Send + 'static,
287 {
288 let (tx, rx) = bounded::<Frame>(self.buffer_size);
289 let metrics = Arc::clone(&self.metrics);
290 let frame_pool = Arc::clone(&self.frame_pool);
291 let memory_profiler = Arc::clone(&self.memory_profiler);
292
293 let mut channels = vec![rx];
295 let mut worker_handles = Vec::new();
296
297 for stage in self.stages.drain(..) {
298 let (stage_tx, stage_rx) = bounded(self.buffer_size);
299 channels.push(stage_rx);
300
301 let stage_metrics = Arc::clone(&metrics);
302 let _stage_frame_pool = Arc::clone(&frame_pool);
303 let stage_memory_profiler = Arc::clone(&memory_profiler);
304 let stagename = stage.name().to_string();
305 let prev_rx = channels[channels.len() - 2].clone();
306
307 let handle = thread::spawn(move || {
309 let mut stage = stage;
310 let _local_frame_buffer: Vec<Frame> = Vec::with_capacity(10);
311
312 while let Ok(frame) = prev_rx.recv() {
313 let start = Instant::now();
314 let frame_size = frame.data.len() * std::mem::size_of::<f32>();
315
316 if let Ok(mut profiler) = stage_memory_profiler.lock() {
318 profiler.record_allocation(frame_size);
319 }
320
321 match stage.process(frame) {
322 Ok(processed) => {
323 let duration = start.elapsed();
324
325 if let Ok(mut m) = stage_metrics.try_lock() {
327 m.frames_processed += 1;
328 m.avg_processing_time = std::time::Duration::from_secs_f64(
329 (m.avg_processing_time.as_secs_f64()
330 * (m.frames_processed - 1) as f64
331 + duration.as_secs_f64())
332 / m.frames_processed as f64,
333 );
334 if duration > m.peak_processing_time {
335 m.peak_processing_time = duration;
336 }
337
338 let fps = (1.0 / duration.as_secs_f64()) as f32;
340 m.fps = m.fps * 0.9 + fps * 0.1; }
342
343 if stage_tx.send(processed).is_err() {
344 break;
345 }
346 }
347 Err(e) => {
348 eprintln!("Stage {stagename} error: {e}");
349 if let Ok(mut m) = stage_metrics.try_lock() {
350 m.dropped_frames += 1;
351 }
352 }
353 }
354
355 if let Ok(mut profiler) = stage_memory_profiler.lock() {
357 profiler.record_deallocation(frame_size);
358 }
359 }
360 });
361
362 worker_handles.push(handle);
363 }
364
365 let output_rx = channels.pop().expect("Operation failed");
366
367 thread::spawn(move || {
369 let mut frame_batch = Vec::with_capacity(4);
370
371 for frame in input {
372 frame_batch.push(frame);
373
374 if frame_batch.len() >= 4 {
376 for frame in frame_batch.drain(..) {
377 if tx.send(frame).is_err() {
378 return;
379 }
380 }
381 }
382 }
383
384 for frame in frame_batch {
386 if tx.send(frame).is_err() {
387 break;
388 }
389 }
390 });
391
392 AdvancedStreamProcessor {
393 output: output_rx,
394 metrics,
395 frame_pool,
396 memory_profiler,
397 worker_handles,
398 }
399 }
400
401 pub fn memory_stats(&self) -> MemoryStats {
403 self.memory_profiler
404 .lock()
405 .expect("Operation failed")
406 .get_stats()
407 }
408
409 pub fn reset_memory_stats(&self) {
411 self.memory_profiler
412 .lock()
413 .expect("Operation failed")
414 .reset();
415 }
416
417 pub fn metrics(&self) -> PipelineMetrics {
419 self.metrics.lock().expect("Operation failed").clone()
420 }
421}
422
423pub struct AdvancedStreamProcessor {
425 pub(crate) output: Receiver<Frame>,
426 pub(crate) metrics: Arc<Mutex<PipelineMetrics>>,
427 pub(crate) frame_pool: Arc<Mutex<FramePool>>,
428 pub(crate) memory_profiler: Arc<Mutex<MemoryProfiler>>,
429 #[allow(dead_code)]
430 pub(crate) worker_handles: Vec<thread::JoinHandle<()>>,
431}
432
433impl AdvancedStreamProcessor {
434 pub fn next_zero_copy(&self) -> Option<Frame> {
436 self.output.recv().ok()
437 }
438
439 pub fn try_next(&self) -> Option<Frame> {
441 self.output.try_recv().ok()
442 }
443
444 pub fn next_batch(&self, batchsize: usize) -> Vec<Frame> {
454 let mut batch = Vec::with_capacity(batchsize);
455
456 for _ in 0..batchsize {
457 if let Some(frame) = self.try_next() {
458 batch.push(frame);
459 } else {
460 break;
461 }
462 }
463
464 batch
465 }
466
467 pub fn return_frame(&self, frame: Frame) {
473 if let Ok(mut pool) = self.frame_pool.lock() {
474 pool.return_frame(frame);
475 }
476 }
477
478 pub fn metrics(&self) -> PipelineMetrics {
480 self.metrics.lock().expect("Operation failed").clone()
481 }
482
483 pub fn memory_stats(&self) -> MemoryStats {
485 self.memory_profiler
486 .lock()
487 .expect("Operation failed")
488 .get_stats()
489 }
490
491 pub fn reset_memory_stats(&self) {
493 self.memory_profiler
494 .lock()
495 .expect("Operation failed")
496 .reset();
497 }
498}
499
500impl Iterator for AdvancedStreamProcessor {
501 type Item = Frame;
502
503 fn next(&mut self) -> Option<Self::Item> {
504 self.output.recv().ok()
505 }
506}