scirs2_vision/streaming_modules/memory.rs

//! Memory management and pooling for streaming
//!
//! This module provides advanced memory management capabilities including frame pooling,
//! zero-copy processing, and memory profiling for high-performance streaming applications.
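//!
//! A minimal end-to-end sketch (illustrative only, not compiled as a doctest;
//! `frame_source` stands for any `Iterator<Item = Frame> + Send + 'static`):
//!
//! ```ignore
//! let mut pipeline = AdvancedStreamPipeline::new()
//!     .with_buffer_size(16)
//!     .with_zero_copy();
//! let processor = pipeline.process_advanced_stream(frame_source);
//! for _frame in processor {
//!     // consume processed frames here
//! }
//! println!("{:?}", pipeline.memory_stats());
//! ```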

use super::core::{Frame, FrameMetadata, PipelineMetrics, ProcessingStage};
use crate::error::Result;
use crossbeam_channel::{bounded, Receiver};
use scirs2_core::ndarray::Array2;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Instant;

/// Advanced streaming pipeline with memory optimization
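///
/// A minimal construction sketch (illustrative only, not compiled as a doctest;
/// `MyStage` is a hypothetical type implementing [`ProcessingStage`]):
///
/// ```ignore
/// let pipeline = AdvancedStreamPipeline::new()
///     .with_buffer_size(16)
///     .with_num_threads(4)
///     .with_zero_copy()
///     .add_simd_stage(MyStage::default());
/// ```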
pub struct AdvancedStreamPipeline {
    pub(crate) stages: Vec<Box<dyn ProcessingStage>>,
    pub(crate) buffer_size: usize,
    pub(crate) num_threads: usize,
    pub(crate) metrics: Arc<Mutex<PipelineMetrics>>,
    pub(crate) frame_pool: Arc<Mutex<FramePool>>,
    pub(crate) memory_profiler: Arc<Mutex<MemoryProfiler>>,
}

/// Frame pool for memory reuse and zero-copy operations
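///
/// A minimal usage sketch (illustrative only, not compiled as a doctest):
///
/// ```ignore
/// let mut pool = FramePool::new();
/// let frame = pool.get_frame(640, 480); // 480 rows x 640 columns
/// // ... fill and process the frame ...
/// pool.return_frame(frame); // the buffer becomes available for reuse
/// ```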
pub struct FramePool {
    pub(crate) available_frames: Vec<Frame>,
    pub(crate) max_pool_size: usize,
    pub(crate) frame_dimensions: Option<(usize, usize)>,
}

impl FramePool {
    /// Create a new frame pool
    ///
    /// # Returns
    ///
    /// * New frame pool instance
    pub fn new() -> Self {
        Self {
            available_frames: Vec::new(),
            max_pool_size: 50,
            frame_dimensions: None,
        }
    }

    /// Get a frame from the pool or create a new one
    ///
    /// # Arguments
    ///
    /// * `width` - Frame width
    /// * `height` - Frame height
    ///
    /// # Returns
    ///
    /// * Frame ready for use
    pub fn get_frame(&mut self, width: usize, height: usize) -> Frame {
        // Try to reuse an existing frame with matching dimensions
        if self.frame_dimensions == Some((height, width)) {
            if let Some(mut frame) = self.available_frames.pop() {
                if frame.data.dim() == (height, width) {
                    // Reset the reused frame before handing it out
                    frame.data.fill(0.0);
                    frame.timestamp = Instant::now();
                    return frame;
                }
                // The pooled frame has a different resolution (the pool can hold
                // mixed sizes), so put it back and fall through to a fresh allocation.
                self.available_frames.push(frame);
            }
        }

        // Create a new frame if no suitable pooled frame is available
        Frame {
            data: Array2::zeros((height, width)),
            timestamp: Instant::now(),
            index: 0,
            metadata: Some(FrameMetadata {
                width: width as u32,
                height: height as u32,
                fps: 30.0,
                channels: 1,
            }),
        }
    }

    /// Return a frame to the pool
    ///
    /// # Arguments
    ///
    /// * `frame` - Frame to return to the pool
    pub fn return_frame(&mut self, frame: Frame) {
        if self.available_frames.len() < self.max_pool_size {
            let (height, width) = frame.data.dim();
            self.frame_dimensions = Some((height, width));
            self.available_frames.push(frame);
        }
    }
}

impl Default for FramePool {
    fn default() -> Self {
        Self::new()
    }
}

/// Memory usage profiler for streaming operations
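///
/// A minimal usage sketch (illustrative only, not compiled as a doctest):
///
/// ```ignore
/// let mut profiler = MemoryProfiler::new();
/// profiler.record_allocation(640 * 480 * std::mem::size_of::<f32>());
/// profiler.record_deallocation(640 * 480 * std::mem::size_of::<f32>());
/// let stats = profiler.get_stats();
/// assert_eq!(stats.current_memory, 0);
/// ```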
pub struct MemoryProfiler {
    peak_memory: usize,
    current_memory: usize,
    allocation_count: usize,
    memory_timeline: Vec<(Instant, usize)>,
}

impl MemoryProfiler {
    /// Create a new memory profiler
    ///
    /// # Returns
    ///
    /// * New memory profiler instance
    pub fn new() -> Self {
        Self {
            peak_memory: 0,
            current_memory: 0,
            allocation_count: 0,
            memory_timeline: Vec::new(),
        }
    }

    /// Record a memory allocation
    ///
    /// # Arguments
    ///
    /// * `size` - Size of allocation in bytes
    pub fn record_allocation(&mut self, size: usize) {
        self.current_memory += size;
        self.allocation_count += 1;
        if self.current_memory > self.peak_memory {
            self.peak_memory = self.current_memory;
        }
        self.memory_timeline
            .push((Instant::now(), self.current_memory));
    }

    /// Record a memory deallocation
    ///
    /// # Arguments
    ///
    /// * `size` - Size of deallocation in bytes
    pub fn record_deallocation(&mut self, size: usize) {
        self.current_memory = self.current_memory.saturating_sub(size);
        self.memory_timeline
            .push((Instant::now(), self.current_memory));
    }

    /// Get current memory statistics
    ///
    /// # Returns
    ///
    /// * Memory usage statistics
    pub fn get_stats(&self) -> MemoryStats {
        MemoryStats {
            peak_memory: self.peak_memory,
            current_memory: self.current_memory,
            allocation_count: self.allocation_count,
            average_memory: if !self.memory_timeline.is_empty() {
                self.memory_timeline
                    .iter()
                    .map(|(_, mem)| *mem)
                    .sum::<usize>()
                    / self.memory_timeline.len()
            } else {
                0
            },
        }
    }

    /// Reset profiler statistics
    pub fn reset(&mut self) {
        self.peak_memory = 0;
        self.current_memory = 0;
        self.allocation_count = 0;
        self.memory_timeline.clear();
    }
}

impl Default for MemoryProfiler {
    fn default() -> Self {
        Self::new()
    }
}

/// Memory usage statistics
#[derive(Debug, Clone)]
pub struct MemoryStats {
    /// Peak memory usage observed
    pub peak_memory: usize,
    /// Current memory usage
    pub current_memory: usize,
    /// Total number of allocations
    pub allocation_count: usize,
    /// Average memory usage across all operations
    pub average_memory: usize,
}

impl Default for AdvancedStreamPipeline {
    fn default() -> Self {
        Self::new()
    }
}

impl AdvancedStreamPipeline {
    /// Create a new high-performance streaming pipeline
    pub fn new() -> Self {
        Self {
            stages: Vec::new(),
            buffer_size: 10,
            num_threads: num_cpus::get(),
            metrics: Arc::new(Mutex::new(PipelineMetrics::default())),
            frame_pool: Arc::new(Mutex::new(FramePool::new())),
            memory_profiler: Arc::new(Mutex::new(MemoryProfiler::new())),
        }
    }

    /// Set buffer size for inter-stage communication
    ///
    /// # Arguments
    ///
    /// * `size` - Buffer size for channels
    ///
    /// # Returns
    ///
    /// * Self for method chaining
    pub fn with_buffer_size(mut self, size: usize) -> Self {
        self.buffer_size = size;
        self
    }

    /// Set number of worker threads
    ///
    /// # Arguments
    ///
    /// * `threads` - Number of worker threads
    ///
    /// # Returns
    ///
    /// * Self for method chaining
    pub fn with_num_threads(mut self, threads: usize) -> Self {
        self.num_threads = threads;
        self
    }

    /// Enable zero-copy processing with memory pooling
    pub fn with_zero_copy(self) -> Self {
        // Pre-allocate frame pool for common video sizes
        {
            let mut pool = self.frame_pool.lock().expect("frame pool mutex poisoned");

            // Common video resolutions
            let common_sizes = [(480, 640), (720, 1280), (1080, 1920), (240, 320)];

            for &(height, width) in &common_sizes {
                for _ in 0..5 {
                    let frame = Frame {
                        data: Array2::zeros((height, width)),
                        timestamp: Instant::now(),
                        index: 0,
                        metadata: Some(FrameMetadata {
                            width: width as u32,
                            height: height as u32,
                            fps: 30.0,
                            channels: 1,
                        }),
                    };
                    pool.available_frames.push(frame);
                }
            }
        } // Drop the lock here

        self
    }

    /// Add a SIMD-optimized processing stage
    pub fn add_simd_stage<S: ProcessingStage>(mut self, stage: S) -> Self {
        self.stages.push(Box::new(stage));
        self
    }

    /// Process stream with high-performance optimizations
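    ///
    /// A minimal consumption sketch (illustrative only, not compiled as a doctest;
    /// `frame_source` stands for any `Iterator<Item = Frame> + Send + 'static`):
    ///
    /// ```ignore
    /// let processor = pipeline.process_advanced_stream(frame_source);
    /// for _processed in processor {
    ///     // each frame arriving here has passed through every registered stage
    /// }
    /// ```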
    pub fn process_advanced_stream<I>(&mut self, input: I) -> AdvancedStreamProcessor
    where
        I: Iterator<Item = Frame> + Send + 'static,
    {
        let (tx, rx) = bounded::<Frame>(self.buffer_size);
        let metrics = Arc::clone(&self.metrics);
        let frame_pool = Arc::clone(&self.frame_pool);
        let memory_profiler = Arc::clone(&self.memory_profiler);

        // Create optimized pipeline with pre-allocated channels
        let mut channels = vec![rx];
        let mut worker_handles = Vec::new();

        for stage in self.stages.drain(..) {
            let (stage_tx, stage_rx) = bounded(self.buffer_size);
            channels.push(stage_rx);

            let stage_metrics = Arc::clone(&metrics);
            let _stage_frame_pool = Arc::clone(&frame_pool);
            let stage_memory_profiler = Arc::clone(&memory_profiler);
            let stagename = stage.name().to_string();
            let prev_rx = channels[channels.len() - 2].clone();

            // Spawn optimized worker thread
            let handle = thread::spawn(move || {
                let mut stage = stage;
                let _local_frame_buffer: Vec<Frame> = Vec::with_capacity(10);

                while let Ok(frame) = prev_rx.recv() {
                    let start = Instant::now();
                    let frame_size = frame.data.len() * std::mem::size_of::<f32>();

                    // Record memory usage
                    if let Ok(mut profiler) = stage_memory_profiler.lock() {
                        profiler.record_allocation(frame_size);
                    }

                    match stage.process(frame) {
                        Ok(processed) => {
                            let duration = start.elapsed();

                            // Update metrics with lock optimization
                            if let Ok(mut m) = stage_metrics.try_lock() {
                                m.frames_processed += 1;
                                m.avg_processing_time = std::time::Duration::from_secs_f64(
                                    (m.avg_processing_time.as_secs_f64()
                                        * (m.frames_processed - 1) as f64
                                        + duration.as_secs_f64())
                                        / m.frames_processed as f64,
                                );
                                if duration > m.peak_processing_time {
                                    m.peak_processing_time = duration;
                                }

                                // Calculate FPS
                                let fps = (1.0 / duration.as_secs_f64()) as f32;
                                m.fps = m.fps * 0.9 + fps * 0.1; // Smooth FPS calculation
                            }

                            if stage_tx.send(processed).is_err() {
                                break;
                            }
                        }
                        Err(e) => {
                            eprintln!("Stage {stagename} error: {e}");
                            if let Ok(mut m) = stage_metrics.try_lock() {
                                m.dropped_frames += 1;
                            }
                        }
                    }

                    // Record memory deallocation
                    if let Ok(mut profiler) = stage_memory_profiler.lock() {
                        profiler.record_deallocation(frame_size);
                    }
                }
            });

            worker_handles.push(handle);
        }

        let output_rx = channels.pop().expect("channels always holds at least the input receiver");

        // Optimized input thread with batching
        thread::spawn(move || {
            let mut frame_batch = Vec::with_capacity(4);

            for frame in input {
                frame_batch.push(frame);

                // Process in small batches for better cache locality
                if frame_batch.len() >= 4 {
                    for frame in frame_batch.drain(..) {
                        if tx.send(frame).is_err() {
                            return;
                        }
                    }
                }
            }

            // Send remaining frames
            for frame in frame_batch {
                if tx.send(frame).is_err() {
                    break;
                }
            }
        });

        AdvancedStreamProcessor {
            output: output_rx,
            metrics,
            frame_pool,
            memory_profiler,
            worker_handles,
        }
    }

    /// Get current memory usage statistics
    pub fn memory_stats(&self) -> MemoryStats {
        self.memory_profiler
            .lock()
            .expect("memory profiler mutex poisoned")
            .get_stats()
    }

    /// Reset memory profiler statistics
    pub fn reset_memory_stats(&self) {
        self.memory_profiler
            .lock()
            .expect("memory profiler mutex poisoned")
            .reset();
    }

    /// Get current pipeline metrics
    pub fn metrics(&self) -> PipelineMetrics {
        self.metrics.lock().expect("metrics mutex poisoned").clone()
    }
}

/// High-performance stream processor
pub struct AdvancedStreamProcessor {
    pub(crate) output: Receiver<Frame>,
    pub(crate) metrics: Arc<Mutex<PipelineMetrics>>,
    pub(crate) frame_pool: Arc<Mutex<FramePool>>,
    pub(crate) memory_profiler: Arc<Mutex<MemoryProfiler>>,
    #[allow(dead_code)]
    pub(crate) worker_handles: Vec<thread::JoinHandle<()>>,
}

impl AdvancedStreamProcessor {
    /// Get next frame with zero-copy optimization
    pub fn next_zero_copy(&self) -> Option<Frame> {
        self.output.recv().ok()
    }

    /// Try to get the next frame without blocking
    pub fn try_next(&self) -> Option<Frame> {
        self.output.try_recv().ok()
    }

    /// Get batch of frames for efficient processing
    ///
    /// # Arguments
    ///
    /// * `batchsize` - Number of frames to retrieve
    ///
    /// # Returns
    ///
    /// * Vector of frames
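    ///
    /// A short non-blocking drain sketch (illustrative only, not compiled as a doctest):
    ///
    /// ```ignore
    /// // Collect up to 8 frames that are already waiting in the output channel
    /// let frames = processor.next_batch(8);
    /// assert!(frames.len() <= 8);
    /// ```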
    pub fn next_batch(&self, batchsize: usize) -> Vec<Frame> {
        let mut batch = Vec::with_capacity(batchsize);

        for _ in 0..batchsize {
            if let Some(frame) = self.try_next() {
                batch.push(frame);
            } else {
                break;
            }
        }

        batch
    }

    /// Return frame to memory pool
    ///
    /// # Arguments
    ///
    /// * `frame` - Frame to return to the pool
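    ///
    /// A minimal recycling sketch (illustrative only, not compiled as a doctest):
    ///
    /// ```ignore
    /// while let Some(frame) = processor.next_zero_copy() {
    ///     // ... use the processed frame ...
    ///     processor.return_frame(frame); // hand the buffer back to the pool
    /// }
    /// ```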
    pub fn return_frame(&self, frame: Frame) {
        if let Ok(mut pool) = self.frame_pool.lock() {
            pool.return_frame(frame);
        }
    }

    /// Get current metrics
    pub fn metrics(&self) -> PipelineMetrics {
        self.metrics.lock().expect("metrics mutex poisoned").clone()
    }

    /// Get current memory statistics
    pub fn memory_stats(&self) -> MemoryStats {
        self.memory_profiler
            .lock()
            .expect("memory profiler mutex poisoned")
            .get_stats()
    }

    /// Reset memory profiler statistics
    pub fn reset_memory_stats(&self) {
        self.memory_profiler
            .lock()
            .expect("memory profiler mutex poisoned")
            .reset();
    }
}

impl Iterator for AdvancedStreamProcessor {
    type Item = Frame;

    fn next(&mut self) -> Option<Self::Item> {
        self.output.recv().ok()
    }
}