use crate::processing_graph::{GraphNode, NodeType, ProcessingGraph};
#[derive(Debug, Clone, PartialEq)]
pub struct PortSignature {
pub num_inputs: usize,
pub num_outputs: usize,
pub input_formats: Vec<String>,
pub output_formats: Vec<String>,
}
impl PortSignature {
#[must_use]
pub fn is_compatible_with(&self, other: &PortSignature) -> bool {
if self.num_inputs != other.num_inputs || self.num_outputs != other.num_outputs {
return false;
}
let inputs_ok = self
.input_formats
.iter()
.zip(&other.input_formats)
.all(|(a, b)| format_family(a) == format_family(b));
let outputs_ok = self
.output_formats
.iter()
.zip(&other.output_formats)
.all(|(a, b)| format_family(a) == format_family(b));
inputs_ok && outputs_ok
}
}
#[must_use]
pub fn format_family(fmt: &str) -> &str {
fmt.split('/').next().unwrap_or(fmt)
}
pub trait HotSwappable {
fn port_signature(&self) -> PortSignature;
}
fn default_format_tag(_node_type: &NodeType) -> &'static str {
"video"
}
impl HotSwappable for GraphNode {
fn port_signature(&self) -> PortSignature {
let max_in = self.node_type.max_inputs();
let max_out = self.node_type.max_outputs();
let tag = default_format_tag(&self.node_type);
PortSignature {
num_inputs: max_in,
num_outputs: max_out,
input_formats: vec![tag.to_string(); max_in],
output_formats: vec![tag.to_string(); max_out],
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HotSwapResult {
Success,
IncompatiblePorts {
reason: String,
},
NodeNotFound,
GraphLocked,
}
impl ProcessingGraph {
pub fn hot_swap_node(&mut self, node_id: u64, mut replacement: GraphNode) -> HotSwapResult {
if self.is_locked {
return HotSwapResult::GraphLocked;
}
let existing_pos = match self.nodes.iter().position(|n| n.id == node_id) {
Some(pos) => pos,
None => return HotSwapResult::NodeNotFound,
};
let existing = &self.nodes[existing_pos];
let old_sig = existing.port_signature();
let new_sig = replacement.port_signature();
if !old_sig.is_compatible_with(&new_sig) {
let reason = format!(
"port mismatch: existing node has {} input(s) / {} output(s), \
replacement has {} input(s) / {} output(s)",
old_sig.num_inputs, old_sig.num_outputs, new_sig.num_inputs, new_sig.num_outputs,
);
return HotSwapResult::IncompatiblePorts { reason };
}
replacement.id = node_id;
self.nodes[existing_pos] = replacement;
HotSwapResult::Success
}
pub fn lock(&mut self) {
self.is_locked = true;
}
pub fn unlock(&mut self) {
self.is_locked = false;
}
#[must_use]
pub fn is_locked(&self) -> bool {
self.is_locked
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::processing_graph::{GraphEdge, GraphNode, NodeType, ProcessingGraph};
fn build_linear_graph() -> (ProcessingGraph, u64, u64, u64) {
let mut g = ProcessingGraph::new();
g.add_node(GraphNode::new(1, "source", NodeType::Source));
g.add_node(GraphNode::new(2, "filter", NodeType::Filter));
g.add_node(GraphNode::new(3, "sink", NodeType::Sink));
g.connect(1, 0, 2, 0);
g.connect(2, 0, 3, 0);
(g, 1, 2, 3)
}
#[test]
fn test_hot_swap_compatible_nodes() {
let (mut graph, source_id, filter_id, sink_id) = build_linear_graph();
let replacement = GraphNode::new(filter_id, "filter_v2", NodeType::Filter);
let result = graph.hot_swap_node(filter_id, replacement);
assert_eq!(result, HotSwapResult::Success);
let swapped = graph.nodes.iter().find(|n| n.id == filter_id);
assert!(swapped.is_some(), "node must still exist after swap");
assert_eq!(swapped.expect("checked above").name, "filter_v2");
assert_eq!(graph.edges.len(), 2);
assert!(graph
.edges
.iter()
.any(|e| e.from_node == source_id && e.to_node == filter_id));
assert!(graph
.edges
.iter()
.any(|e| e.from_node == filter_id && e.to_node == sink_id));
let order = graph.execution_order();
assert_eq!(order, vec![source_id, filter_id, sink_id]);
}
#[test]
fn test_hot_swap_incompatible_ports() {
let (mut graph, _source_id, filter_id, _sink_id) = build_linear_graph();
let bad_replacement = GraphNode::new(filter_id, "source_impostor", NodeType::Source);
let result = graph.hot_swap_node(filter_id, bad_replacement);
match result {
HotSwapResult::IncompatiblePorts { reason } => {
assert!(
reason.contains("input"),
"reason should mention input mismatch, got: {reason}"
);
}
other => panic!("expected IncompatiblePorts, got {other:?}"),
}
}
#[test]
fn test_hot_swap_node_not_found() {
let (mut graph, _s, _f, _k) = build_linear_graph();
let ghost = GraphNode::new(99, "ghost", NodeType::Filter);
let result = graph.hot_swap_node(99, ghost);
assert_eq!(result, HotSwapResult::NodeNotFound);
}
#[test]
fn test_hot_swap_preserves_connections() {
let (mut graph, source_id, filter_id, sink_id) = build_linear_graph();
let edges_before: Vec<GraphEdge> = graph.edges.clone();
let replacement = GraphNode::new(filter_id, "optimised_filter", NodeType::Filter);
let result = graph.hot_swap_node(filter_id, replacement);
assert_eq!(result, HotSwapResult::Success);
assert_eq!(graph.edges.len(), edges_before.len());
for edge in &edges_before {
assert!(
graph.edges.contains(edge),
"edge {edge:?} must still be present after hot-swap"
);
}
let order = graph.execution_order();
let source_pos = order
.iter()
.position(|&id| id == source_id)
.expect("source in order");
let filter_pos = order
.iter()
.position(|&id| id == filter_id)
.expect("filter in order");
let sink_pos = order
.iter()
.position(|&id| id == sink_id)
.expect("sink in order");
assert!(source_pos < filter_pos, "source must precede filter");
assert!(filter_pos < sink_pos, "filter must precede sink");
}
#[test]
fn test_hot_swap_graph_locked() {
let (mut graph, _source_id, filter_id, _sink_id) = build_linear_graph();
graph.lock();
let replacement = GraphNode::new(filter_id, "filter_during_exec", NodeType::Filter);
let result = graph.hot_swap_node(filter_id, replacement);
assert_eq!(result, HotSwapResult::GraphLocked);
graph.unlock();
let replacement2 = GraphNode::new(filter_id, "filter_after_unlock", NodeType::Filter);
let result2 = graph.hot_swap_node(filter_id, replacement2);
assert_eq!(result2, HotSwapResult::Success);
}
#[test]
fn test_port_signature_compatible_same_type() {
let filter_a = GraphNode::new(1, "a", NodeType::Filter);
let filter_b = GraphNode::new(2, "b", NodeType::Filter);
assert!(filter_a
.port_signature()
.is_compatible_with(&filter_b.port_signature()));
}
#[test]
fn test_port_signature_incompatible_different_types() {
let source = GraphNode::new(1, "src", NodeType::Source);
let filter = GraphNode::new(2, "flt", NodeType::Filter);
assert!(!source
.port_signature()
.is_compatible_with(&filter.port_signature()));
}
#[test]
fn test_format_family_extracts_prefix() {
assert_eq!(format_family("video/yuv420"), "video");
assert_eq!(format_family("audio/f32"), "audio");
assert_eq!(format_family("data"), "data");
assert_eq!(format_family("video"), "video");
}
#[test]
fn test_hot_swap_id_is_normalised() {
let (mut graph, _s, filter_id, _k) = build_linear_graph();
let replacement = GraphNode::new(999, "filter_new_id", NodeType::Filter);
let result = graph.hot_swap_node(filter_id, replacement);
assert_eq!(result, HotSwapResult::Success);
let node = graph.nodes.iter().find(|n| n.id == filter_id);
assert!(node.is_some(), "node must be found by the original ID");
assert_eq!(node.expect("checked above").name, "filter_new_id");
assert!(!graph.nodes.iter().any(|n| n.id == 999));
}
}