Skip to main content

oximedia_graph/
hot_swap.rs

1//! Dynamic graph reconfiguration: hot-swap nodes without disrupting connections.
2//!
//! Hot-swapping allows replacing a processing node with a compatible alternative
//! in place while preserving all upstream/downstream connections: locating the
//! node is a linear scan over the node list, and no edge data is touched. Two
//! nodes are compatible for hot-swap if they share the same port signature —
//! identical port counts and compatible format families (`video/*`, `audio/*`,
//! etc.).
7//!
8//! # Example
9//!
10//! ```
11//! use oximedia_graph::processing_graph::{ProcessingGraph, GraphNode, NodeType};
12//! use oximedia_graph::hot_swap::{HotSwappable, HotSwapResult, PortSignature};
13//!
14//! let mut graph = ProcessingGraph::new();
15//! graph.add_node(GraphNode::new(1, "filter_a", NodeType::Filter));
16//! graph.add_node(GraphNode::new(2, "source_1", NodeType::Source));
17//! graph.add_node(GraphNode::new(3, "sink_1", NodeType::Sink));
18//! graph.connect(2, 0, 1, 0);
19//! graph.connect(1, 0, 3, 0);
20//!
21//! // Replace filter_a with a compatible filter_b.
22//! let replacement = GraphNode::new(1, "filter_b", NodeType::Filter);
23//! let result = graph.hot_swap_node(1, replacement);
24//! assert_eq!(result, HotSwapResult::Success);
25//! ```
26
27use crate::processing_graph::{GraphNode, NodeType, ProcessingGraph};
28
29// ── Port signature ────────────────────────────────────────────────────────────
30
31/// Describes the port surface of a node for hot-swap compatibility checking.
32///
33/// Two nodes are swap-compatible when their input counts, output counts, and
34/// format *families* all match. Format families are the coarse type prefix,
35/// e.g. `"video"`, `"audio"`, or `"data"`.
36#[derive(Debug, Clone, PartialEq)]
37pub struct PortSignature {
38    /// Number of logical input ports.
39    pub num_inputs: usize,
40    /// Number of logical output ports.
41    pub num_outputs: usize,
42    /// Format family tags for each input port (e.g. `"video"`, `"audio"`).
43    pub input_formats: Vec<String>,
44    /// Format family tags for each output port.
45    pub output_formats: Vec<String>,
46}
47
48impl PortSignature {
49    /// Returns `true` when `self` and `other` have the same port counts and
50    /// compatible format families on every corresponding port.
51    #[must_use]
52    pub fn is_compatible_with(&self, other: &PortSignature) -> bool {
53        if self.num_inputs != other.num_inputs || self.num_outputs != other.num_outputs {
54            return false;
55        }
56        let inputs_ok = self
57            .input_formats
58            .iter()
59            .zip(&other.input_formats)
60            .all(|(a, b)| format_family(a) == format_family(b));
61        let outputs_ok = self
62            .output_formats
63            .iter()
64            .zip(&other.output_formats)
65            .all(|(a, b)| format_family(a) == format_family(b));
66        inputs_ok && outputs_ok
67    }
68}
69
/// Returns the format *family* of a slash-delimited, MIME-style tag.
///
/// The family is everything before the first `/`. A tag without a `/` is its
/// own family: `"video/yuv420"` → `"video"`, `"audio/f32"` → `"audio"`,
/// `"data"` → `"data"`.
#[must_use]
pub fn format_family(fmt: &str) -> &str {
    match fmt.find('/') {
        // Keep only the prefix that precedes the first separator.
        Some(separator) => &fmt[..separator],
        // No separator: the whole tag names the family.
        None => fmt,
    }
}
78
79// ── HotSwappable trait ────────────────────────────────────────────────────────
80
81/// Nodes that can participate in hot-swap must be able to report their port
82/// signature.
83pub trait HotSwappable {
84    /// Returns the [`PortSignature`] describing this node's connection surface.
85    fn port_signature(&self) -> PortSignature;
86}
87
88/// Derive a format-family tag from a [`NodeType`].
89///
90/// All node types in the current implementation carry video-family ports by
91/// default (a media-processing graph). Mixer/Splitter nodes are also
92/// video-family. This can be extended in the future when typed ports are
93/// annotated on `GraphNode` itself.
94fn default_format_tag(_node_type: &NodeType) -> &'static str {
95    "video"
96}
97
98impl HotSwappable for GraphNode {
99    fn port_signature(&self) -> PortSignature {
100        let max_in = self.node_type.max_inputs();
101        let max_out = self.node_type.max_outputs();
102        let tag = default_format_tag(&self.node_type);
103
104        PortSignature {
105            num_inputs: max_in,
106            num_outputs: max_out,
107            input_formats: vec![tag.to_string(); max_in],
108            output_formats: vec![tag.to_string(); max_out],
109        }
110    }
111}
112
113// ── HotSwapResult ─────────────────────────────────────────────────────────────
114
/// Result reported by [`ProcessingGraph::hot_swap_node`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum HotSwapResult {
    /// The replacement is now in the graph and no connection was altered.
    Success,
    /// The swap was rejected because the replacement's port signature is not
    /// compatible with the signature of the node it would replace.
    IncompatiblePorts {
        /// Human-readable description of the signature mismatch.
        reason: String,
    },
    /// The graph contains no node with the requested ID.
    NodeNotFound,
    /// The graph is locked (executing), so swapping would not be safe.
    GraphLocked,
}
132
133// ── ProcessingGraph hot-swap extension ───────────────────────────────────────
134
135impl ProcessingGraph {
136    /// Replace the node identified by `node_id` with `replacement`.
137    ///
138    /// All edges connected to `node_id` are preserved — only the node's
139    /// internal data (name, params, type) is swapped. The swap is refused if:
140    ///
141    /// * `node_id` is not present in the graph → [`HotSwapResult::NodeNotFound`]
142    /// * the graph is locked (executing) → [`HotSwapResult::GraphLocked`]
143    /// * the port signatures are incompatible → [`HotSwapResult::IncompatiblePorts`]
144    ///
145    /// When [`HotSwapResult::Success`] is returned the replacement node's `id`
146    /// field is forced to `node_id` so that all edge references remain valid.
147    ///
148    /// # Complexity
149    ///
150    /// O(n) where n is the number of nodes (linear scan to locate the node
151    /// slot). Edge preservation is trivially O(1) because edges reference node
152    /// IDs, not positions; no edge data is modified.
153    pub fn hot_swap_node(&mut self, node_id: u64, mut replacement: GraphNode) -> HotSwapResult {
154        // ── 1. Graph-locked guard ──────────────────────────────────────────
155        if self.is_locked {
156            return HotSwapResult::GraphLocked;
157        }
158
159        // ── 2. Look up existing node ───────────────────────────────────────
160        let existing_pos = match self.nodes.iter().position(|n| n.id == node_id) {
161            Some(pos) => pos,
162            None => return HotSwapResult::NodeNotFound,
163        };
164
165        let existing = &self.nodes[existing_pos];
166
167        // ── 3. Port-signature compatibility check ──────────────────────────
168        let old_sig = existing.port_signature();
169        let new_sig = replacement.port_signature();
170
171        if !old_sig.is_compatible_with(&new_sig) {
172            let reason = format!(
173                "port mismatch: existing node has {} input(s) / {} output(s), \
174                 replacement has {} input(s) / {} output(s)",
175                old_sig.num_inputs, old_sig.num_outputs, new_sig.num_inputs, new_sig.num_outputs,
176            );
177            return HotSwapResult::IncompatiblePorts { reason };
178        }
179
180        // ── 4. Perform the swap ────────────────────────────────────────────
181        // Force the replacement's ID to match `node_id` so all edge
182        // references (which store the numeric node ID) remain valid.
183        replacement.id = node_id;
184        self.nodes[existing_pos] = replacement;
185
186        HotSwapResult::Success
187    }
188
189    /// Lock the graph to simulate an executing state (prevents hot-swap).
190    ///
191    /// Call [`ProcessingGraph::unlock`] when execution completes.
192    pub fn lock(&mut self) {
193        self.is_locked = true;
194    }
195
196    /// Unlock the graph after execution completes.
197    pub fn unlock(&mut self) {
198        self.is_locked = false;
199    }
200
201    /// Returns `true` if the graph is currently locked (executing).
202    #[must_use]
203    pub fn is_locked(&self) -> bool {
204        self.is_locked
205    }
206}
207
208// ─────────────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    use super::*;
    use crate::processing_graph::{GraphEdge, GraphNode, NodeType, ProcessingGraph};

    // ── Helpers ───────────────────────────────────────────────────────────────

    /// Builds the linear graph `source(1) → filter(2) → sink(3)` and returns
    /// it together with the three node IDs (source, filter, sink).
    fn build_linear_graph() -> (ProcessingGraph, u64, u64, u64) {
        let mut g = ProcessingGraph::new();
        g.add_node(GraphNode::new(1, "source", NodeType::Source));
        g.add_node(GraphNode::new(2, "filter", NodeType::Filter));
        g.add_node(GraphNode::new(3, "sink", NodeType::Sink));
        g.connect(1, 0, 2, 0);
        g.connect(2, 0, 3, 0);
        (g, 1, 2, 3)
    }

    // ── Swap outcomes ─────────────────────────────────────────────────────────

    /// Filter → Filter swap (same port signature): the node is replaced, both
    /// edges survive and the execution order is unchanged.
    #[test]
    fn test_hot_swap_compatible_nodes() {
        let (mut graph, source_id, filter_id, sink_id) = build_linear_graph();

        let outcome = graph.hot_swap_node(
            filter_id,
            GraphNode::new(filter_id, "filter_v2", NodeType::Filter),
        );
        assert_eq!(outcome, HotSwapResult::Success);

        // The slot must now hold the replacement's name.
        let swapped = graph.nodes.iter().find(|n| n.id == filter_id);
        assert!(swapped.is_some(), "node must still exist after swap");
        assert_eq!(swapped.expect("checked above").name, "filter_v2");

        // Both edges must still be there.
        assert_eq!(graph.edges.len(), 2);
        let has_edge = |from: u64, to: u64| {
            graph
                .edges
                .iter()
                .any(|e| e.from_node == from && e.to_node == to)
        };
        assert!(has_edge(source_id, filter_id));
        assert!(has_edge(filter_id, sink_id));

        // Topological execution order must still be [1, 2, 3].
        assert_eq!(
            graph.execution_order(),
            vec![source_id, filter_id, sink_id]
        );
    }

    /// Replacing a 1-input Filter with a 0-input Source must be rejected with
    /// `IncompatiblePorts`.
    #[test]
    fn test_hot_swap_incompatible_ports() {
        let (mut graph, _source_id, filter_id, _sink_id) = build_linear_graph();

        // Source has 0 inputs / 1 output; Filter has 1 input / 1 output.
        let impostor = GraphNode::new(filter_id, "source_impostor", NodeType::Source);

        let reason = match graph.hot_swap_node(filter_id, impostor) {
            HotSwapResult::IncompatiblePorts { reason } => reason,
            other => panic!("expected IncompatiblePorts, got {other:?}"),
        };

        // The explanation should talk about the mismatch.
        assert!(
            reason.contains("input"),
            "reason should mention input mismatch, got: {reason}"
        );
    }

    /// An ID that is not in the graph yields `NodeNotFound`.
    #[test]
    fn test_hot_swap_node_not_found() {
        let (mut graph, _s, _f, _k) = build_linear_graph();

        let outcome = graph.hot_swap_node(99, GraphNode::new(99, "ghost", NodeType::Filter));

        assert_eq!(outcome, HotSwapResult::NodeNotFound);
    }

    /// A successful swap leaves the edge set untouched and keeps the
    /// source → filter → sink ordering.
    #[test]
    fn test_hot_swap_preserves_connections() {
        let (mut graph, source_id, filter_id, sink_id) = build_linear_graph();

        // Snapshot the edges, then swap filter(2) for another filter.
        let edges_before: Vec<GraphEdge> = graph.edges.clone();
        let outcome = graph.hot_swap_node(
            filter_id,
            GraphNode::new(filter_id, "optimised_filter", NodeType::Filter),
        );
        assert_eq!(outcome, HotSwapResult::Success);

        // Same number of edges, and every old edge is still present.
        assert_eq!(graph.edges.len(), edges_before.len());
        for edge in &edges_before {
            assert!(
                graph.edges.contains(edge),
                "edge {edge:?} must still be present after hot-swap"
            );
        }

        // Execution order must still route source → filter → sink.
        let order = graph.execution_order();
        let index_of = |wanted: u64| order.iter().position(|&id| id == wanted);
        let source_pos = index_of(source_id).expect("source in order");
        let filter_pos = index_of(filter_id).expect("filter in order");
        let sink_pos = index_of(sink_id).expect("sink in order");
        assert!(source_pos < filter_pos, "source must precede filter");
        assert!(filter_pos < sink_pos, "filter must precede sink");
    }

    /// A locked (executing) graph refuses the swap; once unlocked the same
    /// swap goes through.
    #[test]
    fn test_hot_swap_graph_locked() {
        let (mut graph, _source_id, filter_id, _sink_id) = build_linear_graph();

        graph.lock();
        let while_locked = graph.hot_swap_node(
            filter_id,
            GraphNode::new(filter_id, "filter_during_exec", NodeType::Filter),
        );
        assert_eq!(while_locked, HotSwapResult::GraphLocked);

        // After unlocking the swap must succeed.
        graph.unlock();
        let after_unlock = graph.hot_swap_node(
            filter_id,
            GraphNode::new(filter_id, "filter_after_unlock", NodeType::Filter),
        );
        assert_eq!(after_unlock, HotSwapResult::Success);
    }

    // ── Port-signature unit tests ─────────────────────────────────────────────

    #[test]
    fn test_port_signature_compatible_same_type() {
        let sig_a = GraphNode::new(1, "a", NodeType::Filter).port_signature();
        let sig_b = GraphNode::new(2, "b", NodeType::Filter).port_signature();
        assert!(sig_a.is_compatible_with(&sig_b));
    }

    #[test]
    fn test_port_signature_incompatible_different_types() {
        // Source: 0 inputs / 1 output; Filter: 1 input / 1 output → incompatible.
        let source_sig = GraphNode::new(1, "src", NodeType::Source).port_signature();
        let filter_sig = GraphNode::new(2, "flt", NodeType::Filter).port_signature();
        assert!(!source_sig.is_compatible_with(&filter_sig));
    }

    #[test]
    fn test_format_family_extracts_prefix() {
        let cases = [
            ("video/yuv420", "video"),
            ("audio/f32", "audio"),
            ("data", "data"),
            ("video", "video"),
        ];
        for (tag, family) in cases {
            assert_eq!(format_family(tag), family);
        }
    }

    /// The replacement may arrive with any ID; after a successful swap it
    /// must be stored under the target ID and its own ID must be gone.
    #[test]
    fn test_hot_swap_id_is_normalised() {
        let (mut graph, _s, filter_id, _k) = build_linear_graph();

        // Deliberately give the replacement a different ID.
        let outcome = graph.hot_swap_node(
            filter_id,
            GraphNode::new(999, "filter_new_id", NodeType::Filter),
        );
        assert_eq!(outcome, HotSwapResult::Success);

        let node = graph.nodes.iter().find(|n| n.id == filter_id);
        assert!(node.is_some(), "node must be found by the original ID");
        assert_eq!(node.expect("checked above").name, "filter_new_id");

        // ID 999 must NOT appear anywhere in the node list.
        assert!(graph.nodes.iter().all(|n| n.id != 999));
    }
}