use std::sync::Arc;
use std::time::{Duration, Instant};
use lamco_pipewire::VideoFrame;
use parking_lot::RwLock;
use tokio::sync::mpsc;
use tracing::{debug, trace, warn};
use crate::converter::{BitmapConverter, BitmapUpdate};
const DEFAULT_QUEUE_SIZE: usize = 30;
const DEFAULT_TARGET_FPS: u32 = 30;
const MAX_FRAME_AGE_MS: u64 = 100;
/// Tunable settings for a [`FrameProcessor`].
#[derive(Debug, Clone)]
pub struct ProcessorConfig {
/// Upper bound on frames accepted per second; enforced by the FPS throttle
/// in `FrameProcessor::should_process_frame`.
pub target_fps: u32,
/// Queue depth at or above which incoming frames are dropped when
/// `drop_on_full_queue` is set.
pub max_queue_depth: usize,
// NOTE(review): not read anywhere in this file — presumably consumed by
// another module; confirm before removing.
pub adaptive_quality: bool,
/// Damage fraction passed to `VideoFrame::has_significant_damage`; frames
/// below it are skipped as "no visible change".
pub damage_threshold: f32,
/// When true, shed load by dropping frames once the queue is full.
pub drop_on_full_queue: bool,
// NOTE(review): not read anywhere in this file — presumably gates metric
// reporting elsewhere; confirm against callers.
pub enable_metrics: bool,
}
impl Default for ProcessorConfig {
fn default() -> Self {
Self {
target_fps: DEFAULT_TARGET_FPS,
max_queue_depth: DEFAULT_QUEUE_SIZE,
adaptive_quality: true,
damage_threshold: 0.05,
drop_on_full_queue: true,
enable_metrics: true,
}
}
}
/// Cumulative counters describing the processor's behavior since the last
/// reset. Cheap to clone; snapshots are handed out by value.
#[derive(Debug, Clone, Default)]
pub struct ProcessingStats {
    /// Frames pulled off the input channel.
    pub frames_received: u64,
    /// Frames successfully converted and delivered downstream.
    pub frames_processed: u64,
    /// Frames shed because the input queue hit its depth limit.
    pub frames_dropped_queue_full: u64,
    /// Frames discarded for exceeding the maximum allowed age.
    pub frames_dropped_old: u64,
    /// Frames whose conversion produced no changed rectangles.
    pub frames_skipped_no_change: u64,
    /// Total time spent in frame conversion, in nanoseconds.
    pub total_processing_time_ns: u64,
    /// Input queue depth observed at the most recent receive.
    pub current_queue_depth: usize,
    /// Largest queue depth ever observed.
    pub peak_queue_depth: usize,
}

impl ProcessingStats {
    /// Mean conversion time per processed frame, in milliseconds.
    /// Returns 0.0 when nothing has been processed yet.
    pub fn avg_processing_time_ms(&self) -> f64 {
        match self.frames_processed {
            0 => 0.0,
            n => self.total_processing_time_ns as f64 / n as f64 / 1_000_000.0,
        }
    }

    /// Fraction of received frames that were dropped (queue-full + too-old).
    /// Returns 0.0 when no frames have been received.
    pub fn drop_rate(&self) -> f64 {
        if self.frames_received == 0 {
            return 0.0;
        }
        let dropped = self.frames_dropped_queue_full + self.frames_dropped_old;
        dropped as f64 / self.frames_received as f64
    }

    /// Throughput derived from accumulated conversion time — i.e. frames per
    /// second of *processing* time, not wall-clock rate. Returns 0.0 before
    /// any time has been accumulated.
    pub fn current_fps(&self) -> f64 {
        if self.total_processing_time_ns == 0 {
            return 0.0;
        }
        self.frames_processed as f64 * 1_000_000_000.0 / self.total_processing_time_ns as f64
    }
}
/// A frame paired with the instant it entered the processing pipeline,
/// so staleness can be measured before conversion.
struct QueuedFrame {
    frame: VideoFrame,
    enqueue_time: Instant,
}

impl QueuedFrame {
    /// Wraps `frame`, stamping it with the current time.
    fn new(frame: VideoFrame) -> Self {
        let enqueue_time = Instant::now();
        Self { frame, enqueue_time }
    }

    /// How long this frame has been waiting since it was wrapped.
    fn age(&self) -> Duration {
        Instant::now().duration_since(self.enqueue_time)
    }

    /// True once the frame's age exceeds `max_age_ms` whole milliseconds
    /// (sub-millisecond remainder is truncated before comparing).
    fn is_too_old(&self, max_age_ms: u64) -> bool {
        let age_ms = self.age().as_millis() as u64;
        age_ms > max_age_ms
    }
}
/// Asynchronous pipeline stage that receives raw `VideoFrame`s, applies
/// queue/age/FPS gating, and converts surviving frames into `BitmapUpdate`s.
pub struct FrameProcessor {
/// Behavioral settings (FPS cap, queue limit, damage threshold, ...).
config: ProcessorConfig,
/// Pixel converter; write-locked for the duration of each conversion.
converter: Arc<RwLock<BitmapConverter>>,
/// Shared counters; cloned out via `get_statistics`.
stats: Arc<RwLock<ProcessingStats>>,
/// Instant of the last frame accepted by the FPS throttle (None before the first).
last_frame_time: Arc<RwLock<Option<Instant>>>,
/// Loop-control flag; checked at the top of each `start` iteration.
running: Arc<RwLock<bool>>,
}
impl FrameProcessor {
pub fn new(config: ProcessorConfig, width: u16, height: u16) -> Self {
Self {
config,
converter: Arc::new(RwLock::new(BitmapConverter::new(width, height))),
stats: Arc::new(RwLock::new(ProcessingStats::default())),
last_frame_time: Arc::new(RwLock::new(None)),
running: Arc::new(RwLock::new(false)),
}
}
pub async fn start(
self: Arc<Self>,
mut input: mpsc::Receiver<VideoFrame>,
output: mpsc::Sender<BitmapUpdate>,
) -> Result<(), ProcessingError> {
*self.running.write() = true;
debug!("Frame processor started with target {} FPS", self.config.target_fps);
while *self.running.read() {
match input.recv().await {
Some(frame) => {
let queue_depth = input.len();
{
let mut stats = self.stats.write();
stats.current_queue_depth = queue_depth;
if queue_depth > stats.peak_queue_depth {
stats.peak_queue_depth = queue_depth;
}
stats.frames_received += 1;
}
if queue_depth >= self.config.max_queue_depth && self.config.drop_on_full_queue {
warn!(
"Frame queue full ({} frames), dropping frame {}",
queue_depth, frame.frame_id
);
self.stats.write().frames_dropped_queue_full += 1;
continue;
}
let queued_frame = QueuedFrame::new(frame);
if queued_frame.is_too_old(MAX_FRAME_AGE_MS) {
trace!(
"Dropping old frame {} (age: {:?})",
queued_frame.frame.frame_id,
queued_frame.age()
);
self.stats.write().frames_dropped_old += 1;
continue;
}
if !self.should_process_frame(&queued_frame.frame) {
continue;
}
match self.process_frame(queued_frame.frame).await {
Ok(bitmap_update) => {
if bitmap_update.rectangles.is_empty() {
self.stats.write().frames_skipped_no_change += 1;
continue;
}
if let Err(e) = output.send(bitmap_update).await {
warn!("Failed to send bitmap update: {}", e);
break;
}
self.stats.write().frames_processed += 1;
}
Err(e) => {
warn!("Frame processing error: {}", e);
continue;
}
}
}
None => {
debug!("Input channel closed, stopping processor");
break;
}
}
}
*self.running.write() = false;
Ok(())
}
pub fn stop(&self) {
*self.running.write() = false;
}
fn should_process_frame(&self, _frame: &VideoFrame) -> bool {
let mut last_time = self.last_frame_time.write();
if last_time.is_none() {
*last_time = Some(Instant::now());
return true;
}
let last = last_time.unwrap();
let elapsed = last.elapsed();
let min_interval = Duration::from_nanos(1_000_000_000 / self.config.target_fps as u64);
if elapsed >= min_interval {
*last_time = Some(Instant::now());
true
} else {
false
}
}
async fn process_frame(&self, frame: VideoFrame) -> Result<BitmapUpdate, ProcessingError> {
let start_time = Instant::now();
trace!(
"Processing frame {} ({}x{}, format: {:?})",
frame.frame_id,
frame.width,
frame.height,
frame.format
);
if !frame.damage_regions.is_empty() {
let has_damage = frame.has_significant_damage(self.config.damage_threshold);
if !has_damage {
trace!("Frame {} has insignificant damage, skipping", frame.frame_id);
return Ok(BitmapUpdate { rectangles: vec![] });
}
}
let bitmap_update = self
.converter
.write()
.convert_frame(&frame)
.map_err(|e| ProcessingError::ConversionFailed(e.to_string()))?;
let elapsed = start_time.elapsed();
self.stats.write().total_processing_time_ns += elapsed.as_nanos() as u64;
trace!("Frame {} processed in {:?}", frame.frame_id, elapsed);
Ok(bitmap_update)
}
pub fn get_statistics(&self) -> ProcessingStats {
self.stats.read().clone()
}
pub fn reset_statistics(&self) {
let mut stats = self.stats.write();
*stats = ProcessingStats::default();
}
pub fn get_converter_statistics(&self) -> crate::converter::ConversionStats {
self.converter.read().get_statistics()
}
pub fn force_full_update(&self) {
self.converter.write().force_full_update();
}
pub fn is_running(&self) -> bool {
*self.running.read()
}
}
/// Errors surfaced by [`FrameProcessor`] operations.
#[derive(Debug, thiserror::Error)]
pub enum ProcessingError {
/// Bitmap conversion failed; carries the converter's error text.
#[error("Conversion failed: {0}")]
ConversionFailed(String),
// NOTE(review): not constructed anywhere in this file — presumably used by
// other modules; confirm before pruning.
#[error("Queue overflow: max depth {0} exceeded")]
QueueOverflow(usize),
// NOTE(review): not constructed in this file.
#[error("Invalid frame: {0}")]
InvalidFrame(String),
// NOTE(review): not constructed in this file.
#[error("Channel error: {0}")]
ChannelError(String),
// NOTE(review): not constructed in this file.
#[error("Processor not running")]
NotRunning,
}
#[cfg(test)]
mod tests {
    use lamco_pipewire::PixelFormat;
    use super::*;

    #[test]
    fn test_processor_config() {
        // Defaults must mirror the module-level constants.
        let cfg = ProcessorConfig::default();
        assert_eq!(cfg.target_fps, DEFAULT_TARGET_FPS);
        assert_eq!(cfg.max_queue_depth, DEFAULT_QUEUE_SIZE);
        assert!(cfg.adaptive_quality);
    }

    #[test]
    fn test_processing_stats() {
        let mut stats = ProcessingStats::default();
        stats.frames_received = 100;
        stats.frames_processed = 90;
        stats.frames_dropped_queue_full = 5;
        stats.frames_dropped_old = 5;
        stats.total_processing_time_ns = 500_000_000;
        // 10 drops out of 100 received frames.
        assert_eq!(stats.drop_rate(), 0.1);
        // 500 ms of conversion time spread over 90 frames.
        let expected = 500.0 / 90.0;
        assert!((stats.avg_processing_time_ms() - expected).abs() < 1e-10);
    }

    #[test]
    fn test_queued_frame() {
        let frame = VideoFrame::new(1, 1920, 1080, 7680, PixelFormat::BGRA, 0);
        let queued = QueuedFrame::new(frame);
        // A freshly wrapped frame is neither stale nor measurably old.
        assert!(!queued.is_too_old(MAX_FRAME_AGE_MS));
        assert!(queued.age() < Duration::from_millis(10));
    }

    #[tokio::test]
    async fn test_processor_creation() {
        let processor = Arc::new(FrameProcessor::new(ProcessorConfig::default(), 1920, 1080));
        assert!(!processor.is_running());
        assert_eq!(processor.get_statistics().frames_received, 0);
    }

    #[tokio::test]
    async fn test_processor_lifecycle() {
        let processor = Arc::new(FrameProcessor::new(ProcessorConfig::default(), 1920, 1080));
        let (input_tx, input_rx) = mpsc::channel(10);
        let (output_tx, _output_rx) = mpsc::channel(10);
        let worker = processor.clone();
        let handle = tokio::spawn(async move { worker.start(input_rx, output_tx).await });
        tokio::time::sleep(Duration::from_millis(10)).await;
        processor.stop();
        drop(input_tx);
        // The loop must exit promptly once the input channel closes.
        let result = tokio::time::timeout(Duration::from_millis(100), handle).await;
        assert!(result.is_ok());
    }
}