#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeType {
Source,
Decoder,
Filter,
Encoder,
Sink,
Mixer,
Splitter,
}
impl NodeType {
pub fn max_inputs(&self) -> usize {
match self {
Self::Source => 0,
Self::Decoder => 1,
Self::Filter => 1,
Self::Encoder => 1,
Self::Sink => 1,
Self::Mixer => 8,
Self::Splitter => 1,
}
}
pub fn max_outputs(&self) -> usize {
match self {
Self::Source => 1,
Self::Decoder => 1,
Self::Filter => 1,
Self::Encoder => 1,
Self::Sink => 0,
Self::Mixer => 1,
Self::Splitter => 8,
}
}
}
#[derive(Debug, Clone)]
pub struct GraphNode {
pub id: u64,
pub name: String,
pub node_type: NodeType,
pub enabled: bool,
pub params: Vec<(String, String)>,
}
impl GraphNode {
pub fn new(id: u64, name: &str, node_type: NodeType) -> Self {
Self {
id,
name: name.to_string(),
node_type,
enabled: true,
params: Vec::new(),
}
}
pub fn get_param(&self, key: &str) -> Option<&str> {
self.params
.iter()
.find(|(k, _)| k == key)
.map(|(_, v)| v.as_str())
}
pub fn set_param(&mut self, key: &str, value: &str) {
if let Some(entry) = self.params.iter_mut().find(|(k, _)| k == key) {
entry.1 = value.to_string();
} else {
self.params.push((key.to_string(), value.to_string()));
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphEdge {
pub from_node: u64,
pub from_port: u32,
pub to_node: u64,
pub to_port: u32,
}
impl GraphEdge {
pub fn connects(&self, from: u64, to: u64) -> bool {
self.from_node == from && self.to_node == to
}
}
#[derive(Debug, Default)]
pub struct ProcessingGraph {
pub nodes: Vec<GraphNode>,
pub edges: Vec<GraphEdge>,
pub is_locked: bool,
}
impl ProcessingGraph {
pub fn new() -> Self {
Self::default()
}
pub fn add_node(&mut self, node: GraphNode) {
self.nodes.push(node);
}
pub fn remove_node(&mut self, id: u64) -> bool {
let before = self.nodes.len();
self.nodes.retain(|n| n.id != id);
self.edges.retain(|e| e.from_node != id && e.to_node != id);
self.nodes.len() < before
}
pub fn connect(&mut self, from: u64, from_port: u32, to: u64, to_port: u32) -> bool {
let has_from = self.nodes.iter().any(|n| n.id == from);
let has_to = self.nodes.iter().any(|n| n.id == to);
if !has_from || !has_to {
return false;
}
self.edges.push(GraphEdge {
from_node: from,
from_port,
to_node: to,
to_port,
});
true
}
pub fn disconnect(&mut self, from: u64, to: u64) -> bool {
let before = self.edges.len();
self.edges.retain(|e| !e.connects(from, to));
self.edges.len() < before
}
pub fn source_nodes(&self) -> Vec<&GraphNode> {
self.nodes
.iter()
.filter(|n| n.node_type.max_inputs() == 0)
.collect()
}
pub fn sink_nodes(&self) -> Vec<&GraphNode> {
self.nodes
.iter()
.filter(|n| n.node_type.max_outputs() == 0)
.collect()
}
pub fn execution_order(&self) -> Vec<u64> {
use std::collections::{HashMap, VecDeque};
let mut in_degree: HashMap<u64, usize> = self
.nodes
.iter()
.filter(|n| n.enabled)
.map(|n| (n.id, 0))
.collect();
for edge in &self.edges {
if in_degree.contains_key(&edge.from_node) && in_degree.contains_key(&edge.to_node) {
*in_degree.entry(edge.to_node).or_insert(0) += 1;
}
}
let mut queue: VecDeque<u64> = in_degree
.iter()
.filter(|(_, °)| deg == 0)
.map(|(&id, _)| id)
.collect();
let mut queue_vec: Vec<u64> = queue.drain(..).collect();
queue_vec.sort_unstable();
queue.extend(queue_vec);
let mut order = Vec::with_capacity(self.nodes.len());
while let Some(id) = queue.pop_front() {
order.push(id);
let mut new_ready: Vec<u64> = self
.edges
.iter()
.filter(|e| e.from_node == id)
.filter_map(|e| {
let deg = in_degree.get_mut(&e.to_node)?;
*deg = deg.saturating_sub(1);
if *deg == 0 {
Some(e.to_node)
} else {
None
}
})
.collect();
new_ready.sort_unstable();
queue.extend(new_ready);
}
let mut remaining: Vec<u64> = self
.nodes
.iter()
.map(|n| n.id)
.filter(|id| !order.contains(id))
.collect();
remaining.sort_unstable();
order.extend(remaining);
order
}
}
pub trait TransientError {
fn is_transient(&self) -> bool;
}
#[derive(Debug, Clone)]
pub struct RetryPolicy {
pub max_attempts: u32,
pub backoff_ms: u64,
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
max_attempts: 3,
backoff_ms: 1,
}
}
}
impl RetryPolicy {
pub fn execute<T, E, F>(&self, mut f: F) -> Result<T, E>
where
E: TransientError,
F: FnMut() -> Result<T, E>,
{
let mut last_err: Option<E> = None;
for attempt in 0..self.max_attempts {
match f() {
Ok(v) => return Ok(v),
Err(e) => {
if !e.is_transient() {
return Err(e);
}
if attempt + 1 < self.max_attempts {
let sleep_ms = self.backoff_ms.saturating_mul(1u64 << attempt);
std::thread::sleep(std::time::Duration::from_millis(sleep_ms));
}
last_err = Some(e);
}
}
}
Err(last_err.expect("at least one attempt must have set last_err"))
}
}
#[cfg(test)]
mod tests {
use super::*;
fn source(id: u64) -> GraphNode {
GraphNode::new(id, &format!("source_{id}"), NodeType::Source)
}
fn filter(id: u64) -> GraphNode {
GraphNode::new(id, &format!("filter_{id}"), NodeType::Filter)
}
fn sink(id: u64) -> GraphNode {
GraphNode::new(id, &format!("sink_{id}"), NodeType::Sink)
}
#[test]
fn source_has_zero_inputs() {
assert_eq!(NodeType::Source.max_inputs(), 0);
}
#[test]
fn sink_has_zero_outputs() {
assert_eq!(NodeType::Sink.max_outputs(), 0);
}
#[test]
fn mixer_accepts_multiple_inputs() {
assert!(NodeType::Mixer.max_inputs() > 1);
}
#[test]
fn splitter_produces_multiple_outputs() {
assert!(NodeType::Splitter.max_outputs() > 1);
}
#[test]
fn node_set_and_get_param() {
let mut n = filter(1);
n.set_param("width", "1920");
assert_eq!(n.get_param("width"), Some("1920"));
}
#[test]
fn node_update_existing_param() {
let mut n = filter(2);
n.set_param("fps", "24");
n.set_param("fps", "60");
assert_eq!(n.get_param("fps"), Some("60"));
assert_eq!(n.params.iter().filter(|(k, _)| k == "fps").count(), 1);
}
#[test]
fn node_missing_param_returns_none() {
let n = source(3);
assert!(n.get_param("nonexistent").is_none());
}
#[test]
fn edge_connects_returns_true_for_matching_pair() {
let edge = GraphEdge {
from_node: 1,
from_port: 0,
to_node: 2,
to_port: 0,
};
assert!(edge.connects(1, 2));
}
#[test]
fn edge_connects_returns_false_for_reversed_pair() {
let edge = GraphEdge {
from_node: 1,
from_port: 0,
to_node: 2,
to_port: 0,
};
assert!(!edge.connects(2, 1));
}
#[test]
fn add_and_remove_node() {
let mut g = ProcessingGraph::new();
g.add_node(source(10));
assert_eq!(g.nodes.len(), 1);
assert!(g.remove_node(10));
assert!(g.nodes.is_empty());
}
#[test]
fn remove_node_also_removes_edges() {
let mut g = ProcessingGraph::new();
g.add_node(source(1));
g.add_node(sink(2));
g.connect(1, 0, 2, 0);
g.remove_node(1);
assert!(g.edges.is_empty());
}
#[test]
fn connect_fails_for_missing_node() {
let mut g = ProcessingGraph::new();
g.add_node(source(1));
assert!(!g.connect(1, 0, 99, 0)); }
#[test]
fn disconnect_removes_all_matching_edges() {
let mut g = ProcessingGraph::new();
g.add_node(source(1));
g.add_node(sink(2));
g.connect(1, 0, 2, 0);
g.connect(1, 0, 2, 1);
assert!(g.disconnect(1, 2));
assert!(g.edges.is_empty());
}
#[test]
fn source_nodes_returns_only_sources() {
let mut g = ProcessingGraph::new();
g.add_node(source(1));
g.add_node(filter(2));
g.add_node(sink(3));
let srcs: Vec<u64> = g.source_nodes().into_iter().map(|n| n.id).collect();
assert_eq!(srcs, vec![1]);
}
#[test]
fn sink_nodes_returns_only_sinks() {
let mut g = ProcessingGraph::new();
g.add_node(source(1));
g.add_node(sink(2));
let sinks: Vec<u64> = g.sink_nodes().into_iter().map(|n| n.id).collect();
assert_eq!(sinks, vec![2]);
}
#[test]
fn execution_order_linear_pipeline() {
let mut g = ProcessingGraph::new();
g.add_node(source(1));
g.add_node(filter(2));
g.add_node(sink(3));
g.connect(1, 0, 2, 0);
g.connect(2, 0, 3, 0);
let order = g.execution_order();
assert_eq!(order, vec![1, 2, 3]);
}
#[test]
fn execution_order_independent_nodes_are_included() {
let mut g = ProcessingGraph::new();
g.add_node(source(1));
g.add_node(source(2));
let order = g.execution_order();
assert_eq!(order.len(), 2);
}
#[derive(Debug, PartialEq, Eq)]
enum TestError {
Transient(String),
Fatal(String),
}
impl TransientError for TestError {
fn is_transient(&self) -> bool {
matches!(self, Self::Transient(_))
}
}
#[test]
fn test_retry_succeeds_on_second_attempt() {
let call_count = std::cell::Cell::new(0u32);
let policy = RetryPolicy {
max_attempts: 3,
backoff_ms: 0,
};
let result = policy.execute(|| {
let n = call_count.get();
call_count.set(n + 1);
if n == 0 {
Err(TestError::Transient("temp".to_string()))
} else {
Ok(42u32)
}
});
assert_eq!(result, Ok(42));
assert_eq!(call_count.get(), 2, "should have been called exactly twice");
}
#[test]
fn test_retry_exhausted() {
let call_count = std::cell::Cell::new(0u32);
let policy = RetryPolicy {
max_attempts: 3,
backoff_ms: 0,
};
let result: Result<u32, TestError> = policy.execute(|| {
call_count.set(call_count.get() + 1);
Err(TestError::Transient("always fails".to_string()))
});
assert!(result.is_err(), "all attempts exhausted; must return Err");
assert_eq!(
call_count.get(),
3,
"execute must invoke f exactly max_attempts times"
);
}
#[test]
fn test_retry_fatal_error_no_retry() {
let call_count = std::cell::Cell::new(0u32);
let policy = RetryPolicy {
max_attempts: 5,
backoff_ms: 0,
};
let result: Result<u32, TestError> = policy.execute(|| {
call_count.set(call_count.get() + 1);
Err(TestError::Fatal("unrecoverable".to_string()))
});
assert!(result.is_err());
assert_eq!(
call_count.get(),
1,
"fatal error must halt retries immediately"
);
}
}