Skip to main content

oximedia_graph/
processing_graph.rs

//! Media processing graph with nodes, edges, and topological execution ordering.
//!
//! This module models a directed acyclic graph (DAG) of media processing nodes.
//! Nodes represent processing stages (source, filter, encoder, etc.), and edges
//! represent data flow between them.
6
/// Classification of a processing node.
///
/// The enum is a plain tag without any payload, so it is `Copy` and `Hash`:
/// values can be passed around by value and used directly as keys in hashed
/// collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NodeType {
    /// Produces media data (e.g. file reader, camera capture).
    Source,
    /// Decodes compressed media into raw frames.
    Decoder,
    /// Transforms media data (e.g. scaler, colour converter).
    Filter,
    /// Encodes raw frames into a compressed format.
    Encoder,
    /// Consumes media data (e.g. file writer, display).
    Sink,
    /// Combines multiple input streams into one.
    Mixer,
    /// Distributes one input stream to multiple outputs.
    Splitter,
}
25
26impl NodeType {
27    /// Maximum number of input connections accepted by this node type.
28    pub fn max_inputs(&self) -> usize {
29        match self {
30            Self::Source => 0,
31            Self::Decoder => 1,
32            Self::Filter => 1,
33            Self::Encoder => 1,
34            Self::Sink => 1,
35            Self::Mixer => 8,
36            Self::Splitter => 1,
37        }
38    }
39
40    /// Maximum number of output connections this node type can produce.
41    pub fn max_outputs(&self) -> usize {
42        match self {
43            Self::Source => 1,
44            Self::Decoder => 1,
45            Self::Filter => 1,
46            Self::Encoder => 1,
47            Self::Sink => 0,
48            Self::Mixer => 1,
49            Self::Splitter => 8,
50        }
51    }
52}
53
54/// A single node in a media [`ProcessingGraph`].
55#[derive(Debug, Clone)]
56pub struct GraphNode {
57    /// Unique identifier for this node within the graph.
58    pub id: u64,
59    /// Human-readable name.
60    pub name: String,
61    /// Functional type of this node.
62    pub node_type: NodeType,
63    /// Whether this node should participate in processing.
64    pub enabled: bool,
65    /// Arbitrary key-value configuration parameters.
66    pub params: Vec<(String, String)>,
67}
68
69impl GraphNode {
70    /// Creates a new, enabled node with no parameters.
71    pub fn new(id: u64, name: &str, node_type: NodeType) -> Self {
72        Self {
73            id,
74            name: name.to_string(),
75            node_type,
76            enabled: true,
77            params: Vec::new(),
78        }
79    }
80
81    /// Returns the value for `key`, or `None` if not set.
82    pub fn get_param(&self, key: &str) -> Option<&str> {
83        self.params
84            .iter()
85            .find(|(k, _)| k == key)
86            .map(|(_, v)| v.as_str())
87    }
88
89    /// Sets (or updates) `key` to `value`.
90    pub fn set_param(&mut self, key: &str, value: &str) {
91        if let Some(entry) = self.params.iter_mut().find(|(k, _)| k == key) {
92            entry.1 = value.to_string();
93        } else {
94            self.params.push((key.to_string(), value.to_string()));
95        }
96    }
97}
98
/// A directed connection between two ports on two nodes.
///
/// The edge only stores identifiers and port indices, so it is `Copy` and
/// `Hash`: edges can be passed by value and collected into hashed sets (for
/// example to detect duplicate connections).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct GraphEdge {
    /// Source node identifier.
    pub from_node: u64,
    /// Output port index on the source node.
    pub from_port: u32,
    /// Destination node identifier.
    pub to_node: u64,
    /// Input port index on the destination node.
    pub to_port: u32,
}
111
112impl GraphEdge {
113    /// Returns `true` if this edge goes from `from` to `to`.
114    pub fn connects(&self, from: u64, to: u64) -> bool {
115        self.from_node == from && self.to_node == to
116    }
117}
118
119/// A directed acyclic graph of media processing nodes.
120#[derive(Debug, Default)]
121pub struct ProcessingGraph {
122    /// All nodes in the graph.
123    pub nodes: Vec<GraphNode>,
124    /// All edges in the graph.
125    pub edges: Vec<GraphEdge>,
126}
127
128impl ProcessingGraph {
129    /// Creates an empty processing graph.
130    pub fn new() -> Self {
131        Self::default()
132    }
133
134    /// Adds `node` to the graph.  Duplicate IDs are allowed but discouraged.
135    pub fn add_node(&mut self, node: GraphNode) {
136        self.nodes.push(node);
137    }
138
139    /// Removes the node with `id` and all edges referencing it.
140    ///
141    /// Returns `true` if a node was removed.
142    pub fn remove_node(&mut self, id: u64) -> bool {
143        let before = self.nodes.len();
144        self.nodes.retain(|n| n.id != id);
145        self.edges.retain(|e| e.from_node != id && e.to_node != id);
146        self.nodes.len() < before
147    }
148
149    /// Adds an edge from `(from, from_port)` to `(to, to_port)`.
150    ///
151    /// Returns `false` if either node does not exist; `true` on success.
152    pub fn connect(&mut self, from: u64, from_port: u32, to: u64, to_port: u32) -> bool {
153        let has_from = self.nodes.iter().any(|n| n.id == from);
154        let has_to = self.nodes.iter().any(|n| n.id == to);
155        if !has_from || !has_to {
156            return false;
157        }
158        self.edges.push(GraphEdge {
159            from_node: from,
160            from_port,
161            to_node: to,
162            to_port,
163        });
164        true
165    }
166
167    /// Removes all edges from node `from` to node `to`.
168    ///
169    /// Returns `true` if at least one edge was removed.
170    pub fn disconnect(&mut self, from: u64, to: u64) -> bool {
171        let before = self.edges.len();
172        self.edges.retain(|e| !e.connects(from, to));
173        self.edges.len() < before
174    }
175
176    /// Returns references to all nodes whose type has zero inputs (source nodes).
177    pub fn source_nodes(&self) -> Vec<&GraphNode> {
178        self.nodes
179            .iter()
180            .filter(|n| n.node_type.max_inputs() == 0)
181            .collect()
182    }
183
184    /// Returns references to all nodes whose type has zero outputs (sink nodes).
185    pub fn sink_nodes(&self) -> Vec<&GraphNode> {
186        self.nodes
187            .iter()
188            .filter(|n| n.node_type.max_outputs() == 0)
189            .collect()
190    }
191
192    /// Returns node IDs in topological execution order (Kahn's algorithm).
193    ///
194    /// Nodes not reachable from any source, or that form cycles, are appended
195    /// in arbitrary order at the end.
196    pub fn execution_order(&self) -> Vec<u64> {
197        use std::collections::{HashMap, VecDeque};
198
199        // Count incoming edges per node (enabled nodes only).
200        let mut in_degree: HashMap<u64, usize> = self
201            .nodes
202            .iter()
203            .filter(|n| n.enabled)
204            .map(|n| (n.id, 0))
205            .collect();
206
207        for edge in &self.edges {
208            if in_degree.contains_key(&edge.from_node) && in_degree.contains_key(&edge.to_node) {
209                *in_degree.entry(edge.to_node).or_insert(0) += 1;
210            }
211        }
212
213        // Seed the queue with zero-in-degree nodes.
214        let mut queue: VecDeque<u64> = in_degree
215            .iter()
216            .filter(|(_, &deg)| deg == 0)
217            .map(|(&id, _)| id)
218            .collect();
219
220        // Sort for determinism.
221        let mut queue_vec: Vec<u64> = queue.drain(..).collect();
222        queue_vec.sort_unstable();
223        queue.extend(queue_vec);
224
225        let mut order = Vec::with_capacity(self.nodes.len());
226
227        while let Some(id) = queue.pop_front() {
228            order.push(id);
229            // Find successors and decrement their in-degree.
230            let mut new_ready: Vec<u64> = self
231                .edges
232                .iter()
233                .filter(|e| e.from_node == id)
234                .filter_map(|e| {
235                    let deg = in_degree.get_mut(&e.to_node)?;
236                    *deg = deg.saturating_sub(1);
237                    if *deg == 0 {
238                        Some(e.to_node)
239                    } else {
240                        None
241                    }
242                })
243                .collect();
244            new_ready.sort_unstable();
245            queue.extend(new_ready);
246        }
247
248        // Append any remaining nodes (disabled or cycle members) in id order.
249        let mut remaining: Vec<u64> = self
250            .nodes
251            .iter()
252            .map(|n| n.id)
253            .filter(|id| !order.contains(id))
254            .collect();
255        remaining.sort_unstable();
256        order.extend(remaining);
257
258        order
259    }
260}
261
262// ─────────────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    //! Unit tests for node types, nodes, edges and the processing graph.

    use super::*;

    /// Builds an enabled source node named after its ID.
    fn source(id: u64) -> GraphNode {
        GraphNode::new(id, &format!("source_{id}"), NodeType::Source)
    }
    /// Builds an enabled filter node named after its ID.
    fn filter(id: u64) -> GraphNode {
        GraphNode::new(id, &format!("filter_{id}"), NodeType::Filter)
    }
    /// Builds an enabled sink node named after its ID.
    fn sink(id: u64) -> GraphNode {
        GraphNode::new(id, &format!("sink_{id}"), NodeType::Sink)
    }

    // ── NodeType ─────────────────────────────────────────────────────────────

    #[test]
    fn source_has_zero_inputs() {
        assert_eq!(NodeType::Source.max_inputs(), 0);
    }

    #[test]
    fn sink_has_zero_outputs() {
        assert_eq!(NodeType::Sink.max_outputs(), 0);
    }

    #[test]
    fn mixer_accepts_multiple_inputs() {
        assert!(NodeType::Mixer.max_inputs() > 1);
    }

    #[test]
    fn splitter_produces_multiple_outputs() {
        assert!(NodeType::Splitter.max_outputs() > 1);
    }

    // ── GraphNode ────────────────────────────────────────────────────────────

    #[test]
    fn new_node_is_enabled_and_has_no_params() {
        let n = source(9);
        assert!(n.enabled);
        assert!(n.params.is_empty());
        assert_eq!(n.name, "source_9");
    }

    #[test]
    fn node_set_and_get_param() {
        let mut n = filter(1);
        n.set_param("width", "1920");
        assert_eq!(n.get_param("width"), Some("1920"));
    }

    #[test]
    fn node_update_existing_param() {
        let mut n = filter(2);
        n.set_param("fps", "24");
        n.set_param("fps", "60");
        assert_eq!(n.get_param("fps"), Some("60"));
        // Only one entry for the key.
        assert_eq!(n.params.iter().filter(|(k, _)| k == "fps").count(), 1);
    }

    #[test]
    fn node_missing_param_returns_none() {
        let n = source(3);
        assert!(n.get_param("nonexistent").is_none());
    }

    // ── GraphEdge ────────────────────────────────────────────────────────────

    #[test]
    fn edge_connects_returns_true_for_matching_pair() {
        let edge = GraphEdge {
            from_node: 1,
            from_port: 0,
            to_node: 2,
            to_port: 0,
        };
        assert!(edge.connects(1, 2));
    }

    #[test]
    fn edge_connects_returns_false_for_reversed_pair() {
        let edge = GraphEdge {
            from_node: 1,
            from_port: 0,
            to_node: 2,
            to_port: 0,
        };
        assert!(!edge.connects(2, 1));
    }

    // ── ProcessingGraph ───────────────────────────────────────────────────────

    #[test]
    fn add_and_remove_node() {
        let mut g = ProcessingGraph::new();
        g.add_node(source(10));
        assert_eq!(g.nodes.len(), 1);
        assert!(g.remove_node(10));
        assert!(g.nodes.is_empty());
    }

    #[test]
    fn remove_missing_node_returns_false() {
        let mut g = ProcessingGraph::new();
        g.add_node(source(1));
        assert!(!g.remove_node(2));
        assert_eq!(g.nodes.len(), 1);
    }

    #[test]
    fn remove_node_also_removes_edges() {
        let mut g = ProcessingGraph::new();
        g.add_node(source(1));
        g.add_node(sink(2));
        g.connect(1, 0, 2, 0);
        g.remove_node(1);
        assert!(g.edges.is_empty());
    }

    #[test]
    fn connect_records_edge_with_ports() {
        let mut g = ProcessingGraph::new();
        g.add_node(source(1));
        g.add_node(sink(2));
        assert!(g.connect(1, 3, 2, 4));
        assert_eq!(
            g.edges,
            vec![GraphEdge {
                from_node: 1,
                from_port: 3,
                to_node: 2,
                to_port: 4,
            }]
        );
    }

    #[test]
    fn connect_fails_for_missing_node() {
        let mut g = ProcessingGraph::new();
        g.add_node(source(1));
        assert!(!g.connect(1, 0, 99, 0)); // node 99 missing
        assert!(!g.connect(99, 0, 1, 0)); // missing on the source side
        assert!(g.edges.is_empty());
    }

    #[test]
    fn disconnect_removes_all_matching_edges() {
        let mut g = ProcessingGraph::new();
        g.add_node(source(1));
        g.add_node(sink(2));
        g.connect(1, 0, 2, 0);
        g.connect(1, 0, 2, 1);
        assert!(g.disconnect(1, 2));
        assert!(g.edges.is_empty());
    }

    #[test]
    fn disconnect_without_matching_edge_returns_false() {
        let mut g = ProcessingGraph::new();
        g.add_node(source(1));
        g.add_node(sink(2));
        g.connect(1, 0, 2, 0);
        // Direction matters: there is no edge from 2 to 1.
        assert!(!g.disconnect(2, 1));
        assert_eq!(g.edges.len(), 1);
    }

    #[test]
    fn source_nodes_returns_only_sources() {
        let mut g = ProcessingGraph::new();
        g.add_node(source(1));
        g.add_node(filter(2));
        g.add_node(sink(3));
        let srcs: Vec<u64> = g.source_nodes().into_iter().map(|n| n.id).collect();
        assert_eq!(srcs, vec![1]);
    }

    #[test]
    fn sink_nodes_returns_only_sinks() {
        let mut g = ProcessingGraph::new();
        g.add_node(source(1));
        g.add_node(sink(2));
        let sinks: Vec<u64> = g.sink_nodes().into_iter().map(|n| n.id).collect();
        assert_eq!(sinks, vec![2]);
    }

    #[test]
    fn execution_order_of_empty_graph_is_empty() {
        assert!(ProcessingGraph::new().execution_order().is_empty());
    }

    #[test]
    fn execution_order_linear_pipeline() {
        // source(1) -> filter(2) -> sink(3)
        let mut g = ProcessingGraph::new();
        g.add_node(source(1));
        g.add_node(filter(2));
        g.add_node(sink(3));
        g.connect(1, 0, 2, 0);
        g.connect(2, 0, 3, 0);
        let order = g.execution_order();
        assert_eq!(order, vec![1, 2, 3]);
    }

    #[test]
    fn execution_order_independent_nodes_are_included() {
        let mut g = ProcessingGraph::new();
        g.add_node(source(2));
        g.add_node(source(1));
        // Independent nodes come out in ascending ID order, not insertion order.
        assert_eq!(g.execution_order(), vec![1, 2]);
    }

    #[test]
    fn execution_order_diamond_breaks_ties_by_id() {
        // 1 -> {3, 2} -> 4; the edge to 3 is inserted first on purpose.
        let mut g = ProcessingGraph::new();
        g.add_node(source(1));
        g.add_node(filter(2));
        g.add_node(filter(3));
        g.add_node(sink(4));
        g.connect(1, 0, 3, 0);
        g.connect(1, 0, 2, 0);
        g.connect(2, 0, 4, 0);
        g.connect(3, 0, 4, 0);
        assert_eq!(g.execution_order(), vec![1, 2, 3, 4]);
    }

    #[test]
    fn execution_order_puts_disabled_nodes_last() {
        // source(1) -> filter(2, disabled) -> sink(3)
        let mut g = ProcessingGraph::new();
        g.add_node(source(1));
        let mut disabled = filter(2);
        disabled.enabled = false;
        g.add_node(disabled);
        g.add_node(sink(3));
        g.connect(1, 0, 2, 0);
        g.connect(2, 0, 3, 0);
        assert_eq!(g.execution_order(), vec![1, 3, 2]);
    }

    #[test]
    fn execution_order_appends_cycle_members() {
        // 1 -> 2 <-> 3, with 4 unconnected.
        let mut g = ProcessingGraph::new();
        g.add_node(source(1));
        g.add_node(filter(2));
        g.add_node(filter(3));
        g.add_node(source(4));
        g.connect(1, 0, 2, 0);
        g.connect(2, 0, 3, 0);
        g.connect(3, 0, 2, 0);
        assert_eq!(g.execution_order(), vec![1, 4, 2, 3]);
    }
}
423        g.add_node(source(2));
424        let order = g.execution_order();
425        assert_eq!(order.len(), 2);
426    }
427}