oximedia_graph/hot_swap.rs
1//! Dynamic graph reconfiguration: hot-swap nodes without disrupting connections.
2//!
3//! Hot-swapping allows replacing a processing node with a compatible alternative
4//! in O(1) time while preserving all upstream/downstream connections. Two nodes
5//! are compatible for hot-swap if they share the same port signature — identical
6//! port counts and compatible format families (`video/*`, `audio/*`, etc.).
7//!
8//! # Example
9//!
10//! ```
11//! use oximedia_graph::processing_graph::{ProcessingGraph, GraphNode, NodeType};
12//! use oximedia_graph::hot_swap::{HotSwappable, HotSwapResult, PortSignature};
13//!
14//! let mut graph = ProcessingGraph::new();
15//! graph.add_node(GraphNode::new(1, "filter_a", NodeType::Filter));
16//! graph.add_node(GraphNode::new(2, "source_1", NodeType::Source));
17//! graph.add_node(GraphNode::new(3, "sink_1", NodeType::Sink));
18//! graph.connect(2, 0, 1, 0);
19//! graph.connect(1, 0, 3, 0);
20//!
21//! // Replace filter_a with a compatible filter_b.
22//! let replacement = GraphNode::new(1, "filter_b", NodeType::Filter);
23//! let result = graph.hot_swap_node(1, replacement);
24//! assert_eq!(result, HotSwapResult::Success);
25//! ```
26
27use crate::processing_graph::{GraphNode, NodeType, ProcessingGraph};
28
29// ── Port signature ────────────────────────────────────────────────────────────
30
31/// Describes the port surface of a node for hot-swap compatibility checking.
32///
33/// Two nodes are swap-compatible when their input counts, output counts, and
34/// format *families* all match. Format families are the coarse type prefix,
35/// e.g. `"video"`, `"audio"`, or `"data"`.
36#[derive(Debug, Clone, PartialEq)]
37pub struct PortSignature {
38 /// Number of logical input ports.
39 pub num_inputs: usize,
40 /// Number of logical output ports.
41 pub num_outputs: usize,
42 /// Format family tags for each input port (e.g. `"video"`, `"audio"`).
43 pub input_formats: Vec<String>,
44 /// Format family tags for each output port.
45 pub output_formats: Vec<String>,
46}
47
48impl PortSignature {
49 /// Returns `true` when `self` and `other` have the same port counts and
50 /// compatible format families on every corresponding port.
51 #[must_use]
52 pub fn is_compatible_with(&self, other: &PortSignature) -> bool {
53 if self.num_inputs != other.num_inputs || self.num_outputs != other.num_outputs {
54 return false;
55 }
56 let inputs_ok = self
57 .input_formats
58 .iter()
59 .zip(&other.input_formats)
60 .all(|(a, b)| format_family(a) == format_family(b));
61 let outputs_ok = self
62 .output_formats
63 .iter()
64 .zip(&other.output_formats)
65 .all(|(a, b)| format_family(a) == format_family(b));
66 inputs_ok && outputs_ok
67 }
68}
69
70/// Extracts the format *family* from a slash-delimited MIME-style tag.
71///
72/// `"video/yuv420"` → `"video"`, `"audio/f32"` → `"audio"`, `"data"` → `"data"`.
73#[must_use]
74pub fn format_family(fmt: &str) -> &str {
75 // Split at the first `/`; if none, the whole string is the family.
76 fmt.split('/').next().unwrap_or(fmt)
77}
78
79// ── HotSwappable trait ────────────────────────────────────────────────────────
80
81/// Nodes that can participate in hot-swap must be able to report their port
82/// signature.
83pub trait HotSwappable {
84 /// Returns the [`PortSignature`] describing this node's connection surface.
85 fn port_signature(&self) -> PortSignature;
86}
87
88/// Derive a format-family tag from a [`NodeType`].
89///
90/// All node types in the current implementation carry video-family ports by
91/// default (a media-processing graph). Mixer/Splitter nodes are also
92/// video-family. This can be extended in the future when typed ports are
93/// annotated on `GraphNode` itself.
94fn default_format_tag(_node_type: &NodeType) -> &'static str {
95 "video"
96}
97
98impl HotSwappable for GraphNode {
99 fn port_signature(&self) -> PortSignature {
100 let max_in = self.node_type.max_inputs();
101 let max_out = self.node_type.max_outputs();
102 let tag = default_format_tag(&self.node_type);
103
104 PortSignature {
105 num_inputs: max_in,
106 num_outputs: max_out,
107 input_formats: vec![tag.to_string(); max_in],
108 output_formats: vec![tag.to_string(); max_out],
109 }
110 }
111}
112
113// ── HotSwapResult ─────────────────────────────────────────────────────────────
114
115/// Outcome of a [`ProcessingGraph::hot_swap_node`] call.
116#[derive(Debug, Clone, PartialEq, Eq)]
117pub enum HotSwapResult {
118 /// The node was replaced successfully; all existing connections are intact.
119 Success,
120 /// The replacement node's port signature is incompatible with the current
121 /// node's signature. The `reason` field contains a human-readable
122 /// explanation.
123 IncompatiblePorts {
124 /// Explanation of why the signatures do not match.
125 reason: String,
126 },
127 /// No node with the given ID exists in the graph.
128 NodeNotFound,
129 /// The graph is currently executing; the swap cannot be performed safely.
130 GraphLocked,
131}
132
133// ── ProcessingGraph hot-swap extension ───────────────────────────────────────
134
135impl ProcessingGraph {
136 /// Replace the node identified by `node_id` with `replacement`.
137 ///
138 /// All edges connected to `node_id` are preserved — only the node's
139 /// internal data (name, params, type) is swapped. The swap is refused if:
140 ///
141 /// * `node_id` is not present in the graph → [`HotSwapResult::NodeNotFound`]
142 /// * the graph is locked (executing) → [`HotSwapResult::GraphLocked`]
143 /// * the port signatures are incompatible → [`HotSwapResult::IncompatiblePorts`]
144 ///
145 /// When [`HotSwapResult::Success`] is returned the replacement node's `id`
146 /// field is forced to `node_id` so that all edge references remain valid.
147 ///
148 /// # Complexity
149 ///
150 /// O(n) where n is the number of nodes (linear scan to locate the node
151 /// slot). Edge preservation is trivially O(1) because edges reference node
152 /// IDs, not positions; no edge data is modified.
153 pub fn hot_swap_node(&mut self, node_id: u64, mut replacement: GraphNode) -> HotSwapResult {
154 // ── 1. Graph-locked guard ──────────────────────────────────────────
155 if self.is_locked {
156 return HotSwapResult::GraphLocked;
157 }
158
159 // ── 2. Look up existing node ───────────────────────────────────────
160 let existing_pos = match self.nodes.iter().position(|n| n.id == node_id) {
161 Some(pos) => pos,
162 None => return HotSwapResult::NodeNotFound,
163 };
164
165 let existing = &self.nodes[existing_pos];
166
167 // ── 3. Port-signature compatibility check ──────────────────────────
168 let old_sig = existing.port_signature();
169 let new_sig = replacement.port_signature();
170
171 if !old_sig.is_compatible_with(&new_sig) {
172 let reason = format!(
173 "port mismatch: existing node has {} input(s) / {} output(s), \
174 replacement has {} input(s) / {} output(s)",
175 old_sig.num_inputs, old_sig.num_outputs, new_sig.num_inputs, new_sig.num_outputs,
176 );
177 return HotSwapResult::IncompatiblePorts { reason };
178 }
179
180 // ── 4. Perform the swap ────────────────────────────────────────────
181 // Force the replacement's ID to match `node_id` so all edge
182 // references (which store the numeric node ID) remain valid.
183 replacement.id = node_id;
184 self.nodes[existing_pos] = replacement;
185
186 HotSwapResult::Success
187 }
188
189 /// Lock the graph to simulate an executing state (prevents hot-swap).
190 ///
191 /// Call [`ProcessingGraph::unlock`] when execution completes.
192 pub fn lock(&mut self) {
193 self.is_locked = true;
194 }
195
196 /// Unlock the graph after execution completes.
197 pub fn unlock(&mut self) {
198 self.is_locked = false;
199 }
200
201 /// Returns `true` if the graph is currently locked (executing).
202 #[must_use]
203 pub fn is_locked(&self) -> bool {
204 self.is_locked
205 }
206}
207
208// ─────────────────────────────────────────────────────────────────────────────
209#[cfg(test)]
210mod tests {
211 use super::*;
212 use crate::processing_graph::{GraphEdge, GraphNode, NodeType, ProcessingGraph};
213
214 // ── Helpers ───────────────────────────────────────────────────────────────
215
216 fn build_linear_graph() -> (ProcessingGraph, u64, u64, u64) {
217 // source(1) → filter(2) → sink(3)
218 let mut g = ProcessingGraph::new();
219 g.add_node(GraphNode::new(1, "source", NodeType::Source));
220 g.add_node(GraphNode::new(2, "filter", NodeType::Filter));
221 g.add_node(GraphNode::new(3, "sink", NodeType::Sink));
222 g.connect(1, 0, 2, 0);
223 g.connect(2, 0, 3, 0);
224 (g, 1, 2, 3)
225 }
226
227 // ── test_hot_swap_compatible_nodes ────────────────────────────────────────
228
229 /// Swap a Filter node with another Filter node (same port signature).
230 /// After the swap the graph structure must remain intact and execution
231 /// order must be preserved.
232 #[test]
233 fn test_hot_swap_compatible_nodes() {
234 let (mut graph, source_id, filter_id, sink_id) = build_linear_graph();
235
236 let replacement = GraphNode::new(filter_id, "filter_v2", NodeType::Filter);
237 let result = graph.hot_swap_node(filter_id, replacement);
238
239 assert_eq!(result, HotSwapResult::Success);
240
241 // The swapped node must carry the replacement's name.
242 let swapped = graph.nodes.iter().find(|n| n.id == filter_id);
243 assert!(swapped.is_some(), "node must still exist after swap");
244 assert_eq!(swapped.expect("checked above").name, "filter_v2");
245
246 // Edges must be intact.
247 assert_eq!(graph.edges.len(), 2);
248 assert!(graph
249 .edges
250 .iter()
251 .any(|e| e.from_node == source_id && e.to_node == filter_id));
252 assert!(graph
253 .edges
254 .iter()
255 .any(|e| e.from_node == filter_id && e.to_node == sink_id));
256
257 // Topological execution order must still be [1, 2, 3].
258 let order = graph.execution_order();
259 assert_eq!(order, vec![source_id, filter_id, sink_id]);
260 }
261
262 // ── test_hot_swap_incompatible_ports ──────────────────────────────────────
263
264 /// Attempting to swap a 1-input Filter with a 0-input Source must fail with
265 /// IncompatiblePorts.
266 #[test]
267 fn test_hot_swap_incompatible_ports() {
268 let (mut graph, _source_id, filter_id, _sink_id) = build_linear_graph();
269
270 // Source has 0 inputs / 1 output; Filter has 1 input / 1 output.
271 let bad_replacement = GraphNode::new(filter_id, "source_impostor", NodeType::Source);
272 let result = graph.hot_swap_node(filter_id, bad_replacement);
273
274 match result {
275 HotSwapResult::IncompatiblePorts { reason } => {
276 // Reason should mention the mismatch.
277 assert!(
278 reason.contains("input"),
279 "reason should mention input mismatch, got: {reason}"
280 );
281 }
282 other => panic!("expected IncompatiblePorts, got {other:?}"),
283 }
284 }
285
286 // ── test_hot_swap_node_not_found ──────────────────────────────────────────
287
288 /// Swapping a non-existent node ID must return NodeNotFound.
289 #[test]
290 fn test_hot_swap_node_not_found() {
291 let (mut graph, _s, _f, _k) = build_linear_graph();
292
293 let ghost = GraphNode::new(99, "ghost", NodeType::Filter);
294 let result = graph.hot_swap_node(99, ghost);
295
296 assert_eq!(result, HotSwapResult::NodeNotFound);
297 }
298
299 // ── test_hot_swap_preserves_connections ───────────────────────────────────
300
301 /// After a successful swap the edges referencing the swapped node must
302 /// still be valid and the graph must execute in the correct order.
303 #[test]
304 fn test_hot_swap_preserves_connections() {
305 let (mut graph, source_id, filter_id, sink_id) = build_linear_graph();
306
307 // Swap filter(2) with another filter; the edge set must not change.
308 let edges_before: Vec<GraphEdge> = graph.edges.clone();
309 let replacement = GraphNode::new(filter_id, "optimised_filter", NodeType::Filter);
310 let result = graph.hot_swap_node(filter_id, replacement);
311 assert_eq!(result, HotSwapResult::Success);
312
313 // Edge set must be identical in both count and content.
314 assert_eq!(graph.edges.len(), edges_before.len());
315 for edge in &edges_before {
316 assert!(
317 graph.edges.contains(edge),
318 "edge {edge:?} must still be present after hot-swap"
319 );
320 }
321
322 // Execution order must still route source → filter → sink.
323 let order = graph.execution_order();
324 let source_pos = order
325 .iter()
326 .position(|&id| id == source_id)
327 .expect("source in order");
328 let filter_pos = order
329 .iter()
330 .position(|&id| id == filter_id)
331 .expect("filter in order");
332 let sink_pos = order
333 .iter()
334 .position(|&id| id == sink_id)
335 .expect("sink in order");
336 assert!(source_pos < filter_pos, "source must precede filter");
337 assert!(filter_pos < sink_pos, "filter must precede sink");
338 }
339
340 // ── test_hot_swap_graph_locked ────────────────────────────────────────────
341
342 /// While the graph is locked (executing), hot-swap must be refused.
343 #[test]
344 fn test_hot_swap_graph_locked() {
345 let (mut graph, _source_id, filter_id, _sink_id) = build_linear_graph();
346
347 graph.lock();
348 let replacement = GraphNode::new(filter_id, "filter_during_exec", NodeType::Filter);
349 let result = graph.hot_swap_node(filter_id, replacement);
350 assert_eq!(result, HotSwapResult::GraphLocked);
351
352 // After unlocking the swap must succeed.
353 graph.unlock();
354 let replacement2 = GraphNode::new(filter_id, "filter_after_unlock", NodeType::Filter);
355 let result2 = graph.hot_swap_node(filter_id, replacement2);
356 assert_eq!(result2, HotSwapResult::Success);
357 }
358
359 // ── Port-signature unit tests ─────────────────────────────────────────────
360
361 #[test]
362 fn test_port_signature_compatible_same_type() {
363 let filter_a = GraphNode::new(1, "a", NodeType::Filter);
364 let filter_b = GraphNode::new(2, "b", NodeType::Filter);
365 assert!(filter_a
366 .port_signature()
367 .is_compatible_with(&filter_b.port_signature()));
368 }
369
370 #[test]
371 fn test_port_signature_incompatible_different_types() {
372 let source = GraphNode::new(1, "src", NodeType::Source);
373 let filter = GraphNode::new(2, "flt", NodeType::Filter);
374 // Source: 0 inputs / 1 output; Filter: 1 input / 1 output → incompatible.
375 assert!(!source
376 .port_signature()
377 .is_compatible_with(&filter.port_signature()));
378 }
379
380 #[test]
381 fn test_format_family_extracts_prefix() {
382 assert_eq!(format_family("video/yuv420"), "video");
383 assert_eq!(format_family("audio/f32"), "audio");
384 assert_eq!(format_family("data"), "data");
385 assert_eq!(format_family("video"), "video");
386 }
387
388 #[test]
389 fn test_hot_swap_id_is_normalised() {
390 // Even if the replacement carries a different numeric ID, after a
391 // successful swap the node must have the target ID.
392 let (mut graph, _s, filter_id, _k) = build_linear_graph();
393
394 // Deliberately give a different ID to the replacement.
395 let replacement = GraphNode::new(999, "filter_new_id", NodeType::Filter);
396 let result = graph.hot_swap_node(filter_id, replacement);
397 assert_eq!(result, HotSwapResult::Success);
398
399 let node = graph.nodes.iter().find(|n| n.id == filter_id);
400 assert!(node.is_some(), "node must be found by the original ID");
401 assert_eq!(node.expect("checked above").name, "filter_new_id");
402
403 // ID 999 must NOT appear anywhere in the node list.
404 assert!(!graph.nodes.iter().any(|n| n.id == 999));
405 }
406}