use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use lamco_pipewire::VideoFrame;
use parking_lot::RwLock;
use tokio::sync::mpsc;
use tracing::{debug, error, trace, warn};
/// Default for `DispatcherConfig::channel_size` (number of frames).
const DEFAULT_CHANNEL_SIZE: usize = 30;
/// Default for `DispatcherConfig::max_frame_age_ms`, in milliseconds.
const MAX_FRAME_AGE_MS: u64 = 150;
/// Default for `DispatcherConfig::high_water_mark`, expressed as a fraction of `channel_size`.
const HIGH_WATER_MARK: f32 = 0.8;
/// Default for `DispatcherConfig::low_water_mark`, expressed as a fraction of `channel_size`.
const LOW_WATER_MARK: f32 = 0.5;
/// Tunable settings for the frame dispatcher.
#[derive(Debug, Clone)]
pub struct DispatcherConfig {
    /// Nominal queue capacity; the backpressure check divides the current queue length by this
    /// value to obtain the queue usage ratio.
    pub channel_size: usize,
    /// When `true`, a frame is queued ahead of frames with a lower priority; when `false`, the
    /// queue is plain FIFO.
    pub priority_dispatch: bool,
    /// Queued frames older than this many milliseconds are dropped instead of dispatched.
    pub max_frame_age_ms: u64,
    /// Enables the high/low water mark backpressure check on incoming frames.
    pub enable_backpressure: bool,
    /// Queue usage ratio at or above which backpressure is activated for a stream.
    pub high_water_mark: f32,
    /// Queue usage ratio at or below which an active backpressure state is released.
    pub low_water_mark: f32,
    /// NOTE(review): not read anywhere in the visible dispatcher code — confirm intended use.
    pub load_balancing: bool,
}

impl Default for DispatcherConfig {
    /// Builds a configuration from the module-level default constants, with every feature
    /// toggle switched on.
    fn default() -> Self {
        DispatcherConfig {
            // Feature toggles.
            priority_dispatch: true,
            enable_backpressure: true,
            load_balancing: true,
            // Numeric limits taken from the module constants.
            channel_size: DEFAULT_CHANNEL_SIZE,
            max_frame_age_ms: MAX_FRAME_AGE_MS,
            high_water_mark: HIGH_WATER_MARK,
            low_water_mark: LOW_WATER_MARK,
        }
    }
}
/// Counters and gauges describing dispatcher activity.
#[derive(Debug, Clone, Default)]
pub struct DispatcherStats {
    /// Frames handed to the dispatcher, including ones that were later dropped.
    pub frames_received: u64,
    /// Frames successfully sent to the output channel.
    pub frames_dispatched: u64,
    /// Frames discarded because they exceeded the configured maximum age.
    pub frames_dropped_age: u64,
    /// Frames discarded on arrival because backpressure was active.
    pub frames_dropped_backpressure: u64,
    /// Number of known streams, as sampled when a frame was last enqueued.
    pub active_streams: usize,
    /// Accumulated time, in nanoseconds, spent handling frames that were accepted into the queue.
    pub total_dispatch_time_ns: u64,
    /// Whether backpressure was reported active at the last state change.
    pub backpressure_active: bool,
}

impl DispatcherStats {
    /// Returns `total_dispatch_time_ns / frames_dispatched` converted to microseconds,
    /// or `0.0` when nothing has been dispatched yet.
    pub fn avg_dispatch_time_us(&self) -> f64 {
        match self.frames_dispatched {
            0 => 0.0,
            dispatched => {
                let avg_ns = self.total_dispatch_time_ns as f64 / dispatched as f64;
                avg_ns / 1_000.0
            }
        }
    }

    /// Returns the fraction of received frames that were dropped (age + backpressure),
    /// or `0.0` when no frame has been received.
    pub fn drop_rate(&self) -> f64 {
        if self.frames_received == 0 {
            return 0.0;
        }
        let dropped = self.frames_dropped_age + self.frames_dropped_backpressure;
        dropped as f64 / self.frames_received as f64
    }

    /// Returns the fraction of received frames that were dispatched,
    /// or `0.0` when no frame has been received.
    pub fn dispatch_rate(&self) -> f64 {
        match self.frames_received {
            0 => 0.0,
            received => self.frames_dispatched as f64 / received as f64,
        }
    }
}
/// Dispatch priority assigned to a stream.
///
/// The derived ordering follows declaration order, so `Low < Normal < High`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum StreamPriority {
/// Lowest priority.
Low = 0,
/// Priority given to streams that were never explicitly registered.
Normal = 1,
/// Highest priority; queued ahead of `Normal` and `Low` frames when priority dispatch is on.
High = 2,
}
/// A frame waiting in the dispatch queue, tagged with its priority and arrival time.
struct DispatchFrame {
    /// The queued video frame.
    frame: VideoFrame,
    /// Priority of the stream the frame came from, captured at enqueue time.
    priority: StreamPriority,
    /// Moment the wrapper was created; used to compute the frame's age.
    enqueue_time: Instant,
}

impl DispatchFrame {
    /// Wraps `frame`, stamping it with the current instant.
    fn new(frame: VideoFrame, priority: StreamPriority) -> Self {
        let enqueue_time = Instant::now();
        Self {
            frame,
            priority,
            enqueue_time,
        }
    }

    /// Time elapsed since the frame was wrapped.
    fn age(&self) -> Duration {
        self.enqueue_time.elapsed()
    }

    /// Returns `true` when the frame's age, truncated to whole milliseconds, is strictly
    /// greater than `max_age_ms`.
    fn is_too_old(&self, max_age_ms: u64) -> bool {
        let age_ms = self.age().as_millis() as u64;
        age_ms > max_age_ms
    }
}
/// Bookkeeping kept for every stream known to the dispatcher.
struct StreamState {
    /// Priority applied to frames arriving from this stream.
    priority: StreamPriority,
    /// Number of frames seen from this stream.
    frame_count: u64,
    /// When the most recent frame arrived, if any.
    last_frame_time: Option<Instant>,
    /// Whether frames from this stream are currently being dropped due to backpressure.
    backpressure_active: bool,
}

impl StreamState {
    /// Creates a state with no frames seen and backpressure inactive.
    fn new(priority: StreamPriority) -> Self {
        Self {
            backpressure_active: false,
            last_frame_time: None,
            frame_count: 0,
            priority,
        }
    }

    /// Records the arrival of one frame: bumps the counter and refreshes the timestamp.
    fn update_frame_received(&mut self) {
        self.frame_count += 1;
        let now = Instant::now();
        self.last_frame_time = Some(now);
    }
}
/// Receives frames from an input channel, queues them (optionally ordered by stream priority)
/// and forwards them to an output channel while tracking per-stream state and statistics.
pub struct FrameDispatcher {
/// Settings supplied at construction time.
config: DispatcherConfig,
/// Per-stream state, keyed by stream id (the frame's `monitor_index`).
streams: Arc<RwLock<HashMap<u32, StreamState>>>,
/// Frames waiting to be sent to the output channel.
priority_queue: Arc<RwLock<VecDeque<DispatchFrame>>>,
/// Running counters exposed through `get_statistics`.
stats: Arc<RwLock<DispatcherStats>>,
/// Set while the `start` loop is active; cleared by `stop`.
running: Arc<RwLock<bool>>,
}
impl FrameDispatcher {
/// Creates a stopped dispatcher with no registered streams, an empty queue and zeroed
/// statistics.
pub fn new(config: DispatcherConfig) -> Self {
    let streams = Arc::new(RwLock::new(HashMap::new()));
    let priority_queue = Arc::new(RwLock::new(VecDeque::new()));
    let stats = Arc::new(RwLock::new(DispatcherStats::default()));
    let running = Arc::new(RwLock::new(false));
    Self {
        config,
        streams,
        priority_queue,
        stats,
        running,
    }
}
/// Registers `stream_id` with the given priority.
///
/// Any state already stored for that id is replaced by a fresh one.
pub fn register_stream(&self, stream_id: u32, priority: StreamPriority) {
    {
        let mut streams = self.streams.write();
        streams.insert(stream_id, StreamState::new(priority));
    }
    debug!("Registered stream {} with priority {:?}", stream_id, priority);
}
/// Removes the stored state for `stream_id`, if any.
///
/// Frames from that stream that are already queued are left in place.
pub fn unregister_stream(&self, stream_id: u32) {
    {
        let mut streams = self.streams.write();
        streams.remove(&stream_id);
    }
    debug!("Unregistered stream {}", stream_id);
}
/// Runs the dispatch loop: waits for a frame on `input`, queues it, then forwards as many
/// queued frames as `output` will currently accept.
///
/// The loop ends when `input` is closed, when the running flag is found cleared (see `stop`),
/// or when `output` is closed. The running flag is only checked between frames, so a call to
/// `stop` takes effect after the next frame arrives or the input channel closes.
///
/// # Errors
///
/// Returns [`DispatchError::ChannelClosed`] when the output channel has been closed. The
/// running flag is cleared on every exit path, including this one.
pub async fn start(
    self: Arc<Self>,
    mut input: mpsc::Receiver<VideoFrame>,
    output: mpsc::Sender<VideoFrame>,
) -> Result<(), DispatchError> {
    *self.running.write() = true;
    debug!("Frame dispatcher started");

    let mut outcome = Ok(());
    while *self.running.read() {
        match input.recv().await {
            Some(frame) => self.handle_incoming_frame(frame).await,
            None => {
                debug!("Input channel closed, stopping dispatcher");
                break;
            }
        }
        // Break instead of returning early so the running flag below is always reset;
        // otherwise `is_running()` would keep reporting `true` after a fatal error.
        if let Err(err) = self.dispatch_frames(&output).await {
            outcome = Err(err);
            break;
        }
    }

    *self.running.write() = false;
    outcome
}
/// Clears the running flag; the `start` loop exits the next time it checks the flag.
pub fn stop(&self) {
    let mut running = self.running.write();
    *running = false;
}
/// Accounts for one incoming frame and either queues it or drops it under backpressure.
///
/// The stream id is the frame's `monitor_index`; a stream that was never registered is created
/// on the fly with `StreamPriority::Normal`. When backpressure is enabled, the queue usage
/// (`queue length / channel_size`) is compared against the high and low water marks to switch
/// the stream's backpressure state, and frames arriving while it is active are dropped.
///
/// Locks are taken one at a time (queue, then streams, then stats) and never nested, so this
/// method cannot participate in a lock-order inversion with the other dispatcher methods.
async fn handle_incoming_frame(&self, frame: VideoFrame) {
    let start_time = Instant::now();
    let stream_id = frame.monitor_index;

    // Sample queue occupancy up front so the queue lock is released before the stream lock is
    // taken. `max(1)` keeps the ratio finite for a zero `channel_size`.
    let queue_usage = if self.config.enable_backpressure {
        let queued = self.priority_queue.read().len();
        Some(queued as f32 / self.config.channel_size.max(1) as f32)
    } else {
        None
    };

    let (priority, drop_frame, any_backpressure) = {
        let mut streams = self.streams.write();
        let state = streams
            .entry(stream_id)
            .or_insert_with(|| StreamState::new(StreamPriority::Normal));
        state.update_frame_received();

        if let Some(usage) = queue_usage {
            if !state.backpressure_active && usage >= self.config.high_water_mark {
                state.backpressure_active = true;
                warn!(
                    "Backpressure activated for stream {} (queue usage: {:.1}%)",
                    stream_id,
                    usage * 100.0
                );
            } else if state.backpressure_active && usage <= self.config.low_water_mark {
                state.backpressure_active = false;
                debug!(
                    "Backpressure released for stream {} (queue usage: {:.1}%)",
                    stream_id,
                    usage * 100.0
                );
            }
        }

        let priority = state.priority;
        let drop_frame = queue_usage.is_some() && state.backpressure_active;
        // The global flag reflects every stream, not just the one that changed last;
        // otherwise one stream releasing would hide others that are still throttled.
        let any_backpressure = streams.values().any(|s| s.backpressure_active);
        (priority, drop_frame, any_backpressure)
    };

    {
        let mut stats = self.stats.write();
        stats.frames_received += 1;
        if queue_usage.is_some() {
            stats.backpressure_active = any_backpressure;
        }
        if drop_frame {
            stats.frames_dropped_backpressure += 1;
        }
    }

    if drop_frame {
        trace!(
            "Dropping frame {} from stream {} due to backpressure",
            frame.frame_id,
            stream_id
        );
        return;
    }

    self.enqueue_frame(DispatchFrame::new(frame, priority));

    // Only frames that were accepted contribute to the accumulated handling time.
    let elapsed = start_time.elapsed();
    self.stats.write().total_dispatch_time_ns += elapsed.as_nanos() as u64;
}
/// Adds `frame` to the dispatch queue and refreshes the `active_streams` statistic.
///
/// With priority dispatch enabled the frame is inserted in front of the first queued frame
/// whose priority is strictly lower, which keeps arrival order among frames of equal priority;
/// otherwise it is appended at the back.
fn enqueue_frame(&self, frame: DispatchFrame) {
    {
        let mut queue = self.priority_queue.write();
        if self.config.priority_dispatch {
            let insert_idx = queue
                .iter()
                .position(|queued| frame.priority > queued.priority)
                .unwrap_or(queue.len());
            queue.insert(insert_idx, frame);
        } else {
            queue.push_back(frame);
        }
        // The queue lock ends here, before any other lock is taken: holding it while locking
        // `streams` would invert the streams -> queue order used when frames are received.
    }

    let active_streams = self.streams.read().len();
    self.stats.write().active_streams = active_streams;
}
async fn dispatch_frames(&self, output: &mpsc::Sender<VideoFrame>) -> Result<(), DispatchError> {
let mut queue = self.priority_queue.write();
while let Some(dispatch_frame) = queue.pop_front() {
if dispatch_frame.is_too_old(self.config.max_frame_age_ms) {
trace!(
"Dropping old frame {} (age: {:?})",
dispatch_frame.frame.frame_id,
dispatch_frame.age()
);
self.stats.write().frames_dropped_age += 1;
continue;
}
match output.try_send(dispatch_frame.frame.clone()) {
Ok(_) => {
trace!(
"Dispatched frame {} with priority {:?}",
dispatch_frame.frame.frame_id,
dispatch_frame.priority
);
self.stats.write().frames_dispatched += 1;
}
Err(mpsc::error::TrySendError::Full(_)) => {
warn!("Output channel full, requeueing frame");
queue.push_front(dispatch_frame);
break;
}
Err(mpsc::error::TrySendError::Closed(_)) => {
error!("Output channel closed");
return Err(DispatchError::ChannelClosed);
}
}
}
Ok(())
}
pub fn get_statistics(&self) -> DispatcherStats {
self.stats.read().clone()
}
/// Replaces every statistic with its default value, including the `active_streams` and
/// `backpressure_active` fields.
pub fn reset_statistics(&self) {
    *self.stats.write() = DispatcherStats::default();
}
/// Returns the current value of the running flag.
pub fn is_running(&self) -> bool {
    let running = self.running.read();
    *running
}
/// Returns the number of streams that currently have stored state.
pub fn active_stream_count(&self) -> usize {
    let streams = self.streams.read();
    streams.len()
}
/// Returns the number of frames currently waiting in the dispatch queue.
pub fn queue_depth(&self) -> usize {
    let queue = self.priority_queue.read();
    queue.len()
}
}
/// Errors reported by the frame dispatcher.
///
/// NOTE(review): only `ChannelClosed` is constructed in the visible code; the other variants
/// are presumably produced elsewhere or reserved — confirm.
#[derive(Debug, thiserror::Error)]
pub enum DispatchError {
/// The output channel was closed while frames were being dispatched.
#[error("Channel closed")]
ChannelClosed,
/// A stream id that is not known to the dispatcher; carries the id.
#[error("Stream {0} not found")]
StreamNotFound(u32),
/// The queue overflowed; carries a frame count.
#[error("Queue overflow: {0} frames")]
QueueOverflow(usize),
/// An operation required a running dispatcher.
#[error("Dispatcher not running")]
NotRunning,
/// A priority value could not be interpreted; carries the offending text.
#[error("Invalid priority: {0}")]
InvalidPriority(String),
}
// Unit tests for the dispatcher's configuration, statistics helpers, stream bookkeeping and
// start/stop lifecycle.
#[cfg(test)]
mod tests {
use lamco_pipewire::PixelFormat;
use super::*;
// The default configuration uses the module constants and enables the main features.
#[test]
fn test_dispatcher_config() {
let config = DispatcherConfig::default();
assert_eq!(config.channel_size, DEFAULT_CHANNEL_SIZE);
assert!(config.priority_dispatch);
assert!(config.enable_backpressure);
}
// Drop and dispatch rates are fractions of the received frame count.
#[test]
fn test_dispatcher_stats() {
let mut stats = DispatcherStats::default();
stats.frames_received = 100;
stats.frames_dispatched = 90;
stats.frames_dropped_age = 5;
stats.frames_dropped_backpressure = 5;
assert_eq!(stats.drop_rate(), 0.1);
assert_eq!(stats.dispatch_rate(), 0.9);
}
// Priorities compare as Low < Normal < High.
#[test]
fn test_stream_priority() {
assert!(StreamPriority::High > StreamPriority::Normal);
assert!(StreamPriority::Normal > StreamPriority::Low);
}
// A freshly wrapped frame keeps its priority and is not yet considered too old.
#[test]
fn test_dispatch_frame() {
let frame = VideoFrame::new(1, 1920, 1080, 7680, PixelFormat::BGRA, 0);
let dispatch = DispatchFrame::new(frame, StreamPriority::High);
assert_eq!(dispatch.priority, StreamPriority::High);
assert!(!dispatch.is_too_old(MAX_FRAME_AGE_MS));
}
// A new dispatcher is stopped, has no streams and an empty queue.
#[test]
fn test_dispatcher_creation() {
let config = DispatcherConfig::default();
let dispatcher = FrameDispatcher::new(config);
assert!(!dispatcher.is_running());
assert_eq!(dispatcher.active_stream_count(), 0);
assert_eq!(dispatcher.queue_depth(), 0);
}
// Registering and unregistering streams changes the active stream count.
#[test]
fn test_stream_registration() {
let config = DispatcherConfig::default();
let dispatcher = FrameDispatcher::new(config);
dispatcher.register_stream(0, StreamPriority::High);
assert_eq!(dispatcher.active_stream_count(), 1);
dispatcher.register_stream(1, StreamPriority::Normal);
assert_eq!(dispatcher.active_stream_count(), 2);
dispatcher.unregister_stream(0);
assert_eq!(dispatcher.active_stream_count(), 1);
}
// The dispatch task finishes promptly once it is stopped and its input sender is dropped.
#[tokio::test]
async fn test_dispatcher_lifecycle() {
let config = DispatcherConfig::default();
let dispatcher = Arc::new(FrameDispatcher::new(config));
let (input_tx, input_rx) = mpsc::channel(10);
let (output_tx, _output_rx) = mpsc::channel(10);
let dispatcher_clone = dispatcher.clone();
let handle = tokio::spawn(async move { dispatcher_clone.start(input_rx, output_tx).await });
// Give the spawned task a moment to enter its receive loop.
tokio::time::sleep(Duration::from_millis(10)).await;
dispatcher.stop();
// Dropping the sender closes the input channel, which wakes the pending `recv`.
drop(input_tx);
let result = tokio::time::timeout(Duration::from_millis(100), handle).await;
assert!(result.is_ok());
}
}