// mecha10_core/dependency.rs

//! Dependency Resolution for Node Startup/Shutdown Ordering
//!
//! Provides topological sorting and dependency graph analysis to ensure nodes
//! start and stop in the correct order based on their dependencies.
//!
//! # Use Cases
//!
//! - Start nodes in dependency order (dependencies before dependents)
//! - Stop nodes in reverse order (dependents before dependencies)
//! - Detect circular dependencies
//! - Validate dependency references
//! - Group nodes by dependency level for parallel startup
//!
//! # Example
//!
//! ```rust
//! use mecha10::dependency::{DependencyGraph, StartupPlan};
//! use mecha10::types::NodeConfig;
//!
//! # async fn example() -> mecha10::Result<()> {
//! let nodes = vec![
//!     NodeConfig { id: "camera".into(), depends_on: vec![], /* ... */ },
//!     NodeConfig { id: "vision".into(), depends_on: vec!["camera".into()], /* ... */ },
//!     NodeConfig { id: "planner".into(), depends_on: vec!["vision".into()], /* ... */ },
//! ];
//!
//! // Build dependency graph
//! let graph = DependencyGraph::from_configs(&nodes)?;
//!
//! // Get startup order
//! let startup_order = graph.topological_sort()?;
//! // Result: ["camera", "vision", "planner"]
//!
//! // Get shutdown order (reverse)
//! let shutdown_order = graph.reverse_topological_sort()?;
//! // Result: ["planner", "vision", "camera"]
//!
//! // Get parallel startup plan (nodes grouped by level)
//! let plan = graph.startup_plan()?;
//! // Level 0: ["camera"] - no dependencies
//! // Level 1: ["vision"] - depends on camera
//! // Level 2: ["planner"] - depends on vision
//! # Ok(())
//! # }
//! ```

use crate::error::{Mecha10Error, Result};
use crate::types::NodeConfig;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};
use tracing::{debug, info};

// ============================================================================
// Dependency Graph
// ============================================================================

/// Dependency graph for node relationships
///
/// Maintains a directed graph of node dependencies where an edge from A to B
/// means "A depends on B" (B must start before A). Each edge is kept in both
/// directions so that "what does this node need?" and "who needs this node?"
/// are each a single map lookup.
#[derive(Debug, Clone)]
pub struct DependencyGraph {
    /// Forward edges: node id -> ids of the nodes it depends on
    dependencies: HashMap<String, Vec<String>>,

    /// Reverse edges: node id -> ids of the nodes that depend on it
    dependents: HashMap<String, Vec<String>>,

    /// Every node id registered in the graph
    nodes: HashSet<String>,
}
73impl DependencyGraph {
74    /// Create a new empty dependency graph
75    pub fn new() -> Self {
76        Self {
77            dependencies: HashMap::new(),
78            dependents: HashMap::new(),
79            nodes: HashSet::new(),
80        }
81    }
82
83    /// Build dependency graph from node configurations
84    ///
85    /// # Arguments
86    ///
87    /// * `configs` - Slice of node configurations
88    ///
89    /// # Returns
90    ///
91    /// Dependency graph or error if dependencies are invalid
92    ///
93    /// # Errors
94    ///
95    /// - Circular dependencies detected
96    /// - Undefined dependency references
97    pub fn from_configs(configs: &[NodeConfig]) -> Result<Self> {
98        let mut graph = Self::new();
99
100        // Add all nodes first
101        for config in configs {
102            graph.add_node(&config.id);
103        }
104
105        // Add dependencies
106        for config in configs {
107            for dep in &config.depends_on {
108                graph.add_dependency(&config.id, dep)?;
109            }
110        }
111
112        // Validate graph
113        graph.validate()?;
114
115        Ok(graph)
116    }
117
118    /// Add a node to the graph
119    pub fn add_node(&mut self, node_id: &str) {
120        self.nodes.insert(node_id.to_string());
121        self.dependencies.entry(node_id.to_string()).or_default();
122        self.dependents.entry(node_id.to_string()).or_default();
123    }
124
125    /// Add a dependency relationship
126    ///
127    /// # Arguments
128    ///
129    /// * `node_id` - The dependent node
130    /// * `dependency` - The node that `node_id` depends on
131    ///
132    /// # Errors
133    ///
134    /// Returns error if dependency node doesn't exist
135    pub fn add_dependency(&mut self, node_id: &str, dependency: &str) -> Result<()> {
136        // Ensure both nodes exist
137        if !self.nodes.contains(node_id) {
138            return Err(Mecha10Error::Configuration(format!(
139                "Node '{}' not found in graph",
140                node_id
141            )));
142        }
143
144        if !self.nodes.contains(dependency) {
145            return Err(Mecha10Error::Configuration(format!(
146                "Dependency '{}' not found (required by '{}')",
147                dependency, node_id
148            )));
149        }
150
151        // Add to dependencies map
152        self.dependencies
153            .entry(node_id.to_string())
154            .or_default()
155            .push(dependency.to_string());
156
157        // Add to reverse dependencies map
158        self.dependents
159            .entry(dependency.to_string())
160            .or_default()
161            .push(node_id.to_string());
162
163        Ok(())
164    }
165
166    /// Validate the dependency graph
167    ///
168    /// Checks for:
169    /// - Circular dependencies
170    /// - Undefined dependency references
171    pub fn validate(&self) -> Result<()> {
172        // Check for circular dependencies
173        self.check_circular_dependencies()?;
174
175        // Check all dependencies exist
176        for (node_id, deps) in &self.dependencies {
177            for dep in deps {
178                if !self.nodes.contains(dep) {
179                    return Err(Mecha10Error::Configuration(format!(
180                        "Node '{}' depends on undefined node '{}'",
181                        node_id, dep
182                    )));
183                }
184            }
185        }
186
187        Ok(())
188    }
189
190    /// Check for circular dependencies using DFS
191    fn check_circular_dependencies(&self) -> Result<()> {
192        let mut visited = HashSet::new();
193        let mut rec_stack = HashSet::new();
194
195        for node in &self.nodes {
196            if !visited.contains(node) && self.has_cycle_dfs(node, &mut visited, &mut rec_stack)? {
197                return Err(Mecha10Error::Configuration(format!(
198                    "Circular dependency detected involving node '{}'",
199                    node
200                )));
201            }
202        }
203
204        Ok(())
205    }
206
207    /// DFS helper for cycle detection
208    fn has_cycle_dfs(
209        &self,
210        node: &str,
211        visited: &mut HashSet<String>,
212        rec_stack: &mut HashSet<String>,
213    ) -> Result<bool> {
214        visited.insert(node.to_string());
215        rec_stack.insert(node.to_string());
216
217        if let Some(deps) = self.dependencies.get(node) {
218            for dep in deps {
219                if !visited.contains(dep) {
220                    if self.has_cycle_dfs(dep, visited, rec_stack)? {
221                        return Ok(true);
222                    }
223                } else if rec_stack.contains(dep) {
224                    return Ok(true); // Cycle detected
225                }
226            }
227        }
228
229        rec_stack.remove(node);
230        Ok(false)
231    }
232
233    /// Perform topological sort using Kahn's algorithm
234    ///
235    /// Returns nodes in dependency order (dependencies before dependents).
236    /// This is the order for starting nodes.
237    ///
238    /// # Returns
239    ///
240    /// Vector of node IDs in topological order
241    ///
242    /// # Errors
243    ///
244    /// Returns error if graph has cycles
245    pub fn topological_sort(&self) -> Result<Vec<String>> {
246        let mut in_degree: HashMap<String, usize> = HashMap::new();
247        let mut result = Vec::new();
248        let mut queue = VecDeque::new();
249
250        // Calculate in-degree for each node
251        for node in &self.nodes {
252            let degree = self.dependencies.get(node).map(|d| d.len()).unwrap_or(0);
253            in_degree.insert(node.clone(), degree);
254
255            // Nodes with no dependencies can start immediately
256            if degree == 0 {
257                queue.push_back(node.clone());
258            }
259        }
260
261        // Process nodes with no dependencies
262        while let Some(node) = queue.pop_front() {
263            result.push(node.clone());
264
265            // Reduce in-degree for all dependents
266            if let Some(dependents) = self.dependents.get(&node) {
267                for dependent in dependents {
268                    if let Some(degree) = in_degree.get_mut(dependent) {
269                        *degree -= 1;
270                        if *degree == 0 {
271                            queue.push_back(dependent.clone());
272                        }
273                    }
274                }
275            }
276        }
277
278        // Check if all nodes were processed
279        if result.len() != self.nodes.len() {
280            return Err(Mecha10Error::Configuration(
281                "Circular dependency detected during topological sort".to_string(),
282            ));
283        }
284
285        debug!("Topological sort result: {:?}", result);
286        Ok(result)
287    }
288
289    /// Get reverse topological sort (for shutdown order)
290    ///
291    /// Returns nodes in reverse dependency order (dependents before dependencies).
292    /// This is the order for stopping nodes.
293    pub fn reverse_topological_sort(&self) -> Result<Vec<String>> {
294        let mut order = self.topological_sort()?;
295        order.reverse();
296        debug!("Reverse topological sort result: {:?}", order);
297        Ok(order)
298    }
299
300    /// Get startup plan with nodes grouped by dependency level
301    ///
302    /// Nodes at the same level have no dependencies on each other and can
303    /// be started in parallel.
304    ///
305    /// # Returns
306    ///
307    /// StartupPlan with nodes grouped by level
308    pub fn startup_plan(&self) -> Result<StartupPlan> {
309        let mut levels: Vec<Vec<String>> = Vec::new();
310        let mut processed = HashSet::new();
311        let mut current_level = Vec::new();
312
313        // Find nodes with no dependencies (level 0)
314        for node in &self.nodes {
315            if self.dependencies.get(node).map(|d| d.is_empty()).unwrap_or(true) {
316                current_level.push(node.clone());
317                processed.insert(node.clone());
318            }
319        }
320
321        if !current_level.is_empty() {
322            levels.push(current_level.clone());
323        }
324
325        // Process remaining levels
326        while processed.len() < self.nodes.len() {
327            let mut next_level = Vec::new();
328
329            for node in &self.nodes {
330                if processed.contains(node) {
331                    continue;
332                }
333
334                // Check if all dependencies are processed
335                if let Some(deps) = self.dependencies.get(node) {
336                    if deps.iter().all(|dep| processed.contains(dep)) {
337                        next_level.push(node.clone());
338                        processed.insert(node.clone());
339                    }
340                }
341            }
342
343            if next_level.is_empty() && processed.len() < self.nodes.len() {
344                // No progress - must be a cycle
345                return Err(Mecha10Error::Configuration(
346                    "Circular dependency detected in startup plan".to_string(),
347                ));
348            }
349
350            if !next_level.is_empty() {
351                levels.push(next_level);
352            }
353        }
354
355        info!("Startup plan created with {} levels", levels.len());
356        for (i, level) in levels.iter().enumerate() {
357            debug!("  Level {}: {:?}", i, level);
358        }
359
360        Ok(StartupPlan { levels })
361    }
362
363    /// Get shutdown plan (reverse of startup plan)
364    pub fn shutdown_plan(&self) -> Result<ShutdownPlan> {
365        let startup = self.startup_plan()?;
366        let mut levels = startup.levels;
367        levels.reverse();
368
369        info!("Shutdown plan created with {} levels", levels.len());
370        for (i, level) in levels.iter().enumerate() {
371            debug!("  Level {}: {:?}", i, level);
372        }
373
374        Ok(ShutdownPlan { levels })
375    }
376
377    /// Get all dependencies of a node (transitive closure)
378    ///
379    /// Returns all nodes that must be started before this node.
380    pub fn get_all_dependencies(&self, node_id: &str) -> HashSet<String> {
381        let mut result = HashSet::new();
382        let mut to_visit = vec![node_id.to_string()];
383
384        while let Some(current) = to_visit.pop() {
385            if let Some(deps) = self.dependencies.get(&current) {
386                for dep in deps {
387                    if result.insert(dep.clone()) {
388                        to_visit.push(dep.clone());
389                    }
390                }
391            }
392        }
393
394        result
395    }
396
397    /// Get all dependents of a node (transitive closure)
398    ///
399    /// Returns all nodes that must be stopped before this node can be stopped.
400    pub fn get_all_dependents(&self, node_id: &str) -> HashSet<String> {
401        let mut result = HashSet::new();
402        let mut to_visit = vec![node_id.to_string()];
403
404        while let Some(current) = to_visit.pop() {
405            if let Some(deps) = self.dependents.get(&current) {
406                for dep in deps {
407                    if result.insert(dep.clone()) {
408                        to_visit.push(dep.clone());
409                    }
410                }
411            }
412        }
413
414        result
415    }
416
417    /// Check if a node can be started
418    ///
419    /// Returns true if all dependencies are in the provided running set.
420    pub fn can_start(&self, node_id: &str, running: &HashSet<String>) -> bool {
421        if let Some(deps) = self.dependencies.get(node_id) {
422            deps.iter().all(|dep| running.contains(dep))
423        } else {
424            true // No dependencies
425        }
426    }
427
428    /// Check if a node can be stopped
429    ///
430    /// Returns true if no dependents are in the provided running set.
431    pub fn can_stop(&self, node_id: &str, running: &HashSet<String>) -> bool {
432        if let Some(deps) = self.dependents.get(node_id) {
433            !deps.iter().any(|dep| running.contains(dep))
434        } else {
435            true // No dependents
436        }
437    }
438
439    /// Get nodes that depend on this node
440    pub fn get_direct_dependents(&self, node_id: &str) -> Vec<String> {
441        self.dependents.get(node_id).cloned().unwrap_or_default()
442    }
443
444    /// Get nodes this node depends on
445    pub fn get_direct_dependencies(&self, node_id: &str) -> Vec<String> {
446        self.dependencies.get(node_id).cloned().unwrap_or_default()
447    }
448
449    /// Get the number of nodes in the graph
450    pub fn node_count(&self) -> usize {
451        self.nodes.len()
452    }
453
454    /// Get all node IDs
455    pub fn all_nodes(&self) -> Vec<String> {
456        self.nodes.iter().cloned().collect()
457    }
458}
460impl Default for DependencyGraph {
461    fn default() -> Self {
462        Self::new()
463    }
464}

// ============================================================================
// Startup Plan
// ============================================================================

470/// Startup plan with nodes grouped by dependency level
471///
472/// Nodes at the same level can be started in parallel.
473#[derive(Debug, Clone, Serialize, Deserialize)]
474pub struct StartupPlan {
475    /// Nodes grouped by dependency level
476    /// Level 0: No dependencies
477    /// Level N: All dependencies are in levels 0..N-1
478    pub levels: Vec<Vec<String>>,
479}
481impl StartupPlan {
482    /// Get total number of levels
483    pub fn level_count(&self) -> usize {
484        self.levels.len()
485    }
486
487    /// Get nodes at a specific level
488    pub fn get_level(&self, level: usize) -> Option<&Vec<String>> {
489        self.levels.get(level)
490    }
491
492    /// Get total number of nodes
493    pub fn node_count(&self) -> usize {
494        self.levels.iter().map(|l| l.len()).sum()
495    }
496
497    /// Get maximum parallelism (largest level size)
498    pub fn max_parallelism(&self) -> usize {
499        self.levels.iter().map(|l| l.len()).max().unwrap_or(0)
500    }
501
502    /// Flatten to sequential order
503    pub fn flatten(&self) -> Vec<String> {
504        self.levels.iter().flatten().cloned().collect()
505    }
506}

// ============================================================================
// Shutdown Plan
// ============================================================================

512/// Shutdown plan with nodes grouped by dependency level (reversed)
513///
514/// Nodes at the same level can be stopped in parallel.
515#[derive(Debug, Clone, Serialize, Deserialize)]
516pub struct ShutdownPlan {
517    /// Nodes grouped by dependency level (reversed)
518    pub levels: Vec<Vec<String>>,
519}
521impl ShutdownPlan {
522    /// Get total number of levels
523    pub fn level_count(&self) -> usize {
524        self.levels.len()
525    }
526
527    /// Get nodes at a specific level
528    pub fn get_level(&self, level: usize) -> Option<&Vec<String>> {
529        self.levels.get(level)
530    }
531
532    /// Get total number of nodes
533    pub fn node_count(&self) -> usize {
534        self.levels.iter().map(|l| l.len()).sum()
535    }
536
537    /// Get maximum parallelism (largest level size)
538    pub fn max_parallelism(&self) -> usize {
539        self.levels.iter().map(|l| l.len()).max().unwrap_or(0)
540    }
541
542    /// Flatten to sequential order
543    pub fn flatten(&self) -> Vec<String> {
544        self.levels.iter().flatten().cloned().collect()
545    }
546}

// ============================================================================
// Helper Functions
// ============================================================================

552/// Create a simple dependency graph from node ID mappings
553///
554/// # Example
555///
556/// ```
557/// use mecha10::dependency::create_graph;
558///
559/// let graph = create_graph(&[
560///     ("camera", vec![]),
561///     ("vision", vec!["camera"]),
562///     ("planner", vec!["vision"]),
563/// ])?;
564/// ```
565pub fn create_graph(nodes: &[(&str, Vec<&str>)]) -> Result<DependencyGraph> {
566    let mut graph = DependencyGraph::new();
567
568    // Add all nodes
569    for (node_id, _) in nodes {
570        graph.add_node(node_id);
571    }
572
573    // Add dependencies
574    for (node_id, deps) in nodes {
575        for dep in deps {
576            graph.add_dependency(node_id, dep)?;
577        }
578    }
579
580    graph.validate()?;
581    Ok(graph)
582}