mecha10_core/dependency.rs
1//! Dependency Resolution for Node Startup/Shutdown Ordering
2//!
3//! Provides topological sorting and dependency graph analysis to ensure nodes
4//! start and stop in the correct order based on their dependencies.
5//!
6//! # Use Cases
7//!
8//! - Start nodes in dependency order (dependencies before dependents)
9//! - Stop nodes in reverse order (dependents before dependencies)
10//! - Detect circular dependencies
11//! - Validate dependency references
12//! - Group nodes by dependency level for parallel startup
13//!
14//! # Example
15//!
16//! ```rust
17//! use mecha10::dependency::{DependencyGraph, StartupPlan};
18//! use mecha10::types::NodeConfig;
19//!
20//! # async fn example() -> mecha10::Result<()> {
21//! let nodes = vec![
22//! NodeConfig { id: "camera".into(), depends_on: vec![], /* ... */ },
23//! NodeConfig { id: "vision".into(), depends_on: vec!["camera".into()], /* ... */ },
24//! NodeConfig { id: "planner".into(), depends_on: vec!["vision".into()], /* ... */ },
25//! ];
26//!
27//! // Build dependency graph
28//! let graph = DependencyGraph::from_configs(&nodes)?;
29//!
30//! // Get startup order
31//! let startup_order = graph.topological_sort()?;
32//! // Result: ["camera", "vision", "planner"]
33//!
34//! // Get shutdown order (reverse)
35//! let shutdown_order = graph.reverse_topological_sort()?;
36//! // Result: ["planner", "vision", "camera"]
37//!
38//! // Get parallel startup plan (nodes grouped by level)
39//! let plan = graph.startup_plan()?;
40//! // Level 0: ["camera"] - no dependencies
41//! // Level 1: ["vision"] - depends on camera
42//! // Level 2: ["planner"] - depends on vision
43//! # Ok(())
44//! # }
45//! ```
46
47use crate::error::{Mecha10Error, Result};
48use crate::types::NodeConfig;
49use serde::{Deserialize, Serialize};
50use std::collections::{HashMap, HashSet, VecDeque};
51use tracing::{debug, info};
52
53// ============================================================================
54// Dependency Graph
55// ============================================================================
56
57/// Dependency graph for node relationships
58///
59/// Maintains a directed graph of node dependencies where an edge from A to B
60/// means "A depends on B" (B must start before A).
61#[derive(Debug, Clone)]
62pub struct DependencyGraph {
63 /// Node dependencies: node_id -> vec of dependencies
64 dependencies: HashMap<String, Vec<String>>,
65
66 /// Reverse dependencies: node_id -> vec of dependents
67 dependents: HashMap<String, Vec<String>>,
68
69 /// All node IDs
70 nodes: HashSet<String>,
71}
72
73impl DependencyGraph {
74 /// Create a new empty dependency graph
75 pub fn new() -> Self {
76 Self {
77 dependencies: HashMap::new(),
78 dependents: HashMap::new(),
79 nodes: HashSet::new(),
80 }
81 }
82
83 /// Build dependency graph from node configurations
84 ///
85 /// # Arguments
86 ///
87 /// * `configs` - Slice of node configurations
88 ///
89 /// # Returns
90 ///
91 /// Dependency graph or error if dependencies are invalid
92 ///
93 /// # Errors
94 ///
95 /// - Circular dependencies detected
96 /// - Undefined dependency references
97 pub fn from_configs(configs: &[NodeConfig]) -> Result<Self> {
98 let mut graph = Self::new();
99
100 // Add all nodes first
101 for config in configs {
102 graph.add_node(&config.id);
103 }
104
105 // Add dependencies
106 for config in configs {
107 for dep in &config.depends_on {
108 graph.add_dependency(&config.id, dep)?;
109 }
110 }
111
112 // Validate graph
113 graph.validate()?;
114
115 Ok(graph)
116 }
117
118 /// Add a node to the graph
119 pub fn add_node(&mut self, node_id: &str) {
120 self.nodes.insert(node_id.to_string());
121 self.dependencies.entry(node_id.to_string()).or_default();
122 self.dependents.entry(node_id.to_string()).or_default();
123 }
124
125 /// Add a dependency relationship
126 ///
127 /// # Arguments
128 ///
129 /// * `node_id` - The dependent node
130 /// * `dependency` - The node that `node_id` depends on
131 ///
132 /// # Errors
133 ///
134 /// Returns error if dependency node doesn't exist
135 pub fn add_dependency(&mut self, node_id: &str, dependency: &str) -> Result<()> {
136 // Ensure both nodes exist
137 if !self.nodes.contains(node_id) {
138 return Err(Mecha10Error::Configuration(format!(
139 "Node '{}' not found in graph",
140 node_id
141 )));
142 }
143
144 if !self.nodes.contains(dependency) {
145 return Err(Mecha10Error::Configuration(format!(
146 "Dependency '{}' not found (required by '{}')",
147 dependency, node_id
148 )));
149 }
150
151 // Add to dependencies map
152 self.dependencies
153 .entry(node_id.to_string())
154 .or_default()
155 .push(dependency.to_string());
156
157 // Add to reverse dependencies map
158 self.dependents
159 .entry(dependency.to_string())
160 .or_default()
161 .push(node_id.to_string());
162
163 Ok(())
164 }
165
166 /// Validate the dependency graph
167 ///
168 /// Checks for:
169 /// - Circular dependencies
170 /// - Undefined dependency references
171 pub fn validate(&self) -> Result<()> {
172 // Check for circular dependencies
173 self.check_circular_dependencies()?;
174
175 // Check all dependencies exist
176 for (node_id, deps) in &self.dependencies {
177 for dep in deps {
178 if !self.nodes.contains(dep) {
179 return Err(Mecha10Error::Configuration(format!(
180 "Node '{}' depends on undefined node '{}'",
181 node_id, dep
182 )));
183 }
184 }
185 }
186
187 Ok(())
188 }
189
190 /// Check for circular dependencies using DFS
191 fn check_circular_dependencies(&self) -> Result<()> {
192 let mut visited = HashSet::new();
193 let mut rec_stack = HashSet::new();
194
195 for node in &self.nodes {
196 if !visited.contains(node) && self.has_cycle_dfs(node, &mut visited, &mut rec_stack)? {
197 return Err(Mecha10Error::Configuration(format!(
198 "Circular dependency detected involving node '{}'",
199 node
200 )));
201 }
202 }
203
204 Ok(())
205 }
206
207 /// DFS helper for cycle detection
208 fn has_cycle_dfs(
209 &self,
210 node: &str,
211 visited: &mut HashSet<String>,
212 rec_stack: &mut HashSet<String>,
213 ) -> Result<bool> {
214 visited.insert(node.to_string());
215 rec_stack.insert(node.to_string());
216
217 if let Some(deps) = self.dependencies.get(node) {
218 for dep in deps {
219 if !visited.contains(dep) {
220 if self.has_cycle_dfs(dep, visited, rec_stack)? {
221 return Ok(true);
222 }
223 } else if rec_stack.contains(dep) {
224 return Ok(true); // Cycle detected
225 }
226 }
227 }
228
229 rec_stack.remove(node);
230 Ok(false)
231 }
232
233 /// Perform topological sort using Kahn's algorithm
234 ///
235 /// Returns nodes in dependency order (dependencies before dependents).
236 /// This is the order for starting nodes.
237 ///
238 /// # Returns
239 ///
240 /// Vector of node IDs in topological order
241 ///
242 /// # Errors
243 ///
244 /// Returns error if graph has cycles
245 pub fn topological_sort(&self) -> Result<Vec<String>> {
246 let mut in_degree: HashMap<String, usize> = HashMap::new();
247 let mut result = Vec::new();
248 let mut queue = VecDeque::new();
249
250 // Calculate in-degree for each node
251 for node in &self.nodes {
252 let degree = self.dependencies.get(node).map(|d| d.len()).unwrap_or(0);
253 in_degree.insert(node.clone(), degree);
254
255 // Nodes with no dependencies can start immediately
256 if degree == 0 {
257 queue.push_back(node.clone());
258 }
259 }
260
261 // Process nodes with no dependencies
262 while let Some(node) = queue.pop_front() {
263 result.push(node.clone());
264
265 // Reduce in-degree for all dependents
266 if let Some(dependents) = self.dependents.get(&node) {
267 for dependent in dependents {
268 if let Some(degree) = in_degree.get_mut(dependent) {
269 *degree -= 1;
270 if *degree == 0 {
271 queue.push_back(dependent.clone());
272 }
273 }
274 }
275 }
276 }
277
278 // Check if all nodes were processed
279 if result.len() != self.nodes.len() {
280 return Err(Mecha10Error::Configuration(
281 "Circular dependency detected during topological sort".to_string(),
282 ));
283 }
284
285 debug!("Topological sort result: {:?}", result);
286 Ok(result)
287 }
288
289 /// Get reverse topological sort (for shutdown order)
290 ///
291 /// Returns nodes in reverse dependency order (dependents before dependencies).
292 /// This is the order for stopping nodes.
293 pub fn reverse_topological_sort(&self) -> Result<Vec<String>> {
294 let mut order = self.topological_sort()?;
295 order.reverse();
296 debug!("Reverse topological sort result: {:?}", order);
297 Ok(order)
298 }
299
300 /// Get startup plan with nodes grouped by dependency level
301 ///
302 /// Nodes at the same level have no dependencies on each other and can
303 /// be started in parallel.
304 ///
305 /// # Returns
306 ///
307 /// StartupPlan with nodes grouped by level
308 pub fn startup_plan(&self) -> Result<StartupPlan> {
309 let mut levels: Vec<Vec<String>> = Vec::new();
310 let mut processed = HashSet::new();
311 let mut current_level = Vec::new();
312
313 // Find nodes with no dependencies (level 0)
314 for node in &self.nodes {
315 if self.dependencies.get(node).map(|d| d.is_empty()).unwrap_or(true) {
316 current_level.push(node.clone());
317 processed.insert(node.clone());
318 }
319 }
320
321 if !current_level.is_empty() {
322 levels.push(current_level.clone());
323 }
324
325 // Process remaining levels
326 while processed.len() < self.nodes.len() {
327 let mut next_level = Vec::new();
328
329 for node in &self.nodes {
330 if processed.contains(node) {
331 continue;
332 }
333
334 // Check if all dependencies are processed
335 if let Some(deps) = self.dependencies.get(node) {
336 if deps.iter().all(|dep| processed.contains(dep)) {
337 next_level.push(node.clone());
338 processed.insert(node.clone());
339 }
340 }
341 }
342
343 if next_level.is_empty() && processed.len() < self.nodes.len() {
344 // No progress - must be a cycle
345 return Err(Mecha10Error::Configuration(
346 "Circular dependency detected in startup plan".to_string(),
347 ));
348 }
349
350 if !next_level.is_empty() {
351 levels.push(next_level);
352 }
353 }
354
355 info!("Startup plan created with {} levels", levels.len());
356 for (i, level) in levels.iter().enumerate() {
357 debug!(" Level {}: {:?}", i, level);
358 }
359
360 Ok(StartupPlan { levels })
361 }
362
363 /// Get shutdown plan (reverse of startup plan)
364 pub fn shutdown_plan(&self) -> Result<ShutdownPlan> {
365 let startup = self.startup_plan()?;
366 let mut levels = startup.levels;
367 levels.reverse();
368
369 info!("Shutdown plan created with {} levels", levels.len());
370 for (i, level) in levels.iter().enumerate() {
371 debug!(" Level {}: {:?}", i, level);
372 }
373
374 Ok(ShutdownPlan { levels })
375 }
376
377 /// Get all dependencies of a node (transitive closure)
378 ///
379 /// Returns all nodes that must be started before this node.
380 pub fn get_all_dependencies(&self, node_id: &str) -> HashSet<String> {
381 let mut result = HashSet::new();
382 let mut to_visit = vec![node_id.to_string()];
383
384 while let Some(current) = to_visit.pop() {
385 if let Some(deps) = self.dependencies.get(¤t) {
386 for dep in deps {
387 if result.insert(dep.clone()) {
388 to_visit.push(dep.clone());
389 }
390 }
391 }
392 }
393
394 result
395 }
396
397 /// Get all dependents of a node (transitive closure)
398 ///
399 /// Returns all nodes that must be stopped before this node can be stopped.
400 pub fn get_all_dependents(&self, node_id: &str) -> HashSet<String> {
401 let mut result = HashSet::new();
402 let mut to_visit = vec![node_id.to_string()];
403
404 while let Some(current) = to_visit.pop() {
405 if let Some(deps) = self.dependents.get(¤t) {
406 for dep in deps {
407 if result.insert(dep.clone()) {
408 to_visit.push(dep.clone());
409 }
410 }
411 }
412 }
413
414 result
415 }
416
417 /// Check if a node can be started
418 ///
419 /// Returns true if all dependencies are in the provided running set.
420 pub fn can_start(&self, node_id: &str, running: &HashSet<String>) -> bool {
421 if let Some(deps) = self.dependencies.get(node_id) {
422 deps.iter().all(|dep| running.contains(dep))
423 } else {
424 true // No dependencies
425 }
426 }
427
428 /// Check if a node can be stopped
429 ///
430 /// Returns true if no dependents are in the provided running set.
431 pub fn can_stop(&self, node_id: &str, running: &HashSet<String>) -> bool {
432 if let Some(deps) = self.dependents.get(node_id) {
433 !deps.iter().any(|dep| running.contains(dep))
434 } else {
435 true // No dependents
436 }
437 }
438
439 /// Get nodes that depend on this node
440 pub fn get_direct_dependents(&self, node_id: &str) -> Vec<String> {
441 self.dependents.get(node_id).cloned().unwrap_or_default()
442 }
443
444 /// Get nodes this node depends on
445 pub fn get_direct_dependencies(&self, node_id: &str) -> Vec<String> {
446 self.dependencies.get(node_id).cloned().unwrap_or_default()
447 }
448
449 /// Get the number of nodes in the graph
450 pub fn node_count(&self) -> usize {
451 self.nodes.len()
452 }
453
454 /// Get all node IDs
455 pub fn all_nodes(&self) -> Vec<String> {
456 self.nodes.iter().cloned().collect()
457 }
458}
459
460impl Default for DependencyGraph {
461 fn default() -> Self {
462 Self::new()
463 }
464}
465
466// ============================================================================
467// Startup Plan
468// ============================================================================
469
470/// Startup plan with nodes grouped by dependency level
471///
472/// Nodes at the same level can be started in parallel.
473#[derive(Debug, Clone, Serialize, Deserialize)]
474pub struct StartupPlan {
475 /// Nodes grouped by dependency level
476 /// Level 0: No dependencies
477 /// Level N: All dependencies are in levels 0..N-1
478 pub levels: Vec<Vec<String>>,
479}
480
481impl StartupPlan {
482 /// Get total number of levels
483 pub fn level_count(&self) -> usize {
484 self.levels.len()
485 }
486
487 /// Get nodes at a specific level
488 pub fn get_level(&self, level: usize) -> Option<&Vec<String>> {
489 self.levels.get(level)
490 }
491
492 /// Get total number of nodes
493 pub fn node_count(&self) -> usize {
494 self.levels.iter().map(|l| l.len()).sum()
495 }
496
497 /// Get maximum parallelism (largest level size)
498 pub fn max_parallelism(&self) -> usize {
499 self.levels.iter().map(|l| l.len()).max().unwrap_or(0)
500 }
501
502 /// Flatten to sequential order
503 pub fn flatten(&self) -> Vec<String> {
504 self.levels.iter().flatten().cloned().collect()
505 }
506}
507
508// ============================================================================
509// Shutdown Plan
510// ============================================================================
511
512/// Shutdown plan with nodes grouped by dependency level (reversed)
513///
514/// Nodes at the same level can be stopped in parallel.
515#[derive(Debug, Clone, Serialize, Deserialize)]
516pub struct ShutdownPlan {
517 /// Nodes grouped by dependency level (reversed)
518 pub levels: Vec<Vec<String>>,
519}
520
521impl ShutdownPlan {
522 /// Get total number of levels
523 pub fn level_count(&self) -> usize {
524 self.levels.len()
525 }
526
527 /// Get nodes at a specific level
528 pub fn get_level(&self, level: usize) -> Option<&Vec<String>> {
529 self.levels.get(level)
530 }
531
532 /// Get total number of nodes
533 pub fn node_count(&self) -> usize {
534 self.levels.iter().map(|l| l.len()).sum()
535 }
536
537 /// Get maximum parallelism (largest level size)
538 pub fn max_parallelism(&self) -> usize {
539 self.levels.iter().map(|l| l.len()).max().unwrap_or(0)
540 }
541
542 /// Flatten to sequential order
543 pub fn flatten(&self) -> Vec<String> {
544 self.levels.iter().flatten().cloned().collect()
545 }
546}
547
548// ============================================================================
549// Helper Functions
550// ============================================================================
551
552/// Create a simple dependency graph from node ID mappings
553///
554/// # Example
555///
556/// ```
557/// use mecha10::dependency::create_graph;
558///
559/// let graph = create_graph(&[
560/// ("camera", vec![]),
561/// ("vision", vec!["camera"]),
562/// ("planner", vec!["vision"]),
563/// ])?;
564/// ```
565pub fn create_graph(nodes: &[(&str, Vec<&str>)]) -> Result<DependencyGraph> {
566 let mut graph = DependencyGraph::new();
567
568 // Add all nodes
569 for (node_id, _) in nodes {
570 graph.add_node(node_id);
571 }
572
573 // Add dependencies
574 for (node_id, deps) in nodes {
575 for dep in deps {
576 graph.add_dependency(node_id, dep)?;
577 }
578 }
579
580 graph.validate()?;
581 Ok(graph)
582}