oximedia_graph/
context.rs1use std::collections::HashMap;
6use std::time::{Duration, Instant};
7
8use crate::error::GraphResult;
9use crate::frame::FramePool;
10use crate::graph::FilterGraph;
11use crate::node::NodeId;
12
13#[allow(dead_code)]
15pub struct GraphContext {
16 graph: FilterGraph,
18 frame_pool: FramePool,
20 stats: ProcessingStats,
22 node_stats: HashMap<NodeId, NodeStats>,
24 running: bool,
26 start_time: Option<Instant>,
28}
29
30impl GraphContext {
31 #[must_use]
33 pub fn new(graph: FilterGraph) -> Self {
34 let node_stats = graph
35 .node_ids()
36 .into_iter()
37 .map(|id| (id, NodeStats::default()))
38 .collect();
39
40 Self {
41 graph,
42 frame_pool: FramePool::default(),
43 stats: ProcessingStats::default(),
44 node_stats,
45 running: false,
46 start_time: None,
47 }
48 }
49
50 #[must_use]
52 pub fn graph(&self) -> &FilterGraph {
53 &self.graph
54 }
55
56 pub fn graph_mut(&mut self) -> &mut FilterGraph {
58 &mut self.graph
59 }
60
61 #[must_use]
63 pub fn frame_pool(&self) -> &FramePool {
64 &self.frame_pool
65 }
66
67 pub fn frame_pool_mut(&mut self) -> &mut FramePool {
69 &mut self.frame_pool
70 }
71
72 #[must_use]
74 pub fn stats(&self) -> &ProcessingStats {
75 &self.stats
76 }
77
78 #[must_use]
80 pub fn node_stats(&self, id: NodeId) -> Option<&NodeStats> {
81 self.node_stats.get(&id)
82 }
83
84 #[must_use]
86 pub fn is_running(&self) -> bool {
87 self.running
88 }
89
90 pub fn initialize(&mut self) -> GraphResult<()> {
92 self.graph.initialize()?;
93 self.stats = ProcessingStats::default();
94 for stats in self.node_stats.values_mut() {
95 *stats = NodeStats::default();
96 }
97 self.running = false;
98 self.start_time = None;
99 Ok(())
100 }
101
102 pub fn start(&mut self) -> GraphResult<()> {
104 if !self.running {
105 self.running = true;
106 self.start_time = Some(Instant::now());
107 }
108 Ok(())
109 }
110
111 pub fn step(&mut self) -> GraphResult<bool> {
113 if !self.running {
114 self.start()?;
115 }
116
117 let step_start = Instant::now();
118 let processed = self.graph.process_step()?;
119 let step_duration = step_start.elapsed();
120
121 if processed {
122 self.stats.frames_processed += 1;
123 self.stats.total_processing_time += step_duration;
124 }
125
126 Ok(processed)
127 }
128
129 pub fn stop(&mut self) -> GraphResult<()> {
131 if self.running {
132 self.running = false;
133 if let Some(start) = self.start_time.take() {
134 self.stats.wall_clock_time = start.elapsed();
135 }
136 }
137 Ok(())
138 }
139
140 pub fn reset(&mut self) -> GraphResult<()> {
142 self.stop()?;
143 self.graph.reset()?;
144 self.frame_pool.clear();
145 self.stats = ProcessingStats::default();
146 for stats in self.node_stats.values_mut() {
147 *stats = NodeStats::default();
148 }
149 Ok(())
150 }
151
152 pub fn flush(&mut self) -> GraphResult<()> {
154 let _ = self.graph.flush()?;
155 Ok(())
156 }
157
158 pub fn run_to_completion(&mut self) -> GraphResult<()> {
160 self.initialize()?;
161 self.start()?;
162
163 loop {
164 if !self.step()? {
165 break;
166 }
167 }
168
169 self.flush()?;
170 self.stop()?;
171 Ok(())
172 }
173}
174
/// Aggregate counters and timings for a graph run.
#[derive(Clone, Debug, Default)]
pub struct ProcessingStats {
    /// Number of frames processed.
    pub frames_processed: u64,
    /// Time spent processing, summed over all counted frames.
    pub total_processing_time: Duration,
    /// Wall-clock duration of the run.
    pub wall_clock_time: Duration,
    /// Number of frames dropped.
    pub frames_dropped: u64,
    /// Number of errors.
    pub errors: u64,
}

impl ProcessingStats {
    /// Frames processed per second of wall-clock time.
    ///
    /// Returns `0.0` when `wall_clock_time` is zero.
    #[must_use]
    // u64 -> f64 only loses precision above 2^53 frames, which is acceptable
    // for a rate.
    #[allow(clippy::cast_precision_loss)]
    pub fn fps(&self) -> f64 {
        if self.wall_clock_time.is_zero() {
            0.0
        } else {
            self.frames_processed as f64 / self.wall_clock_time.as_secs_f64()
        }
    }

    /// Mean processing time per frame, rounded down to whole nanoseconds.
    ///
    /// Returns [`Duration::ZERO`] when no frame has been processed. The
    /// division is carried out on the full 64-bit frame count, so counts above
    /// `u32::MAX` neither yield a wrong average nor panic.
    #[must_use]
    // Both casts are lossless here: the quotient cannot exceed the total's
    // whole seconds (which fit in u64) and the remainder is below 1e9.
    #[allow(clippy::cast_possible_truncation)]
    pub fn avg_frame_time(&self) -> Duration {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        if self.frames_processed == 0 {
            return Duration::ZERO;
        }

        let avg_nanos = self.total_processing_time.as_nanos() / u128::from(self.frames_processed);
        Duration::new(
            (avg_nanos / NANOS_PER_SEC) as u64,
            (avg_nanos % NANOS_PER_SEC) as u32,
        )
    }

    /// Ratio of processing time to wall-clock time.
    ///
    /// Returns `0.0` when `wall_clock_time` is zero.
    #[must_use]
    pub fn efficiency(&self) -> f64 {
        if self.wall_clock_time.is_zero() {
            return 0.0;
        }
        self.total_processing_time.as_secs_f64() / self.wall_clock_time.as_secs_f64()
    }
}
223
/// Counters and timings recorded for a single node.
#[derive(Clone, Debug, Default)]
pub struct NodeStats {
    /// Number of frames processed by the node.
    pub frames_processed: u64,
    /// Time spent processing, summed over all counted frames.
    pub processing_time: Duration,
    /// Number of frames dropped by the node.
    pub frames_dropped: u64,
    /// Number of frames buffered by the node.
    pub frames_buffered: u64,
}

impl NodeStats {
    /// Mean processing time per frame, rounded down to whole nanoseconds.
    ///
    /// Returns [`Duration::ZERO`] when no frame has been processed. The
    /// division is carried out on the full 64-bit frame count, so counts above
    /// `u32::MAX` neither yield a wrong average nor panic.
    #[must_use]
    // Both casts are lossless here: the quotient cannot exceed the total's
    // whole seconds (which fit in u64) and the remainder is below 1e9.
    #[allow(clippy::cast_possible_truncation)]
    pub fn avg_frame_time(&self) -> Duration {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        if self.frames_processed == 0 {
            return Duration::ZERO;
        }

        let avg_nanos = self.processing_time.as_nanos() / u128::from(self.frames_processed);
        Duration::new(
            (avg_nanos / NANOS_PER_SEC) as u64,
            (avg_nanos % NANOS_PER_SEC) as u32,
        )
    }
}
248
249#[allow(dead_code)]
254pub struct ThreadPoolConfig {
255 pub num_threads: usize,
257 pub stack_size: Option<usize>,
259 pub name_prefix: String,
261}
262
263impl Default for ThreadPoolConfig {
264 fn default() -> Self {
265 Self {
266 num_threads: num_cpus(),
267 stack_size: None,
268 name_prefix: "oximedia-worker".to_string(),
269 }
270 }
271}
272
273impl ThreadPoolConfig {
274 #[must_use]
276 pub fn new(num_threads: usize) -> Self {
277 Self {
278 num_threads,
279 ..Default::default()
280 }
281 }
282
283 #[must_use]
285 pub fn with_stack_size(mut self, size: usize) -> Self {
286 self.stack_size = Some(size);
287 self
288 }
289
290 #[must_use]
292 pub fn with_name_prefix(mut self, prefix: impl Into<String>) -> Self {
293 self.name_prefix = prefix.into();
294 self
295 }
296}
297
/// Parallelism reported by `std::thread::available_parallelism`, falling back
/// to `1` when the query fails.
fn num_cpus() -> usize {
    match std::thread::available_parallelism() {
        Ok(parallelism) => parallelism.get(),
        Err(_) => 1,
    }
}
304
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filters::video::{NullSink, PassthroughFilter};
    use crate::graph::GraphBuilder;
    use crate::node::NodeId;
    use crate::port::PortId;

    /// Builds a two-node graph: a passthrough source wired to a null sink on
    /// port 0 of each node.
    fn create_simple_graph() -> FilterGraph {
        let source = PassthroughFilter::new_source(NodeId(0), "source");
        let sink = NullSink::new(NodeId(0), "sink");

        let (with_source, source_id) = GraphBuilder::new().add_node(Box::new(source));
        let (with_both, sink_id) = with_source.add_node(Box::new(sink));

        let connected = with_both
            .connect(source_id, PortId(0), sink_id, PortId(0))
            .expect("operation should succeed");
        connected.build().expect("operation should succeed")
    }

    /// Context wrapping a freshly built simple graph.
    fn fresh_context() -> GraphContext {
        GraphContext::new(create_simple_graph())
    }

    /// A new context is stopped and has processed nothing.
    #[test]
    fn test_context_creation() {
        let ctx = fresh_context();

        assert!(!ctx.is_running());
        assert_eq!(ctx.stats().frames_processed, 0);
    }

    /// Initializing does not start the context.
    #[test]
    fn test_context_initialize() {
        let mut ctx = fresh_context();

        ctx.initialize().expect("initialize should succeed");
        assert!(!ctx.is_running());
    }

    /// `start` and `stop` toggle the running flag.
    #[test]
    fn test_context_start_stop() {
        let mut ctx = fresh_context();
        ctx.initialize().expect("initialize should succeed");

        ctx.start().expect("start should succeed");
        assert!(ctx.is_running());

        ctx.stop().expect("stop should succeed");
        assert!(!ctx.is_running());
    }

    /// Resetting a running context stops it and clears the frame counter.
    #[test]
    fn test_context_reset() {
        let mut ctx = fresh_context();
        ctx.initialize().expect("initialize should succeed");
        ctx.start().expect("start should succeed");

        ctx.reset().expect("reset should succeed");

        assert!(!ctx.is_running());
        assert_eq!(ctx.stats().frames_processed, 0);
    }

    /// Derived metrics for 100 frames in 1 s of wall time and 500 ms of work.
    #[test]
    fn test_processing_stats() {
        let stats = ProcessingStats {
            frames_processed: 100,
            wall_clock_time: Duration::from_secs(1),
            total_processing_time: Duration::from_millis(500),
            ..ProcessingStats::default()
        };

        assert!((stats.fps() - 100.0).abs() < 0.01);
        assert_eq!(stats.avg_frame_time(), Duration::from_millis(5));
        assert!((stats.efficiency() - 0.5).abs() < 0.01);
    }

    /// Builder methods set exactly the fields they name.
    #[test]
    fn test_thread_pool_config() {
        let one_mib = 1024 * 1024;
        let config = ThreadPoolConfig::new(4)
            .with_stack_size(one_mib)
            .with_name_prefix("test");

        assert_eq!(config.num_threads, 4);
        assert_eq!(config.stack_size, Some(1024 * 1024));
        assert_eq!(config.name_prefix, "test");
    }

    /// The CPU count helper never reports zero.
    #[test]
    fn test_num_cpus() {
        assert!(num_cpus() >= 1);
    }
}