1use crate::{Result, VisionError};
27use std::collections::VecDeque;
28use std::sync::{Arc, Mutex};
29use std::time::{Duration, Instant};
30use torsh_tensor::Tensor;
31
32#[derive(Debug, Clone)]
34pub struct StreamConfig {
35 pub buffer_size: usize,
37 pub target_fps: f32,
39 pub enable_frame_drop: bool,
41 pub use_gpu: bool,
43 pub num_threads: usize,
45 pub quality_adaptation: QualityAdaptation,
47}
48
49impl Default for StreamConfig {
50 fn default() -> Self {
51 Self {
52 buffer_size: 30,
53 target_fps: 30.0,
54 enable_frame_drop: true,
55 use_gpu: false,
56 num_threads: 4,
57 quality_adaptation: QualityAdaptation::None,
58 }
59 }
60}
61
/// Strategy a stream processor uses to decide when a frame needs
/// quality adaptation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QualityAdaptation {
    /// Never adapt; frames are processed as they arrive.
    None,
    /// Adapt when the measured frame rate falls below 80% of the target.
    ResolutionScaling,
    /// Adapt non-keyframes when the measured frame rate falls below 90%
    /// of the target.
    KeyframeOnly,
    /// Adapt every frame below 70% of the target frame rate, and
    /// non-keyframes below 90% of it.
    Dynamic,
}
74
/// Descriptive information that travels alongside a frame's pixel data.
#[derive(Debug, Clone)]
pub struct FrameMetadata {
    /// Sequence number of the frame within its stream.
    pub frame_number: u64,
    /// Instant associated with the frame (presumably its capture or
    /// creation time — confirm with the producer).
    pub timestamp: Instant,
    /// Frame width (presumably in pixels).
    pub width: usize,
    /// Frame height (presumably in pixels).
    pub height: usize,
    /// Whether this is a keyframe; keyframes are the last candidates for
    /// eviction when the processor's buffer is full.
    pub is_keyframe: bool,
    /// Caller-assigned priority. NOTE(review): not consulted by any code
    /// in this file — confirm ordering semantics with the consumer.
    pub priority: u8,
}
91
92#[derive(Debug, Clone)]
94pub struct Frame {
95 pub data: Tensor,
97 pub metadata: FrameMetadata,
99}
100
/// Running statistics maintained by a [`StreamProcessor`].
///
/// `Default` yields all-zero statistics, i.e. the state before any frame
/// has been pushed or processed.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Mean processing time over the recorded window, in milliseconds.
    pub avg_processing_time: f32,
    /// Frame rate implied by `avg_processing_time`
    /// (`1000.0 / avg_processing_time`); `0.0` until a non-zero average
    /// has been recorded.
    pub current_fps: f32,
    /// Number of queued frames evicted because the buffer was full.
    pub dropped_frames: u64,
    /// Number of frames whose processing completed successfully.
    pub total_frames: u64,
    /// Fraction of the buffer occupied (queued frames / `buffer_size`)
    /// as last recorded by the processor.
    pub buffer_utilization: f32,
    /// Number of frames that were judged to need quality adaptation.
    pub num_adaptations: u64,
}
130
131pub struct StreamProcessor {
133 config: StreamConfig,
134 stats: Arc<Mutex<StreamStats>>,
135 frame_buffer: Arc<Mutex<VecDeque<Frame>>>,
136 processing_times: Arc<Mutex<VecDeque<Duration>>>,
137}
138
139impl StreamProcessor {
140 pub fn new(config: StreamConfig) -> Result<Self> {
142 let buffer_size = config.buffer_size;
143 Ok(Self {
144 config,
145 stats: Arc::new(Mutex::new(StreamStats::default())),
146 frame_buffer: Arc::new(Mutex::new(VecDeque::with_capacity(buffer_size))),
147 processing_times: Arc::new(Mutex::new(VecDeque::with_capacity(100))),
148 })
149 }
150
151 pub fn stats(&self) -> StreamStats {
153 self.stats.lock().map(|s| s.clone()).unwrap_or_default()
154 }
155
156 pub fn reset_stats(&self) {
158 if let Ok(mut stats) = self.stats.lock() {
159 *stats = StreamStats::default();
160 }
161 if let Ok(mut times) = self.processing_times.lock() {
162 times.clear();
163 }
164 }
165
166 pub fn push_frame(&self, frame: Frame) -> Result<()> {
168 let mut buffer = self.frame_buffer.lock().map_err(|_| {
169 VisionError::InvalidParameter("Failed to lock frame buffer".to_string())
170 })?;
171
172 if buffer.len() >= self.config.buffer_size {
174 if self.config.enable_frame_drop {
175 let mut dropped = false;
177 for i in 0..buffer.len() {
178 if !buffer[i].metadata.is_keyframe {
179 buffer.remove(i);
180 dropped = true;
181
182 if let Ok(mut stats) = self.stats.lock() {
184 stats.dropped_frames += 1;
185 }
186 break;
187 }
188 }
189
190 if !dropped {
191 buffer.pop_front();
193 if let Ok(mut stats) = self.stats.lock() {
194 stats.dropped_frames += 1;
195 }
196 }
197 } else {
198 return Err(VisionError::InvalidParameter(
199 "Frame buffer full and dropping disabled".to_string(),
200 ));
201 }
202 }
203
204 buffer.push_back(frame);
205
206 if let Ok(mut stats) = self.stats.lock() {
208 stats.buffer_utilization = buffer.len() as f32 / self.config.buffer_size as f32;
209 }
210
211 Ok(())
212 }
213
214 pub fn pop_frame(&self) -> Result<Option<Frame>> {
216 let mut buffer = self.frame_buffer.lock().map_err(|_| {
217 VisionError::InvalidParameter("Failed to lock frame buffer".to_string())
218 })?;
219
220 Ok(buffer.pop_front())
221 }
222
223 pub fn record_processing_time(&self, duration: Duration) {
225 if let Ok(mut times) = self.processing_times.lock() {
226 times.push_back(duration);
227
228 while times.len() > 100 {
230 times.pop_front();
231 }
232
233 if let Ok(mut stats) = self.stats.lock() {
235 let sum: Duration = times.iter().sum();
237 stats.avg_processing_time = sum.as_secs_f32() * 1000.0 / times.len() as f32;
238
239 if stats.avg_processing_time > 0.0 {
241 stats.current_fps = 1000.0 / stats.avg_processing_time;
242 }
243 }
244 }
245 }
246
247 pub fn process_frame<F, T>(&self, frame: Frame, process_fn: F) -> Result<T>
249 where
250 F: FnOnce(&Frame) -> Result<T>,
251 {
252 let start = Instant::now();
253
254 let adapted_frame = self.adapt_frame_quality(&frame)?;
256
257 let result = process_fn(&adapted_frame)?;
259
260 let elapsed = start.elapsed();
262 self.record_processing_time(elapsed);
263
264 if let Ok(mut stats) = self.stats.lock() {
266 stats.total_frames += 1;
267 }
268
269 Ok(result)
270 }
271
272 fn adapt_frame_quality(&self, frame: &Frame) -> Result<Frame> {
274 match self.config.quality_adaptation {
275 QualityAdaptation::None => Ok(frame.clone()),
276
277 QualityAdaptation::ResolutionScaling => {
278 let stats = self.stats();
280 if stats.current_fps < self.config.target_fps * 0.8 {
281 let _new_width = (frame.metadata.width as f32 * 0.75) as usize;
283 let _new_height = (frame.metadata.height as f32 * 0.75) as usize;
284
285 if let Ok(mut stats) = self.stats.lock() {
288 stats.num_adaptations += 1;
289 }
290 }
291 Ok(frame.clone())
292 }
293
294 QualityAdaptation::KeyframeOnly => {
295 let stats = self.stats();
297 if !frame.metadata.is_keyframe && stats.current_fps < self.config.target_fps * 0.9 {
298 if let Ok(mut stats_lock) = self.stats.lock() {
300 stats_lock.num_adaptations += 1;
301 }
302 }
303 Ok(frame.clone())
304 }
305
306 QualityAdaptation::Dynamic => {
307 let stats = self.stats();
309 let performance_ratio = stats.current_fps / self.config.target_fps;
310
311 if performance_ratio < 0.7 {
312 if let Ok(mut stats_lock) = self.stats.lock() {
315 stats_lock.num_adaptations += 1;
316 }
317 } else if performance_ratio < 0.9 && !frame.metadata.is_keyframe {
318 if let Ok(mut stats_lock) = self.stats.lock() {
320 stats_lock.num_adaptations += 1;
321 }
322 }
323
324 Ok(frame.clone())
325 }
326 }
327 }
328
329 pub fn is_realtime(&self) -> bool {
331 let stats = self.stats();
332 stats.current_fps >= self.config.target_fps * 0.95
333 }
334
335 pub fn recommend_config_adjustments(&self) -> Vec<String> {
337 let stats = self.stats();
338 let mut recommendations = Vec::new();
339
340 if stats.current_fps < self.config.target_fps * 0.8 {
342 recommendations
343 .push("Consider reducing target_fps or enabling quality adaptation".to_string());
344 }
345
346 if stats.buffer_utilization > 0.9 {
348 recommendations.push("Buffer is frequently full - consider increasing buffer_size or enabling frame dropping".to_string());
349 }
350
351 if stats.total_frames > 0 {
353 let drop_rate = stats.dropped_frames as f32 / stats.total_frames as f32;
354 if drop_rate > 0.1 {
355 recommendations.push(format!(
356 "High frame drop rate ({:.1}%) - consider reducing input rate or optimizing processing",
357 drop_rate * 100.0
358 ));
359 }
360 }
361
362 recommendations
363 }
364}
365
/// Describes the preprocessing steps to apply to a frame.
///
/// `Default` selects no steps: no resize, no normalization and no
/// grayscale conversion.
#[derive(Debug, Clone, Default)]
pub struct FramePreprocessor {
    /// Requested output size as `(width, height)`, or `None` to keep the
    /// frame's size.
    pub target_size: Option<(usize, usize)>,
    /// Mean values for normalization (presumably one per channel —
    /// confirm), or `None` to skip normalization.
    pub normalize_mean: Option<Vec<f32>>,
    /// Standard deviations for normalization (presumably one per
    /// channel — confirm), or `None` to skip normalization.
    pub normalize_std: Option<Vec<f32>>,
    /// Whether grayscale conversion is requested.
    pub to_grayscale: bool,
}
388
389impl FramePreprocessor {
390 pub fn new() -> Self {
392 Self::default()
393 }
394
395 pub fn with_resize(mut self, width: usize, height: usize) -> Self {
397 self.target_size = Some((width, height));
398 self
399 }
400
401 pub fn with_normalize(mut self, mean: Vec<f32>, std: Vec<f32>) -> Self {
403 self.normalize_mean = Some(mean);
404 self.normalize_std = Some(std);
405 self
406 }
407
408 pub fn with_grayscale(mut self) -> Self {
410 self.to_grayscale = true;
411 self
412 }
413
414 pub fn preprocess(&self, frame: &Frame) -> Result<Frame> {
416 let processed = frame.clone();
417
418 Ok(processed)
424 }
425}
426
427pub struct BatchProcessor {
429 batch_size: usize,
430 frames: Vec<Frame>,
431}
432
433impl BatchProcessor {
434 pub fn new(batch_size: usize) -> Self {
436 Self {
437 batch_size,
438 frames: Vec::with_capacity(batch_size),
439 }
440 }
441
442 pub fn add_frame(&mut self, frame: Frame) -> Option<Vec<Frame>> {
444 self.frames.push(frame);
445
446 if self.frames.len() >= self.batch_size {
447 Some(self.flush())
448 } else {
449 None
450 }
451 }
452
453 pub fn current_batch(&self) -> &[Frame] {
455 &self.frames
456 }
457
458 pub fn flush(&mut self) -> Vec<Frame> {
460 std::mem::replace(&mut self.frames, Vec::with_capacity(self.batch_size))
461 }
462
463 pub fn is_full(&self) -> bool {
465 self.frames.len() >= self.batch_size
466 }
467
468 pub fn len(&self) -> usize {
470 self.frames.len()
471 }
472
473 pub fn is_empty(&self) -> bool {
475 self.frames.is_empty()
476 }
477}
478
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a zero-filled 224x224 CPU frame; frame numbers divisible by
    /// ten are marked as keyframes.
    fn create_dummy_frame(frame_number: u64) -> Frame {
        use torsh_core::DeviceType;

        let data = Tensor::zeros(&[224, 224, 3], DeviceType::Cpu).expect("Failed to create tensor");
        let metadata = FrameMetadata {
            frame_number,
            timestamp: Instant::now(),
            width: 224,
            height: 224,
            is_keyframe: frame_number % 10 == 0,
            priority: 1,
        };
        Frame { data, metadata }
    }

    /// Builds a processor from `config`, panicking if construction fails.
    fn processor_with(config: StreamConfig) -> StreamProcessor {
        StreamProcessor::new(config).expect("Failed to create processor")
    }

    #[test]
    fn test_stream_config_default() {
        let defaults = StreamConfig::default();
        assert_eq!(defaults.buffer_size, 30);
        assert_eq!(defaults.target_fps, 30.0);
        assert!(defaults.enable_frame_drop);
    }

    #[test]
    fn test_stream_processor_creation() {
        assert!(StreamProcessor::new(StreamConfig::default()).is_ok());
    }

    #[test]
    fn test_push_pop_frame() {
        let processor = processor_with(StreamConfig::default());

        processor
            .push_frame(create_dummy_frame(1))
            .expect("Failed to push frame");

        let popped = processor.pop_frame().expect("Failed to pop frame");
        assert_eq!(popped.map(|frame| frame.metadata.frame_number), Some(1));
    }

    #[test]
    fn test_frame_dropping() {
        let processor = processor_with(StreamConfig {
            buffer_size: 2,
            enable_frame_drop: true,
            ..StreamConfig::default()
        });

        // Five pushes into a two-slot buffer must evict some frames.
        for frame_number in 0..5 {
            processor
                .push_frame(create_dummy_frame(frame_number))
                .expect("Failed to push frame");
        }

        assert!(processor.stats().dropped_frames > 0);
    }

    #[test]
    fn test_processing_time_recording() {
        let processor = processor_with(StreamConfig::default());

        for millis in [10, 20] {
            processor.record_processing_time(Duration::from_millis(millis));
        }

        assert!(processor.stats().avg_processing_time > 0.0);
    }

    #[test]
    fn test_batch_processor() {
        let mut batch = BatchProcessor::new(3);

        assert!(batch.is_empty());
        assert!(!batch.is_full());

        batch.add_frame(create_dummy_frame(1));
        batch.add_frame(create_dummy_frame(2));

        assert_eq!(batch.len(), 2);
        assert!(!batch.is_full());

        // The third frame completes the batch and empties the batcher.
        let completed = batch.add_frame(create_dummy_frame(3));
        assert!(completed.is_some());
        assert_eq!(completed.unwrap().len(), 3);
        assert!(batch.is_empty());
    }

    #[test]
    fn test_frame_preprocessor() {
        let preprocessor = FramePreprocessor::new()
            .with_resize(224, 224)
            .with_grayscale();

        let outcome = preprocessor.preprocess(&create_dummy_frame(1));
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_quality_adaptation_variants() {
        assert_eq!(QualityAdaptation::None, QualityAdaptation::None);
        assert_ne!(
            QualityAdaptation::None,
            QualityAdaptation::ResolutionScaling
        );
    }

    #[test]
    fn test_is_realtime() {
        let processor = processor_with(StreamConfig::default());

        // 10 ms per frame is 100 fps, well above the default 30 fps target.
        processor.record_processing_time(Duration::from_millis(10));
        assert!(processor.is_realtime());
    }

    #[test]
    fn test_stream_stats_default() {
        let stats = StreamStats::default();
        assert_eq!(stats.total_frames, 0);
        assert_eq!(stats.dropped_frames, 0);
        assert_eq!(stats.current_fps, 0.0);
    }
}