use std::collections::HashMap;
use std::time::{Duration, Instant};
use crate::error::GraphResult;
use crate::frame::FramePool;
use crate::graph::FilterGraph;
use crate::node::NodeId;
#[allow(dead_code)]
pub struct GraphContext {
graph: FilterGraph,
frame_pool: FramePool,
stats: ProcessingStats,
node_stats: HashMap<NodeId, NodeStats>,
running: bool,
start_time: Option<Instant>,
}
impl GraphContext {
#[must_use]
pub fn new(graph: FilterGraph) -> Self {
let node_stats = graph
.node_ids()
.into_iter()
.map(|id| (id, NodeStats::default()))
.collect();
Self {
graph,
frame_pool: FramePool::default(),
stats: ProcessingStats::default(),
node_stats,
running: false,
start_time: None,
}
}
#[must_use]
pub fn graph(&self) -> &FilterGraph {
&self.graph
}
pub fn graph_mut(&mut self) -> &mut FilterGraph {
&mut self.graph
}
#[must_use]
pub fn frame_pool(&self) -> &FramePool {
&self.frame_pool
}
pub fn frame_pool_mut(&mut self) -> &mut FramePool {
&mut self.frame_pool
}
#[must_use]
pub fn stats(&self) -> &ProcessingStats {
&self.stats
}
#[must_use]
pub fn node_stats(&self, id: NodeId) -> Option<&NodeStats> {
self.node_stats.get(&id)
}
#[must_use]
pub fn is_running(&self) -> bool {
self.running
}
pub fn initialize(&mut self) -> GraphResult<()> {
self.graph.initialize()?;
self.stats = ProcessingStats::default();
for stats in self.node_stats.values_mut() {
*stats = NodeStats::default();
}
self.running = false;
self.start_time = None;
Ok(())
}
pub fn start(&mut self) -> GraphResult<()> {
if !self.running {
self.running = true;
self.start_time = Some(Instant::now());
}
Ok(())
}
pub fn step(&mut self) -> GraphResult<bool> {
if !self.running {
self.start()?;
}
let step_start = Instant::now();
let processed = self.graph.process_step()?;
let step_duration = step_start.elapsed();
if processed {
self.stats.frames_processed += 1;
self.stats.total_processing_time += step_duration;
}
Ok(processed)
}
pub fn stop(&mut self) -> GraphResult<()> {
if self.running {
self.running = false;
if let Some(start) = self.start_time.take() {
self.stats.wall_clock_time = start.elapsed();
}
}
Ok(())
}
pub fn reset(&mut self) -> GraphResult<()> {
self.stop()?;
self.graph.reset()?;
self.frame_pool.clear();
self.stats = ProcessingStats::default();
for stats in self.node_stats.values_mut() {
*stats = NodeStats::default();
}
Ok(())
}
pub fn flush(&mut self) -> GraphResult<()> {
let _ = self.graph.flush()?;
Ok(())
}
pub fn run_to_completion(&mut self) -> GraphResult<()> {
self.initialize()?;
self.start()?;
loop {
if !self.step()? {
break;
}
}
self.flush()?;
self.stop()?;
Ok(())
}
}
#[derive(Clone, Debug, Default)]
pub struct ProcessingStats {
pub frames_processed: u64,
pub total_processing_time: Duration,
pub wall_clock_time: Duration,
pub frames_dropped: u64,
pub errors: u64,
}
impl ProcessingStats {
#[must_use]
pub fn fps(&self) -> f64 {
if self.wall_clock_time.is_zero() {
0.0
} else {
#[allow(clippy::cast_precision_loss)]
let result = self.frames_processed as f64 / self.wall_clock_time.as_secs_f64();
result
}
}
#[must_use]
pub fn avg_frame_time(&self) -> Duration {
if self.frames_processed == 0 {
Duration::ZERO
} else {
self.total_processing_time / self.frames_processed as u32
}
}
#[must_use]
pub fn efficiency(&self) -> f64 {
if self.wall_clock_time.is_zero() {
0.0
} else {
self.total_processing_time.as_secs_f64() / self.wall_clock_time.as_secs_f64()
}
}
}
#[derive(Clone, Debug, Default)]
pub struct NodeStats {
pub frames_processed: u64,
pub processing_time: Duration,
pub frames_dropped: u64,
pub frames_buffered: u64,
}
impl NodeStats {
#[must_use]
pub fn avg_frame_time(&self) -> Duration {
if self.frames_processed == 0 {
Duration::ZERO
} else {
self.processing_time / self.frames_processed as u32
}
}
}
#[allow(dead_code)]
pub struct ThreadPoolConfig {
pub num_threads: usize,
pub stack_size: Option<usize>,
pub name_prefix: String,
}
impl Default for ThreadPoolConfig {
fn default() -> Self {
Self {
num_threads: num_cpus(),
stack_size: None,
name_prefix: "oximedia-worker".to_string(),
}
}
}
impl ThreadPoolConfig {
#[must_use]
pub fn new(num_threads: usize) -> Self {
Self {
num_threads,
..Default::default()
}
}
#[must_use]
pub fn with_stack_size(mut self, size: usize) -> Self {
self.stack_size = Some(size);
self
}
#[must_use]
pub fn with_name_prefix(mut self, prefix: impl Into<String>) -> Self {
self.name_prefix = prefix.into();
self
}
}
fn num_cpus() -> usize {
std::thread::available_parallelism()
.map(|p| p.get())
.unwrap_or(1)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::filters::video::{NullSink, PassthroughFilter};
use crate::graph::GraphBuilder;
use crate::node::NodeId;
use crate::port::PortId;
fn create_simple_graph() -> FilterGraph {
let source = PassthroughFilter::new_source(NodeId(0), "source");
let sink = NullSink::new(NodeId(0), "sink");
let (builder, source_id) = GraphBuilder::new().add_node(Box::new(source));
let (builder, sink_id) = builder.add_node(Box::new(sink));
builder
.connect(source_id, PortId(0), sink_id, PortId(0))
.expect("operation should succeed")
.build()
.expect("operation should succeed")
}
#[test]
fn test_context_creation() {
let graph = create_simple_graph();
let context = GraphContext::new(graph);
assert!(!context.is_running());
assert_eq!(context.stats().frames_processed, 0);
}
#[test]
fn test_context_initialize() {
let graph = create_simple_graph();
let mut context = GraphContext::new(graph);
context.initialize().expect("initialize should succeed");
assert!(!context.is_running());
}
#[test]
fn test_context_start_stop() {
let graph = create_simple_graph();
let mut context = GraphContext::new(graph);
context.initialize().expect("initialize should succeed");
context.start().expect("start should succeed");
assert!(context.is_running());
context.stop().expect("stop should succeed");
assert!(!context.is_running());
}
#[test]
fn test_context_reset() {
let graph = create_simple_graph();
let mut context = GraphContext::new(graph);
context.initialize().expect("initialize should succeed");
context.start().expect("start should succeed");
context.reset().expect("reset should succeed");
assert!(!context.is_running());
assert_eq!(context.stats().frames_processed, 0);
}
#[test]
fn test_processing_stats() {
let mut stats = ProcessingStats::default();
stats.frames_processed = 100;
stats.wall_clock_time = Duration::from_secs(1);
stats.total_processing_time = Duration::from_millis(500);
assert!((stats.fps() - 100.0).abs() < 0.01);
assert_eq!(stats.avg_frame_time(), Duration::from_millis(5));
assert!((stats.efficiency() - 0.5).abs() < 0.01);
}
#[test]
fn test_thread_pool_config() {
let config = ThreadPoolConfig::new(4)
.with_stack_size(1024 * 1024)
.with_name_prefix("test");
assert_eq!(config.num_threads, 4);
assert_eq!(config.stack_size, Some(1024 * 1024));
assert_eq!(config.name_prefix, "test");
}
#[test]
fn test_num_cpus() {
let cpus = num_cpus();
assert!(cpus >= 1);
}
}