1#[derive(Debug, Clone, PartialEq, Eq)]
9pub enum NodeType {
10 Source,
12 Decoder,
14 Filter,
16 Encoder,
18 Sink,
20 Mixer,
22 Splitter,
24}
25
26impl NodeType {
27 pub fn max_inputs(&self) -> usize {
29 match self {
30 Self::Source => 0,
31 Self::Decoder => 1,
32 Self::Filter => 1,
33 Self::Encoder => 1,
34 Self::Sink => 1,
35 Self::Mixer => 8,
36 Self::Splitter => 1,
37 }
38 }
39
40 pub fn max_outputs(&self) -> usize {
42 match self {
43 Self::Source => 1,
44 Self::Decoder => 1,
45 Self::Filter => 1,
46 Self::Encoder => 1,
47 Self::Sink => 0,
48 Self::Mixer => 1,
49 Self::Splitter => 8,
50 }
51 }
52}
53
54#[derive(Debug, Clone)]
56pub struct GraphNode {
57 pub id: u64,
59 pub name: String,
61 pub node_type: NodeType,
63 pub enabled: bool,
65 pub params: Vec<(String, String)>,
67}
68
69impl GraphNode {
70 pub fn new(id: u64, name: &str, node_type: NodeType) -> Self {
72 Self {
73 id,
74 name: name.to_string(),
75 node_type,
76 enabled: true,
77 params: Vec::new(),
78 }
79 }
80
81 pub fn get_param(&self, key: &str) -> Option<&str> {
83 self.params
84 .iter()
85 .find(|(k, _)| k == key)
86 .map(|(_, v)| v.as_str())
87 }
88
89 pub fn set_param(&mut self, key: &str, value: &str) {
91 if let Some(entry) = self.params.iter_mut().find(|(k, _)| k == key) {
92 entry.1 = value.to_string();
93 } else {
94 self.params.push((key.to_string(), value.to_string()));
95 }
96 }
97}
98
99#[derive(Debug, Clone, PartialEq, Eq)]
101pub struct GraphEdge {
102 pub from_node: u64,
104 pub from_port: u32,
106 pub to_node: u64,
108 pub to_port: u32,
110}
111
112impl GraphEdge {
113 pub fn connects(&self, from: u64, to: u64) -> bool {
115 self.from_node == from && self.to_node == to
116 }
117}
118
119#[derive(Debug, Default)]
121pub struct ProcessingGraph {
122 pub nodes: Vec<GraphNode>,
124 pub edges: Vec<GraphEdge>,
126}
127
128impl ProcessingGraph {
129 pub fn new() -> Self {
131 Self::default()
132 }
133
134 pub fn add_node(&mut self, node: GraphNode) {
136 self.nodes.push(node);
137 }
138
139 pub fn remove_node(&mut self, id: u64) -> bool {
143 let before = self.nodes.len();
144 self.nodes.retain(|n| n.id != id);
145 self.edges.retain(|e| e.from_node != id && e.to_node != id);
146 self.nodes.len() < before
147 }
148
149 pub fn connect(&mut self, from: u64, from_port: u32, to: u64, to_port: u32) -> bool {
153 let has_from = self.nodes.iter().any(|n| n.id == from);
154 let has_to = self.nodes.iter().any(|n| n.id == to);
155 if !has_from || !has_to {
156 return false;
157 }
158 self.edges.push(GraphEdge {
159 from_node: from,
160 from_port,
161 to_node: to,
162 to_port,
163 });
164 true
165 }
166
167 pub fn disconnect(&mut self, from: u64, to: u64) -> bool {
171 let before = self.edges.len();
172 self.edges.retain(|e| !e.connects(from, to));
173 self.edges.len() < before
174 }
175
176 pub fn source_nodes(&self) -> Vec<&GraphNode> {
178 self.nodes
179 .iter()
180 .filter(|n| n.node_type.max_inputs() == 0)
181 .collect()
182 }
183
184 pub fn sink_nodes(&self) -> Vec<&GraphNode> {
186 self.nodes
187 .iter()
188 .filter(|n| n.node_type.max_outputs() == 0)
189 .collect()
190 }
191
192 pub fn execution_order(&self) -> Vec<u64> {
197 use std::collections::{HashMap, VecDeque};
198
199 let mut in_degree: HashMap<u64, usize> = self
201 .nodes
202 .iter()
203 .filter(|n| n.enabled)
204 .map(|n| (n.id, 0))
205 .collect();
206
207 for edge in &self.edges {
208 if in_degree.contains_key(&edge.from_node) && in_degree.contains_key(&edge.to_node) {
209 *in_degree.entry(edge.to_node).or_insert(0) += 1;
210 }
211 }
212
213 let mut queue: VecDeque<u64> = in_degree
215 .iter()
216 .filter(|(_, °)| deg == 0)
217 .map(|(&id, _)| id)
218 .collect();
219
220 let mut queue_vec: Vec<u64> = queue.drain(..).collect();
222 queue_vec.sort_unstable();
223 queue.extend(queue_vec);
224
225 let mut order = Vec::with_capacity(self.nodes.len());
226
227 while let Some(id) = queue.pop_front() {
228 order.push(id);
229 let mut new_ready: Vec<u64> = self
231 .edges
232 .iter()
233 .filter(|e| e.from_node == id)
234 .filter_map(|e| {
235 let deg = in_degree.get_mut(&e.to_node)?;
236 *deg = deg.saturating_sub(1);
237 if *deg == 0 {
238 Some(e.to_node)
239 } else {
240 None
241 }
242 })
243 .collect();
244 new_ready.sort_unstable();
245 queue.extend(new_ready);
246 }
247
248 let mut remaining: Vec<u64> = self
250 .nodes
251 .iter()
252 .map(|n| n.id)
253 .filter(|id| !order.contains(id))
254 .collect();
255 remaining.sort_unstable();
256 order.extend(remaining);
257
258 order
259 }
260}
261
262#[cfg(test)]
264mod tests {
265 use super::*;
266
267 fn source(id: u64) -> GraphNode {
268 GraphNode::new(id, &format!("source_{id}"), NodeType::Source)
269 }
270 fn filter(id: u64) -> GraphNode {
271 GraphNode::new(id, &format!("filter_{id}"), NodeType::Filter)
272 }
273 fn sink(id: u64) -> GraphNode {
274 GraphNode::new(id, &format!("sink_{id}"), NodeType::Sink)
275 }
276
277 #[test]
280 fn source_has_zero_inputs() {
281 assert_eq!(NodeType::Source.max_inputs(), 0);
282 }
283
284 #[test]
285 fn sink_has_zero_outputs() {
286 assert_eq!(NodeType::Sink.max_outputs(), 0);
287 }
288
289 #[test]
290 fn mixer_accepts_multiple_inputs() {
291 assert!(NodeType::Mixer.max_inputs() > 1);
292 }
293
294 #[test]
295 fn splitter_produces_multiple_outputs() {
296 assert!(NodeType::Splitter.max_outputs() > 1);
297 }
298
299 #[test]
302 fn node_set_and_get_param() {
303 let mut n = filter(1);
304 n.set_param("width", "1920");
305 assert_eq!(n.get_param("width"), Some("1920"));
306 }
307
308 #[test]
309 fn node_update_existing_param() {
310 let mut n = filter(2);
311 n.set_param("fps", "24");
312 n.set_param("fps", "60");
313 assert_eq!(n.get_param("fps"), Some("60"));
314 assert_eq!(n.params.iter().filter(|(k, _)| k == "fps").count(), 1);
316 }
317
318 #[test]
319 fn node_missing_param_returns_none() {
320 let n = source(3);
321 assert!(n.get_param("nonexistent").is_none());
322 }
323
324 #[test]
327 fn edge_connects_returns_true_for_matching_pair() {
328 let edge = GraphEdge {
329 from_node: 1,
330 from_port: 0,
331 to_node: 2,
332 to_port: 0,
333 };
334 assert!(edge.connects(1, 2));
335 }
336
337 #[test]
338 fn edge_connects_returns_false_for_reversed_pair() {
339 let edge = GraphEdge {
340 from_node: 1,
341 from_port: 0,
342 to_node: 2,
343 to_port: 0,
344 };
345 assert!(!edge.connects(2, 1));
346 }
347
348 #[test]
351 fn add_and_remove_node() {
352 let mut g = ProcessingGraph::new();
353 g.add_node(source(10));
354 assert_eq!(g.nodes.len(), 1);
355 assert!(g.remove_node(10));
356 assert!(g.nodes.is_empty());
357 }
358
359 #[test]
360 fn remove_node_also_removes_edges() {
361 let mut g = ProcessingGraph::new();
362 g.add_node(source(1));
363 g.add_node(sink(2));
364 g.connect(1, 0, 2, 0);
365 g.remove_node(1);
366 assert!(g.edges.is_empty());
367 }
368
369 #[test]
370 fn connect_fails_for_missing_node() {
371 let mut g = ProcessingGraph::new();
372 g.add_node(source(1));
373 assert!(!g.connect(1, 0, 99, 0)); }
375
376 #[test]
377 fn disconnect_removes_all_matching_edges() {
378 let mut g = ProcessingGraph::new();
379 g.add_node(source(1));
380 g.add_node(sink(2));
381 g.connect(1, 0, 2, 0);
382 g.connect(1, 0, 2, 1);
383 assert!(g.disconnect(1, 2));
384 assert!(g.edges.is_empty());
385 }
386
387 #[test]
388 fn source_nodes_returns_only_sources() {
389 let mut g = ProcessingGraph::new();
390 g.add_node(source(1));
391 g.add_node(filter(2));
392 g.add_node(sink(3));
393 let srcs: Vec<u64> = g.source_nodes().into_iter().map(|n| n.id).collect();
394 assert_eq!(srcs, vec![1]);
395 }
396
397 #[test]
398 fn sink_nodes_returns_only_sinks() {
399 let mut g = ProcessingGraph::new();
400 g.add_node(source(1));
401 g.add_node(sink(2));
402 let sinks: Vec<u64> = g.sink_nodes().into_iter().map(|n| n.id).collect();
403 assert_eq!(sinks, vec![2]);
404 }
405
406 #[test]
407 fn execution_order_linear_pipeline() {
408 let mut g = ProcessingGraph::new();
410 g.add_node(source(1));
411 g.add_node(filter(2));
412 g.add_node(sink(3));
413 g.connect(1, 0, 2, 0);
414 g.connect(2, 0, 3, 0);
415 let order = g.execution_order();
416 assert_eq!(order, vec![1, 2, 3]);
417 }
418
419 #[test]
420 fn execution_order_independent_nodes_are_included() {
421 let mut g = ProcessingGraph::new();
422 g.add_node(source(1));
423 g.add_node(source(2));
424 let order = g.execution_order();
425 assert_eq!(order.len(), 2);
426 }
427}