Skip to main content

oximedia_graph/
node.rs

1//! Node types for the filter graph.
2//!
3//! Nodes are the processing units in a filter graph. Each node has input and output
4//! ports and implements the [`Node`] trait to process frames.
5
use std::collections::HashMap;
use std::collections::VecDeque;
use std::fmt;

use crate::error::{GraphError, GraphResult};
use crate::frame::FilterFrame;
use crate::port::{InputPort, OutputPort, PortId, PortType};
12
/// Identifier for a node in the graph.
///
/// A thin newtype over `u64`. Its [`fmt::Display`] form is `Node(<value>)`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
pub struct NodeId(pub u64);

impl fmt::Display for NodeId {
    /// Writes the identifier as `Node(<value>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(raw) = self;
        write!(f, "Node({raw})")
    }
}
22
/// Role a node plays in the filter graph.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeType {
    /// Produces frames (e.g., a decoder).
    Source,
    /// Transforms frames.
    Filter,
    /// Consumes frames (e.g., an encoder or a display).
    Sink,
}

impl fmt::Display for NodeType {
    /// Writes the variant name (`Source`, `Filter`, or `Sink`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Source => "Source",
            Self::Filter => "Filter",
            Self::Sink => "Sink",
        };
        f.write_str(label)
    }
}
43
/// Execution state of a node while the graph runs.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum NodeState {
    /// Ready to process; this is the default state.
    #[default]
    Idle,
    /// Currently processing.
    Processing,
    /// Finished processing (end of stream).
    Done,
    /// An error was encountered.
    Error,
}

impl fmt::Display for NodeState {
    /// Writes the variant name (`Idle`, `Processing`, `Done`, or `Error`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Idle => "Idle",
            Self::Processing => "Processing",
            Self::Done => "Done",
            Self::Error => "Error",
        };
        f.write_str(label)
    }
}

impl NodeState {
    /// Report whether moving from this state to `new_state` is allowed.
    ///
    /// Rules:
    /// - remaining in the same state is always allowed;
    /// - `Idle` and `Processing` may move to any other state;
    /// - `Done` and `Error` may only be reset to `Idle`.
    #[must_use]
    pub fn can_transition_to(&self, new_state: Self) -> bool {
        // A no-op transition is valid from every state.
        if *self == new_state {
            return true;
        }

        match self {
            Self::Idle | Self::Processing => true,
            // Terminal states can only be left by resetting to Idle.
            Self::Done | Self::Error => matches!(new_state, Self::Idle),
        }
    }
}
87
88/// Configuration for a node.
89#[derive(Clone, Debug, Default)]
90pub struct NodeConfig {
91    /// Human-readable name for the node.
92    pub name: String,
93    /// Type of the node.
94    pub node_type: Option<NodeType>,
95    /// Custom configuration options.
96    pub options: HashMap<String, ConfigValue>,
97}
98
99impl NodeConfig {
100    /// Create a new node configuration with the given name.
101    #[must_use]
102    pub fn new(name: impl Into<String>) -> Self {
103        Self {
104            name: name.into(),
105            node_type: None,
106            options: HashMap::new(),
107        }
108    }
109
110    /// Set the node type.
111    #[must_use]
112    pub fn with_type(mut self, node_type: NodeType) -> Self {
113        self.node_type = Some(node_type);
114        self
115    }
116
117    /// Add a configuration option.
118    #[must_use]
119    pub fn with_option(mut self, key: impl Into<String>, value: ConfigValue) -> Self {
120        self.options.insert(key.into(), value);
121        self
122    }
123
124    /// Get a configuration option.
125    #[must_use]
126    pub fn get_option(&self, key: &str) -> Option<&ConfigValue> {
127        self.options.get(key)
128    }
129
130    /// Get an integer option.
131    #[must_use]
132    pub fn get_int(&self, key: &str) -> Option<i64> {
133        self.options.get(key).and_then(ConfigValue::as_int)
134    }
135
136    /// Get a float option.
137    #[must_use]
138    pub fn get_float(&self, key: &str) -> Option<f64> {
139        self.options.get(key).and_then(ConfigValue::as_float)
140    }
141
142    /// Get a string option.
143    #[must_use]
144    pub fn get_string(&self, key: &str) -> Option<&str> {
145        self.options.get(key).and_then(ConfigValue::as_string)
146    }
147
148    /// Get a boolean option.
149    #[must_use]
150    pub fn get_bool(&self, key: &str) -> Option<bool> {
151        self.options.get(key).and_then(ConfigValue::as_bool)
152    }
153}
154
/// A single configuration option value.
#[derive(Clone, Debug, PartialEq)]
pub enum ConfigValue {
    /// Signed 64-bit integer.
    Int(i64),
    /// 64-bit floating point number.
    Float(f64),
    /// Owned string.
    String(String),
    /// Boolean flag.
    Bool(bool),
}

impl ConfigValue {
    /// Return the integer payload, or `None` for any other variant.
    #[must_use]
    pub fn as_int(&self) -> Option<i64> {
        if let Self::Int(number) = *self {
            Some(number)
        } else {
            None
        }
    }

    /// Return the value as a float.
    ///
    /// `Float` yields its payload and `Int` is converted with an `as f64`
    /// cast; `String` and `Bool` yield `None`.
    #[must_use]
    pub fn as_float(&self) -> Option<f64> {
        match *self {
            Self::Float(number) => Some(number),
            Self::Int(number) => Some(number as f64),
            Self::String(_) | Self::Bool(_) => None,
        }
    }

    /// Return the string payload as a slice, or `None` for any other variant.
    #[must_use]
    pub fn as_string(&self) -> Option<&str> {
        if let Self::String(text) = self {
            Some(text.as_str())
        } else {
            None
        }
    }

    /// Return the boolean payload, or `None` for any other variant.
    #[must_use]
    pub fn as_bool(&self) -> Option<bool> {
        if let Self::Bool(flag) = *self {
            Some(flag)
        } else {
            None
        }
    }
}

impl From<i64> for ConfigValue {
    /// Wrap an integer as [`ConfigValue::Int`].
    fn from(value: i64) -> Self {
        ConfigValue::Int(value)
    }
}

impl From<f64> for ConfigValue {
    /// Wrap a float as [`ConfigValue::Float`].
    fn from(value: f64) -> Self {
        ConfigValue::Float(value)
    }
}

impl From<String> for ConfigValue {
    /// Wrap an owned string as [`ConfigValue::String`].
    fn from(value: String) -> Self {
        ConfigValue::String(value)
    }
}

impl From<&str> for ConfigValue {
    /// Copy a string slice into a [`ConfigValue::String`].
    fn from(value: &str) -> Self {
        ConfigValue::String(value.to_owned())
    }
}

impl From<bool> for ConfigValue {
    /// Wrap a boolean as [`ConfigValue::Bool`].
    fn from(value: bool) -> Self {
        ConfigValue::Bool(value)
    }
}
236
237/// Trait for filter graph nodes.
238///
239/// Nodes implement this trait to participate in the filter graph.
240/// The graph will call [`Node::process`] to push frames through the pipeline.
241pub trait Node: Send + Sync {
242    /// Get the node's unique identifier.
243    fn id(&self) -> NodeId;
244
245    /// Get the node's name.
246    fn name(&self) -> &str;
247
248    /// Get the node type.
249    fn node_type(&self) -> NodeType;
250
251    /// Get the current state of the node.
252    fn state(&self) -> NodeState;
253
254    /// Set the node state.
255    fn set_state(&mut self, state: NodeState) -> GraphResult<()>;
256
257    /// Get the node's input ports.
258    fn inputs(&self) -> &[InputPort];
259
260    /// Get the node's output ports.
261    fn outputs(&self) -> &[OutputPort];
262
263    /// Initialize the node before processing starts.
264    fn initialize(&mut self) -> GraphResult<()> {
265        Ok(())
266    }
267
268    /// Process available input and produce output.
269    ///
270    /// Returns `Ok(Some(frame))` if a frame was produced, `Ok(None)` if no
271    /// frame is ready (need more input), or `Err` on error.
272    fn process(&mut self, input: Option<FilterFrame>) -> GraphResult<Option<FilterFrame>>;
273
274    /// Flush any buffered data.
275    fn flush(&mut self) -> GraphResult<Vec<FilterFrame>> {
276        Ok(Vec::new())
277    }
278
279    /// Reset the node to initial state.
280    fn reset(&mut self) -> GraphResult<()> {
281        self.set_state(NodeState::Idle)
282    }
283
284    /// Get input port by ID.
285    fn input_port(&self, id: PortId) -> Option<&InputPort> {
286        self.inputs().iter().find(|p| p.id == id)
287    }
288
289    /// Get output port by ID.
290    fn output_port(&self, id: PortId) -> Option<&OutputPort> {
291        self.outputs().iter().find(|p| p.id == id)
292    }
293
294    /// Check if node accepts the given port type as input.
295    fn accepts_input(&self, port_type: PortType) -> bool {
296        self.inputs().iter().any(|p| p.port_type == port_type)
297    }
298
299    /// Check if node produces the given port type as output.
300    fn produces_output(&self, port_type: PortType) -> bool {
301        self.outputs().iter().any(|p| p.port_type == port_type)
302    }
303}
304
305/// Runtime state for a node in the graph.
306#[allow(dead_code)]
307pub struct NodeRuntime {
308    /// The node implementation.
309    node: Box<dyn Node>,
310    /// Input buffers indexed by port ID.
311    input_buffers: HashMap<PortId, Vec<FilterFrame>>,
312    /// Output buffers indexed by port ID.
313    output_buffers: HashMap<PortId, Vec<FilterFrame>>,
314    /// Frames processed count.
315    frames_processed: u64,
316}
317
318impl NodeRuntime {
319    /// Create a new node runtime.
320    pub fn new(node: Box<dyn Node>) -> Self {
321        let input_buffers = node.inputs().iter().map(|p| (p.id, Vec::new())).collect();
322        let output_buffers = node.outputs().iter().map(|p| (p.id, Vec::new())).collect();
323
324        Self {
325            node,
326            input_buffers,
327            output_buffers,
328            frames_processed: 0,
329        }
330    }
331
332    /// Get the underlying node.
333    #[must_use]
334    pub fn node(&self) -> &dyn Node {
335        self.node.as_ref()
336    }
337
338    /// Get mutable reference to the underlying node.
339    pub fn node_mut(&mut self) -> &mut dyn Node {
340        self.node.as_mut()
341    }
342
343    /// Push a frame to an input port.
344    pub fn push_input(&mut self, port: PortId, frame: FilterFrame) -> GraphResult<()> {
345        self.input_buffers
346            .get_mut(&port)
347            .ok_or(GraphError::PortNotFound {
348                node: self.node.id(),
349                port,
350            })?
351            .push(frame);
352        Ok(())
353    }
354
355    /// Pop a frame from an output port.
356    pub fn pop_output(&mut self, port: PortId) -> GraphResult<Option<FilterFrame>> {
357        let buffer = self
358            .output_buffers
359            .get_mut(&port)
360            .ok_or(GraphError::PortNotFound {
361                node: self.node.id(),
362                port,
363            })?;
364
365        Ok(if buffer.is_empty() {
366            None
367        } else {
368            Some(buffer.remove(0))
369        })
370    }
371
372    /// Process the node.
373    pub fn process(&mut self) -> GraphResult<()> {
374        // Get input frame from first input port if available
375        let input = self.input_buffers.values_mut().find_map(|buf| {
376            if buf.is_empty() {
377                None
378            } else {
379                Some(buf.remove(0))
380            }
381        });
382
383        // Process
384        if let Some(output) = self.node.process(input)? {
385            // Push to first output port
386            if let Some(buf) = self.output_buffers.values_mut().next() {
387                buf.push(output);
388            }
389            self.frames_processed += 1;
390        }
391
392        Ok(())
393    }
394
395    /// Get the number of frames processed.
396    #[must_use]
397    pub fn frames_processed(&self) -> u64 {
398        self.frames_processed
399    }
400}
401
#[cfg(test)]
mod tests {
    use super::*;

    /// `NodeId` renders as `Node(<value>)`.
    #[test]
    fn test_node_id_display() {
        let id = NodeId(42);
        assert_eq!(format!("{id}"), "Node(42)");
    }

    /// Every `NodeType` variant renders as its variant name.
    #[test]
    fn test_node_type_display() {
        assert_eq!(format!("{}", NodeType::Source), "Source");
        assert_eq!(format!("{}", NodeType::Filter), "Filter");
        assert_eq!(format!("{}", NodeType::Sink), "Sink");
    }

    /// Every `NodeState` variant renders as its variant name.
    #[test]
    fn test_node_state_display() {
        assert_eq!(NodeState::Idle.to_string(), "Idle");
        assert_eq!(NodeState::Processing.to_string(), "Processing");
        assert_eq!(NodeState::Done.to_string(), "Done");
        assert_eq!(NodeState::Error.to_string(), "Error");
    }

    /// Covers allowed and rejected transitions from every state.
    #[test]
    fn test_node_state_transitions() {
        let state = NodeState::Idle;
        assert!(state.can_transition_to(NodeState::Idle));
        assert!(state.can_transition_to(NodeState::Processing));
        assert!(state.can_transition_to(NodeState::Done));
        assert!(state.can_transition_to(NodeState::Error));

        let state = NodeState::Processing;
        assert!(state.can_transition_to(NodeState::Idle));
        assert!(state.can_transition_to(NodeState::Processing));
        assert!(state.can_transition_to(NodeState::Done));
        assert!(state.can_transition_to(NodeState::Error));

        let state = NodeState::Done;
        assert!(state.can_transition_to(NodeState::Idle));
        assert!(state.can_transition_to(NodeState::Done));
        assert!(!state.can_transition_to(NodeState::Processing));
        assert!(!state.can_transition_to(NodeState::Error));

        let state = NodeState::Error;
        assert!(state.can_transition_to(NodeState::Idle));
        assert!(state.can_transition_to(NodeState::Error));
        assert!(!state.can_transition_to(NodeState::Processing));
        assert!(!state.can_transition_to(NodeState::Done));
    }

    /// Builder methods store values and typed getters read them back.
    #[test]
    fn test_node_config() {
        let config = NodeConfig::new("test")
            .with_type(NodeType::Filter)
            .with_option("quality", ConfigValue::Int(80))
            .with_option("name", ConfigValue::String("test".into()))
            .with_option("enabled", ConfigValue::Bool(true));

        assert_eq!(config.name, "test");
        assert_eq!(config.node_type, Some(NodeType::Filter));
        assert_eq!(config.get_option("quality"), Some(&ConfigValue::Int(80)));
        assert_eq!(config.get_int("quality"), Some(80));
        assert_eq!(config.get_float("quality"), Some(80.0));
        assert_eq!(config.get_string("name"), Some("test"));
        assert_eq!(config.get_bool("enabled"), Some(true));

        // Missing keys and mismatched types both yield `None`.
        assert_eq!(config.get_option("missing"), None);
        assert_eq!(config.get_int("missing"), None);
        assert_eq!(config.get_int("name"), None);
        assert_eq!(config.get_bool("quality"), None);
    }

    /// Accessors return the payload only for the matching variant
    /// (plus the `Int` -> float conversion).
    #[test]
    fn test_config_value_conversions() {
        let int_val = ConfigValue::Int(42);
        assert_eq!(int_val.as_int(), Some(42));
        assert_eq!(int_val.as_float(), Some(42.0));
        assert_eq!(int_val.as_string(), None);
        assert_eq!(int_val.as_bool(), None);

        // 2.5 is used instead of 3.14, which trips clippy's `approx_constant`.
        let float_val = ConfigValue::Float(2.5);
        assert_eq!(float_val.as_float(), Some(2.5));
        assert_eq!(float_val.as_int(), None);

        let str_val = ConfigValue::String("hello".into());
        assert_eq!(str_val.as_string(), Some("hello"));
        assert_eq!(str_val.as_float(), None);

        let bool_val = ConfigValue::Bool(true);
        assert_eq!(bool_val.as_bool(), Some(true));
        assert_eq!(bool_val.as_int(), None);
    }

    /// Each `From` impl produces the corresponding variant.
    #[test]
    fn test_config_value_from() {
        assert_eq!(ConfigValue::from(42i64), ConfigValue::Int(42));
        assert_eq!(ConfigValue::from(2.5f64), ConfigValue::Float(2.5));
        assert_eq!(
            ConfigValue::from("test"),
            ConfigValue::String("test".to_string())
        );
        assert_eq!(
            ConfigValue::from(String::from("test")),
            ConfigValue::String("test".to_string())
        );
        assert_eq!(ConfigValue::from(true), ConfigValue::Bool(true));
    }
}