use crate::{Result, VisionError};
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use torsh_tensor::Tensor;
#[derive(Debug, Clone)]
pub struct StreamConfig {
pub buffer_size: usize,
pub target_fps: f32,
pub enable_frame_drop: bool,
pub use_gpu: bool,
pub num_threads: usize,
pub quality_adaptation: QualityAdaptation,
}
impl Default for StreamConfig {
fn default() -> Self {
Self {
buffer_size: 30,
target_fps: 30.0,
enable_frame_drop: true,
use_gpu: false,
num_threads: 4,
quality_adaptation: QualityAdaptation::None,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QualityAdaptation {
None,
ResolutionScaling,
KeyframeOnly,
Dynamic,
}
#[derive(Debug, Clone)]
pub struct FrameMetadata {
pub frame_number: u64,
pub timestamp: Instant,
pub width: usize,
pub height: usize,
pub is_keyframe: bool,
pub priority: u8,
}
#[derive(Debug, Clone)]
pub struct Frame {
pub data: Tensor,
pub metadata: FrameMetadata,
}
#[derive(Debug, Clone)]
pub struct StreamStats {
pub avg_processing_time: f32,
pub current_fps: f32,
pub dropped_frames: u64,
pub total_frames: u64,
pub buffer_utilization: f32,
pub num_adaptations: u64,
}
impl Default for StreamStats {
fn default() -> Self {
Self {
avg_processing_time: 0.0,
current_fps: 0.0,
dropped_frames: 0,
total_frames: 0,
buffer_utilization: 0.0,
num_adaptations: 0,
}
}
}
pub struct StreamProcessor {
config: StreamConfig,
stats: Arc<Mutex<StreamStats>>,
frame_buffer: Arc<Mutex<VecDeque<Frame>>>,
processing_times: Arc<Mutex<VecDeque<Duration>>>,
}
impl StreamProcessor {
pub fn new(config: StreamConfig) -> Result<Self> {
let buffer_size = config.buffer_size;
Ok(Self {
config,
stats: Arc::new(Mutex::new(StreamStats::default())),
frame_buffer: Arc::new(Mutex::new(VecDeque::with_capacity(buffer_size))),
processing_times: Arc::new(Mutex::new(VecDeque::with_capacity(100))),
})
}
pub fn stats(&self) -> StreamStats {
self.stats.lock().map(|s| s.clone()).unwrap_or_default()
}
pub fn reset_stats(&self) {
if let Ok(mut stats) = self.stats.lock() {
*stats = StreamStats::default();
}
if let Ok(mut times) = self.processing_times.lock() {
times.clear();
}
}
pub fn push_frame(&self, frame: Frame) -> Result<()> {
let mut buffer = self.frame_buffer.lock().map_err(|_| {
VisionError::InvalidParameter("Failed to lock frame buffer".to_string())
})?;
if buffer.len() >= self.config.buffer_size {
if self.config.enable_frame_drop {
let mut dropped = false;
for i in 0..buffer.len() {
if !buffer[i].metadata.is_keyframe {
buffer.remove(i);
dropped = true;
if let Ok(mut stats) = self.stats.lock() {
stats.dropped_frames += 1;
}
break;
}
}
if !dropped {
buffer.pop_front();
if let Ok(mut stats) = self.stats.lock() {
stats.dropped_frames += 1;
}
}
} else {
return Err(VisionError::InvalidParameter(
"Frame buffer full and dropping disabled".to_string(),
));
}
}
buffer.push_back(frame);
if let Ok(mut stats) = self.stats.lock() {
stats.buffer_utilization = buffer.len() as f32 / self.config.buffer_size as f32;
}
Ok(())
}
pub fn pop_frame(&self) -> Result<Option<Frame>> {
let mut buffer = self.frame_buffer.lock().map_err(|_| {
VisionError::InvalidParameter("Failed to lock frame buffer".to_string())
})?;
Ok(buffer.pop_front())
}
pub fn record_processing_time(&self, duration: Duration) {
if let Ok(mut times) = self.processing_times.lock() {
times.push_back(duration);
while times.len() > 100 {
times.pop_front();
}
if let Ok(mut stats) = self.stats.lock() {
let sum: Duration = times.iter().sum();
stats.avg_processing_time = sum.as_secs_f32() * 1000.0 / times.len() as f32;
if stats.avg_processing_time > 0.0 {
stats.current_fps = 1000.0 / stats.avg_processing_time;
}
}
}
}
pub fn process_frame<F, T>(&self, frame: Frame, process_fn: F) -> Result<T>
where
F: FnOnce(&Frame) -> Result<T>,
{
let start = Instant::now();
let adapted_frame = self.adapt_frame_quality(&frame)?;
let result = process_fn(&adapted_frame)?;
let elapsed = start.elapsed();
self.record_processing_time(elapsed);
if let Ok(mut stats) = self.stats.lock() {
stats.total_frames += 1;
}
Ok(result)
}
fn adapt_frame_quality(&self, frame: &Frame) -> Result<Frame> {
match self.config.quality_adaptation {
QualityAdaptation::None => Ok(frame.clone()),
QualityAdaptation::ResolutionScaling => {
let stats = self.stats();
if stats.current_fps < self.config.target_fps * 0.8 {
let _new_width = (frame.metadata.width as f32 * 0.75) as usize;
let _new_height = (frame.metadata.height as f32 * 0.75) as usize;
if let Ok(mut stats) = self.stats.lock() {
stats.num_adaptations += 1;
}
}
Ok(frame.clone())
}
QualityAdaptation::KeyframeOnly => {
let stats = self.stats();
if !frame.metadata.is_keyframe && stats.current_fps < self.config.target_fps * 0.9 {
if let Ok(mut stats_lock) = self.stats.lock() {
stats_lock.num_adaptations += 1;
}
}
Ok(frame.clone())
}
QualityAdaptation::Dynamic => {
let stats = self.stats();
let performance_ratio = stats.current_fps / self.config.target_fps;
if performance_ratio < 0.7 {
if let Ok(mut stats_lock) = self.stats.lock() {
stats_lock.num_adaptations += 1;
}
} else if performance_ratio < 0.9 && !frame.metadata.is_keyframe {
if let Ok(mut stats_lock) = self.stats.lock() {
stats_lock.num_adaptations += 1;
}
}
Ok(frame.clone())
}
}
}
pub fn is_realtime(&self) -> bool {
let stats = self.stats();
stats.current_fps >= self.config.target_fps * 0.95
}
pub fn recommend_config_adjustments(&self) -> Vec<String> {
let stats = self.stats();
let mut recommendations = Vec::new();
if stats.current_fps < self.config.target_fps * 0.8 {
recommendations
.push("Consider reducing target_fps or enabling quality adaptation".to_string());
}
if stats.buffer_utilization > 0.9 {
recommendations.push("Buffer is frequently full - consider increasing buffer_size or enabling frame dropping".to_string());
}
if stats.total_frames > 0 {
let drop_rate = stats.dropped_frames as f32 / stats.total_frames as f32;
if drop_rate > 0.1 {
recommendations.push(format!(
"High frame drop rate ({:.1}%) - consider reducing input rate or optimizing processing",
drop_rate * 100.0
));
}
}
recommendations
}
}
pub struct FramePreprocessor {
pub target_size: Option<(usize, usize)>,
pub normalize_mean: Option<Vec<f32>>,
pub normalize_std: Option<Vec<f32>>,
pub to_grayscale: bool,
}
impl Default for FramePreprocessor {
fn default() -> Self {
Self {
target_size: None,
normalize_mean: None,
normalize_std: None,
to_grayscale: false,
}
}
}
impl FramePreprocessor {
pub fn new() -> Self {
Self::default()
}
pub fn with_resize(mut self, width: usize, height: usize) -> Self {
self.target_size = Some((width, height));
self
}
pub fn with_normalize(mut self, mean: Vec<f32>, std: Vec<f32>) -> Self {
self.normalize_mean = Some(mean);
self.normalize_std = Some(std);
self
}
pub fn with_grayscale(mut self) -> Self {
self.to_grayscale = true;
self
}
pub fn preprocess(&self, frame: &Frame) -> Result<Frame> {
let processed = frame.clone();
Ok(processed)
}
}
pub struct BatchProcessor {
batch_size: usize,
frames: Vec<Frame>,
}
impl BatchProcessor {
pub fn new(batch_size: usize) -> Self {
Self {
batch_size,
frames: Vec::with_capacity(batch_size),
}
}
pub fn add_frame(&mut self, frame: Frame) -> Option<Vec<Frame>> {
self.frames.push(frame);
if self.frames.len() >= self.batch_size {
Some(self.flush())
} else {
None
}
}
pub fn current_batch(&self) -> &[Frame] {
&self.frames
}
pub fn flush(&mut self) -> Vec<Frame> {
std::mem::replace(&mut self.frames, Vec::with_capacity(self.batch_size))
}
pub fn is_full(&self) -> bool {
self.frames.len() >= self.batch_size
}
pub fn len(&self) -> usize {
self.frames.len()
}
pub fn is_empty(&self) -> bool {
self.frames.is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn create_dummy_frame(frame_number: u64) -> Frame {
use torsh_core::DeviceType;
Frame {
data: Tensor::zeros(&[224, 224, 3], DeviceType::Cpu).expect("Failed to create tensor"),
metadata: FrameMetadata {
frame_number,
timestamp: Instant::now(),
width: 224,
height: 224,
is_keyframe: frame_number % 10 == 0,
priority: 1,
},
}
}
#[test]
fn test_stream_config_default() {
let config = StreamConfig::default();
assert_eq!(config.buffer_size, 30);
assert_eq!(config.target_fps, 30.0);
assert!(config.enable_frame_drop);
}
#[test]
fn test_stream_processor_creation() {
let processor = StreamProcessor::new(StreamConfig::default());
assert!(processor.is_ok());
}
#[test]
fn test_push_pop_frame() {
let processor =
StreamProcessor::new(StreamConfig::default()).expect("Failed to create processor");
let frame = create_dummy_frame(1);
processor
.push_frame(frame.clone())
.expect("Failed to push frame");
let popped = processor.pop_frame().expect("Failed to pop frame");
assert!(popped.is_some());
assert_eq!(popped.unwrap().metadata.frame_number, 1);
}
#[test]
fn test_frame_dropping() {
let mut config = StreamConfig::default();
config.buffer_size = 2;
config.enable_frame_drop = true;
let processor = StreamProcessor::new(config).expect("Failed to create processor");
for i in 0..5 {
let frame = create_dummy_frame(i);
processor.push_frame(frame).expect("Failed to push frame");
}
let stats = processor.stats();
assert!(stats.dropped_frames > 0);
}
#[test]
fn test_processing_time_recording() {
let processor =
StreamProcessor::new(StreamConfig::default()).expect("Failed to create processor");
processor.record_processing_time(Duration::from_millis(10));
processor.record_processing_time(Duration::from_millis(20));
let stats = processor.stats();
assert!(stats.avg_processing_time > 0.0);
}
#[test]
fn test_batch_processor() {
let mut batch = BatchProcessor::new(3);
assert!(batch.is_empty());
assert!(!batch.is_full());
batch.add_frame(create_dummy_frame(1));
batch.add_frame(create_dummy_frame(2));
assert_eq!(batch.len(), 2);
assert!(!batch.is_full());
let result = batch.add_frame(create_dummy_frame(3));
assert!(result.is_some());
assert_eq!(result.unwrap().len(), 3);
assert!(batch.is_empty());
}
#[test]
fn test_frame_preprocessor() {
let preprocessor = FramePreprocessor::new()
.with_resize(224, 224)
.with_grayscale();
let frame = create_dummy_frame(1);
let result = preprocessor.preprocess(&frame);
assert!(result.is_ok());
}
#[test]
fn test_quality_adaptation_variants() {
assert_eq!(QualityAdaptation::None, QualityAdaptation::None);
assert_ne!(
QualityAdaptation::None,
QualityAdaptation::ResolutionScaling
);
}
#[test]
fn test_is_realtime() {
let processor =
StreamProcessor::new(StreamConfig::default()).expect("Failed to create processor");
processor.record_processing_time(Duration::from_millis(10));
let is_rt = processor.is_realtime();
assert!(is_rt); }
#[test]
fn test_stream_stats_default() {
let stats = StreamStats::default();
assert_eq!(stats.total_frames, 0);
assert_eq!(stats.dropped_frames, 0);
assert_eq!(stats.current_fps, 0.0);
}
}