Skip to main content

oximedia_graph/
context.rs

//! Execution context for filter graphs.
//!
//! The context provides runtime state and statistics for graph execution.
4
5use std::collections::HashMap;
6use std::time::{Duration, Instant};
7
8use crate::error::GraphResult;
9use crate::frame::FramePool;
10use crate::graph::FilterGraph;
11use crate::node::NodeId;
12
13/// Execution context for running filter graphs.
14#[allow(dead_code)]
15pub struct GraphContext {
16    /// The filter graph being executed.
17    graph: FilterGraph,
18    /// Frame pool for allocation reuse.
19    frame_pool: FramePool,
20    /// Processing statistics.
21    stats: ProcessingStats,
22    /// Per-node statistics.
23    node_stats: HashMap<NodeId, NodeStats>,
24    /// Whether the context is currently running.
25    running: bool,
26    /// Start time of processing.
27    start_time: Option<Instant>,
28}
29
30impl GraphContext {
31    /// Create a new execution context for a graph.
32    #[must_use]
33    pub fn new(graph: FilterGraph) -> Self {
34        let node_stats = graph
35            .node_ids()
36            .into_iter()
37            .map(|id| (id, NodeStats::default()))
38            .collect();
39
40        Self {
41            graph,
42            frame_pool: FramePool::default(),
43            stats: ProcessingStats::default(),
44            node_stats,
45            running: false,
46            start_time: None,
47        }
48    }
49
50    /// Get a reference to the underlying graph.
51    #[must_use]
52    pub fn graph(&self) -> &FilterGraph {
53        &self.graph
54    }
55
56    /// Get a mutable reference to the underlying graph.
57    pub fn graph_mut(&mut self) -> &mut FilterGraph {
58        &mut self.graph
59    }
60
61    /// Get the frame pool.
62    #[must_use]
63    pub fn frame_pool(&self) -> &FramePool {
64        &self.frame_pool
65    }
66
67    /// Get mutable access to the frame pool.
68    pub fn frame_pool_mut(&mut self) -> &mut FramePool {
69        &mut self.frame_pool
70    }
71
72    /// Get processing statistics.
73    #[must_use]
74    pub fn stats(&self) -> &ProcessingStats {
75        &self.stats
76    }
77
78    /// Get statistics for a specific node.
79    #[must_use]
80    pub fn node_stats(&self, id: NodeId) -> Option<&NodeStats> {
81        self.node_stats.get(&id)
82    }
83
84    /// Check if the context is currently running.
85    #[must_use]
86    pub fn is_running(&self) -> bool {
87        self.running
88    }
89
90    /// Initialize the context for processing.
91    pub fn initialize(&mut self) -> GraphResult<()> {
92        self.graph.initialize()?;
93        self.stats = ProcessingStats::default();
94        for stats in self.node_stats.values_mut() {
95            *stats = NodeStats::default();
96        }
97        self.running = false;
98        self.start_time = None;
99        Ok(())
100    }
101
102    /// Start processing.
103    pub fn start(&mut self) -> GraphResult<()> {
104        if !self.running {
105            self.running = true;
106            self.start_time = Some(Instant::now());
107        }
108        Ok(())
109    }
110
111    /// Process one step.
112    pub fn step(&mut self) -> GraphResult<bool> {
113        if !self.running {
114            self.start()?;
115        }
116
117        let step_start = Instant::now();
118        let processed = self.graph.process_step()?;
119        let step_duration = step_start.elapsed();
120
121        if processed {
122            self.stats.frames_processed += 1;
123            self.stats.total_processing_time += step_duration;
124        }
125
126        Ok(processed)
127    }
128
129    /// Stop processing.
130    pub fn stop(&mut self) -> GraphResult<()> {
131        if self.running {
132            self.running = false;
133            if let Some(start) = self.start_time.take() {
134                self.stats.wall_clock_time = start.elapsed();
135            }
136        }
137        Ok(())
138    }
139
140    /// Reset the context.
141    pub fn reset(&mut self) -> GraphResult<()> {
142        self.stop()?;
143        self.graph.reset()?;
144        self.frame_pool.clear();
145        self.stats = ProcessingStats::default();
146        for stats in self.node_stats.values_mut() {
147            *stats = NodeStats::default();
148        }
149        Ok(())
150    }
151
152    /// Flush all pending frames.
153    pub fn flush(&mut self) -> GraphResult<()> {
154        let _ = self.graph.flush()?;
155        Ok(())
156    }
157
158    /// Run the graph until completion or error.
159    pub fn run_to_completion(&mut self) -> GraphResult<()> {
160        self.initialize()?;
161        self.start()?;
162
163        loop {
164            if !self.step()? {
165                break;
166            }
167        }
168
169        self.flush()?;
170        self.stop()?;
171        Ok(())
172    }
173}
174
/// Statistics for graph processing.
#[derive(Clone, Debug, Default)]
pub struct ProcessingStats {
    /// Total frames processed.
    pub frames_processed: u64,
    /// Total processing time (not wall clock).
    pub total_processing_time: Duration,
    /// Wall clock time from start to finish.
    pub wall_clock_time: Duration,
    /// Number of dropped frames.
    pub frames_dropped: u64,
    /// Number of errors encountered.
    pub errors: u64,
}

impl ProcessingStats {
    /// Get frames per second based on wall clock time.
    ///
    /// Returns `0.0` while no wall clock time has been recorded.
    #[must_use]
    // A `u64` frame count above 2^53 cannot be represented exactly as `f64`;
    // that loss is acceptable for a rate estimate.
    #[allow(clippy::cast_precision_loss)]
    pub fn fps(&self) -> f64 {
        if self.wall_clock_time.is_zero() {
            0.0
        } else {
            self.frames_processed as f64 / self.wall_clock_time.as_secs_f64()
        }
    }

    /// Get average processing time per frame.
    ///
    /// Returns [`Duration::ZERO`] when no frames have been processed. The
    /// result is rounded down to whole nanoseconds.
    #[must_use]
    pub fn avg_frame_time(&self) -> Duration {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        if self.frames_processed == 0 {
            return Duration::ZERO;
        }

        // Divide in `u128` nanoseconds with the full 64-bit frame count.
        // Narrowing the count to `u32` gives a wrong average above
        // `u32::MAX` and a divide-by-zero panic at multiples of 2^32.
        let avg_nanos =
            self.total_processing_time.as_nanos() / u128::from(self.frames_processed);

        // The quotient never exceeds the dividend, so the seconds part fits in
        // `u64` and the remainder is below one second; the fallbacks are
        // unreachable.
        let secs = u64::try_from(avg_nanos / NANOS_PER_SEC).unwrap_or(u64::MAX);
        let subsec_nanos = u32::try_from(avg_nanos % NANOS_PER_SEC).unwrap_or(0);
        Duration::new(secs, subsec_nanos)
    }

    /// Get processing efficiency (processing time / wall clock time).
    ///
    /// Returns `0.0` while no wall clock time has been recorded.
    #[must_use]
    pub fn efficiency(&self) -> f64 {
        if self.wall_clock_time.is_zero() {
            0.0
        } else {
            self.total_processing_time.as_secs_f64() / self.wall_clock_time.as_secs_f64()
        }
    }
}
223
/// Statistics for individual node processing.
#[derive(Clone, Debug, Default)]
pub struct NodeStats {
    /// Frames processed by this node.
    pub frames_processed: u64,
    /// Total processing time for this node.
    pub processing_time: Duration,
    /// Frames dropped by this node.
    pub frames_dropped: u64,
    /// Frames buffered in this node.
    pub frames_buffered: u64,
}

impl NodeStats {
    /// Get average processing time per frame.
    ///
    /// Returns [`Duration::ZERO`] when no frames have been processed. The
    /// result is rounded down to whole nanoseconds.
    #[must_use]
    pub fn avg_frame_time(&self) -> Duration {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        if self.frames_processed == 0 {
            return Duration::ZERO;
        }

        // Divide in `u128` nanoseconds with the full 64-bit frame count.
        // Narrowing the count to `u32` gives a wrong average above
        // `u32::MAX` and a divide-by-zero panic at multiples of 2^32.
        let avg_nanos = self.processing_time.as_nanos() / u128::from(self.frames_processed);

        // The quotient never exceeds the dividend, so the seconds part fits in
        // `u64` and the remainder is below one second; the fallbacks are
        // unreachable.
        let secs = u64::try_from(avg_nanos / NANOS_PER_SEC).unwrap_or(u64::MAX);
        let subsec_nanos = u32::try_from(avg_nanos % NANOS_PER_SEC).unwrap_or(0);
        Duration::new(secs, subsec_nanos)
    }
}
248
/// Placeholder for thread pool integration.
///
/// This will be expanded to support parallel processing of independent
/// branches in the filter graph.
// Nothing in this module consumes the configuration yet, hence the
// `dead_code` allowance.
#[allow(dead_code)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ThreadPoolConfig {
    /// Number of worker threads.
    pub num_threads: usize,
    /// Stack size per thread in bytes; `None` leaves it unset.
    pub stack_size: Option<usize>,
    /// Thread name prefix.
    pub name_prefix: String,
}

impl Default for ThreadPoolConfig {
    /// One worker per available CPU, no explicit stack size and the
    /// `"oximedia-worker"` name prefix.
    fn default() -> Self {
        Self {
            num_threads: num_cpus(),
            stack_size: None,
            name_prefix: "oximedia-worker".to_string(),
        }
    }
}

impl ThreadPoolConfig {
    /// Create a new thread pool configuration.
    ///
    /// Uses `num_threads` workers and takes every other setting from
    /// [`Default`].
    #[must_use]
    pub fn new(num_threads: usize) -> Self {
        Self {
            num_threads,
            ..Default::default()
        }
    }

    /// Set the stack size.
    ///
    /// `size` is the per-thread stack size in bytes.
    #[must_use]
    pub fn with_stack_size(mut self, size: usize) -> Self {
        self.stack_size = Some(size);
        self
    }

    /// Set the name prefix.
    #[must_use]
    pub fn with_name_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.name_prefix = prefix.into();
        self
    }
}

/// Get the number of available CPUs.
///
/// Falls back to `1` when the available parallelism cannot be determined.
fn num_cpus() -> usize {
    std::thread::available_parallelism().map_or(1, |count| count.get())
}
304
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filters::video::{NullSink, PassthroughFilter};
    use crate::graph::GraphBuilder;
    use crate::node::NodeId;
    use crate::port::PortId;

    /// Build a two-node graph: a passthrough source connected to a null sink.
    fn create_simple_graph() -> FilterGraph {
        let source = PassthroughFilter::new_source(NodeId(0), "source");
        let sink = NullSink::new(NodeId(0), "sink");

        let (builder, source_id) = GraphBuilder::new().add_node(Box::new(source));
        let (builder, sink_id) = builder.add_node(Box::new(sink));

        builder
            .connect(source_id, PortId(0), sink_id, PortId(0))
            .expect("operation should succeed")
            .build()
            .expect("operation should succeed")
    }

    #[test]
    fn test_context_creation() {
        let graph = create_simple_graph();
        let context = GraphContext::new(graph);

        assert!(!context.is_running());
        assert_eq!(context.stats().frames_processed, 0);
    }

    #[test]
    fn test_context_initialize() {
        let graph = create_simple_graph();
        let mut context = GraphContext::new(graph);

        context.initialize().expect("initialize should succeed");
        assert!(!context.is_running());
    }

    #[test]
    fn test_context_start_stop() {
        let graph = create_simple_graph();
        let mut context = GraphContext::new(graph);

        context.initialize().expect("initialize should succeed");
        context.start().expect("start should succeed");
        assert!(context.is_running());

        context.stop().expect("stop should succeed");
        assert!(!context.is_running());
    }

    #[test]
    fn test_context_reset() {
        let graph = create_simple_graph();
        let mut context = GraphContext::new(graph);

        context.initialize().expect("initialize should succeed");
        context.start().expect("start should succeed");
        context.reset().expect("reset should succeed");

        assert!(!context.is_running());
        assert_eq!(context.stats().frames_processed, 0);
    }

    #[test]
    fn test_processing_stats() {
        let stats = ProcessingStats {
            frames_processed: 100,
            wall_clock_time: Duration::from_secs(1),
            total_processing_time: Duration::from_millis(500),
            ..ProcessingStats::default()
        };

        assert!((stats.fps() - 100.0).abs() < 0.01);
        assert_eq!(stats.avg_frame_time(), Duration::from_millis(5));
        assert!((stats.efficiency() - 0.5).abs() < 0.01);
    }

    #[test]
    fn test_processing_stats_without_data() {
        // With nothing recorded, every derived metric takes its zero branch.
        let stats = ProcessingStats::default();

        assert!(stats.fps().abs() < f64::EPSILON);
        assert_eq!(stats.avg_frame_time(), Duration::ZERO);
        assert!(stats.efficiency().abs() < f64::EPSILON);
    }

    #[test]
    fn test_node_stats_avg_frame_time() {
        assert_eq!(NodeStats::default().avg_frame_time(), Duration::ZERO);

        let stats = NodeStats {
            frames_processed: 4,
            processing_time: Duration::from_millis(20),
            ..NodeStats::default()
        };
        assert_eq!(stats.avg_frame_time(), Duration::from_millis(5));
    }

    #[test]
    fn test_thread_pool_config() {
        let config = ThreadPoolConfig::new(4)
            .with_stack_size(1024 * 1024)
            .with_name_prefix("test");

        assert_eq!(config.num_threads, 4);
        assert_eq!(config.stack_size, Some(1024 * 1024));
        assert_eq!(config.name_prefix, "test");
    }

    #[test]
    fn test_thread_pool_config_default() {
        let config = ThreadPoolConfig::default();

        assert_eq!(config.num_threads, num_cpus());
        assert_eq!(config.stack_size, None);
        assert_eq!(config.name_prefix, "oximedia-worker");
    }

    #[test]
    fn test_num_cpus() {
        let cpus = num_cpus();
        assert!(cpus >= 1);
    }
}