Skip to main content

oxigdal_workflow/scheduler/
dependency.rs

1//! Cross-workflow dependency scheduling.
2
3use crate::error::{Result, WorkflowError};
4use crate::scheduler::{ExecutionStatus, SchedulerConfig};
5use dashmap::DashMap;
6use serde::{Deserialize, Serialize};
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9
10/// Workflow dependency definition.
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct WorkflowDependency {
13    /// Workflow ID that depends on others.
14    pub workflow_id: String,
15    /// List of workflow IDs this workflow depends on.
16    pub dependencies: Vec<DependencyRule>,
17    /// Dependency resolution strategy.
18    pub strategy: DependencyStrategy,
19    /// Description of the dependency.
20    pub description: Option<String>,
21}
22
23/// Dependency rule for a single dependency.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct DependencyRule {
26    /// Dependent workflow ID.
27    pub workflow_id: String,
28    /// Required execution status.
29    pub required_status: ExecutionStatus,
30    /// Optional time window in seconds (dependency must complete within this window).
31    pub time_window_secs: Option<u64>,
32    /// Optional execution version/tag requirement.
33    pub version_requirement: Option<String>,
34}
35
36/// Dependency resolution strategy.
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
38pub enum DependencyStrategy {
39    /// All dependencies must be satisfied.
40    All,
41    /// At least one dependency must be satisfied.
42    Any,
43    /// Exactly N dependencies must be satisfied.
44    AtLeast {
45        /// Minimum number of dependencies that must be satisfied.
46        count: usize,
47    },
48    /// Custom voting strategy (majority).
49    Majority,
50}
51
52/// Dependency graph for tracking workflow dependencies.
53#[derive(Debug)]
54pub struct DependencyGraph {
55    /// Map of workflow ID to its dependencies.
56    dependencies: HashMap<String, HashSet<String>>,
57    /// Map of workflow ID to workflows that depend on it.
58    dependents: HashMap<String, HashSet<String>>,
59}
60
61impl DependencyGraph {
62    /// Create a new empty dependency graph.
63    pub fn new() -> Self {
64        Self {
65            dependencies: HashMap::new(),
66            dependents: HashMap::new(),
67        }
68    }
69
70    /// Add a dependency edge.
71    pub fn add_dependency(&mut self, workflow_id: String, dependency_id: String) {
72        self.dependencies
73            .entry(workflow_id.clone())
74            .or_default()
75            .insert(dependency_id.clone());
76
77        self.dependents
78            .entry(dependency_id)
79            .or_default()
80            .insert(workflow_id);
81    }
82
83    /// Remove a dependency edge.
84    pub fn remove_dependency(&mut self, workflow_id: &str, dependency_id: &str) {
85        if let Some(deps) = self.dependencies.get_mut(workflow_id) {
86            deps.remove(dependency_id);
87        }
88
89        if let Some(dependents) = self.dependents.get_mut(dependency_id) {
90            dependents.remove(workflow_id);
91        }
92    }
93
94    /// Get all dependencies for a workflow.
95    pub fn get_dependencies(&self, workflow_id: &str) -> Option<&HashSet<String>> {
96        self.dependencies.get(workflow_id)
97    }
98
99    /// Get all workflows that depend on a given workflow.
100    pub fn get_dependents(&self, workflow_id: &str) -> Option<&HashSet<String>> {
101        self.dependents.get(workflow_id)
102    }
103
104    /// Check for circular dependencies using DFS.
105    pub fn has_cycle(&self, start_id: &str) -> bool {
106        let mut visited = HashSet::new();
107        let mut rec_stack = HashSet::new();
108        self.has_cycle_util(start_id, &mut visited, &mut rec_stack)
109    }
110
111    /// Utility function for cycle detection.
112    fn has_cycle_util(
113        &self,
114        current: &str,
115        visited: &mut HashSet<String>,
116        rec_stack: &mut HashSet<String>,
117    ) -> bool {
118        if rec_stack.contains(current) {
119            return true;
120        }
121
122        if visited.contains(current) {
123            return false;
124        }
125
126        visited.insert(current.to_string());
127        rec_stack.insert(current.to_string());
128
129        if let Some(deps) = self.dependencies.get(current) {
130            for dep in deps {
131                if self.has_cycle_util(dep, visited, rec_stack) {
132                    return true;
133                }
134            }
135        }
136
137        rec_stack.remove(current);
138        false
139    }
140
141    /// Get execution order using topological sort.
142    pub fn get_execution_order(&self) -> Result<Vec<String>> {
143        let mut in_degree: HashMap<String, usize> = HashMap::new();
144        let mut zero_in_degree = Vec::new();
145        let mut result = Vec::new();
146
147        // Initialize in-degrees for all nodes
148        for workflow_id in self.dependencies.keys() {
149            in_degree.entry(workflow_id.clone()).or_insert(0);
150        }
151        for deps in self.dependencies.values() {
152            for dep in deps {
153                in_degree.entry(dep.clone()).or_insert(0);
154            }
155        }
156
157        // Calculate in-degrees: if workflow_id depends on deps,
158        // then workflow_id has incoming edges from each dep
159        for (workflow_id, deps) in &self.dependencies {
160            for _ in deps {
161                *in_degree.entry(workflow_id.clone()).or_insert(0) += 1;
162            }
163        }
164
165        // Find nodes with zero in-degree
166        for (id, &degree) in &in_degree {
167            if degree == 0 {
168                zero_in_degree.push(id.clone());
169            }
170        }
171
172        // Process nodes
173        while let Some(current) = zero_in_degree.pop() {
174            result.push(current.clone());
175
176            if let Some(dependents) = self.dependents.get(&current) {
177                for dependent in dependents {
178                    if let Some(degree) = in_degree.get_mut(dependent) {
179                        *degree -= 1;
180                        if *degree == 0 {
181                            zero_in_degree.push(dependent.clone());
182                        }
183                    }
184                }
185            }
186        }
187
188        // Check if all nodes were processed (no cycle)
189        if result.len() != in_degree.len() {
190            return Err(WorkflowError::validation("Circular dependency detected"));
191        }
192
193        Ok(result)
194    }
195}
196
197impl Default for DependencyGraph {
198    fn default() -> Self {
199        Self::new()
200    }
201}
202
203/// Dependency scheduler for managing cross-workflow dependencies.
204pub struct DependencyScheduler {
205    /// Scheduler configuration (reserved for future enhancements).
206    _config: SchedulerConfig,
207    dependencies: Arc<DashMap<String, WorkflowDependency>>,
208    graph: Arc<parking_lot::RwLock<DependencyGraph>>,
209    execution_status: Arc<DashMap<String, ExecutionStatus>>,
210}
211
212impl DependencyScheduler {
213    /// Create a new dependency scheduler.
214    pub fn new(config: SchedulerConfig) -> Self {
215        Self {
216            _config: config,
217            dependencies: Arc::new(DashMap::new()),
218            graph: Arc::new(parking_lot::RwLock::new(DependencyGraph::new())),
219            execution_status: Arc::new(DashMap::new()),
220        }
221    }
222
223    /// Add a workflow dependency.
224    pub fn add_dependency(&self, dependency: WorkflowDependency) -> Result<()> {
225        let workflow_id = dependency.workflow_id.clone();
226
227        // Update the graph
228        let mut graph = self.graph.write();
229        for rule in &dependency.dependencies {
230            graph.add_dependency(workflow_id.clone(), rule.workflow_id.clone());
231        }
232
233        // Check for cycles
234        if graph.has_cycle(&workflow_id) {
235            // Rollback
236            for rule in &dependency.dependencies {
237                graph.remove_dependency(&workflow_id, &rule.workflow_id);
238            }
239            return Err(WorkflowError::validation(format!(
240                "Adding dependency would create a cycle for workflow '{}'",
241                workflow_id
242            )));
243        }
244
245        drop(graph);
246
247        self.dependencies.insert(workflow_id, dependency);
248        Ok(())
249    }
250
251    /// Remove a workflow dependency.
252    pub fn remove_dependency(&self, workflow_id: &str) -> Result<()> {
253        let entry = self
254            .dependencies
255            .remove(workflow_id)
256            .ok_or_else(|| WorkflowError::not_found(workflow_id))?;
257
258        let dependency = entry.1;
259
260        // Update the graph
261        let mut graph = self.graph.write();
262        for rule in &dependency.dependencies {
263            graph.remove_dependency(workflow_id, &rule.workflow_id);
264        }
265
266        Ok(())
267    }
268
269    /// Check if a workflow's dependencies are satisfied.
270    pub fn are_dependencies_satisfied(&self, workflow_id: &str) -> Result<bool> {
271        let dependency = self
272            .dependencies
273            .get(workflow_id)
274            .ok_or_else(|| WorkflowError::not_found(workflow_id))?;
275
276        let mut satisfied_count = 0;
277        let total_count = dependency.dependencies.len();
278
279        for rule in &dependency.dependencies {
280            if self.is_dependency_satisfied(rule)? {
281                satisfied_count += 1;
282            }
283        }
284
285        let result = match dependency.strategy {
286            DependencyStrategy::All => satisfied_count == total_count,
287            DependencyStrategy::Any => satisfied_count > 0,
288            DependencyStrategy::AtLeast { count } => satisfied_count >= count,
289            DependencyStrategy::Majority => satisfied_count > total_count / 2,
290        };
291
292        Ok(result)
293    }
294
295    /// Check if a single dependency rule is satisfied.
296    fn is_dependency_satisfied(&self, rule: &DependencyRule) -> Result<bool> {
297        let status = self
298            .execution_status
299            .get(&rule.workflow_id)
300            .map(|entry| *entry.value())
301            .unwrap_or(ExecutionStatus::Pending);
302
303        Ok(status == rule.required_status)
304    }
305
306    /// Update the execution status of a workflow.
307    pub fn update_status(&self, workflow_id: String, status: ExecutionStatus) {
308        self.execution_status.insert(workflow_id, status);
309    }
310
311    /// Get workflows that can be executed (dependencies satisfied).
312    pub fn get_executable_workflows(&self) -> Result<Vec<String>> {
313        let mut executable = Vec::new();
314
315        for entry in self.dependencies.iter() {
316            let workflow_id = entry.key();
317            if self.are_dependencies_satisfied(workflow_id)? {
318                executable.push(workflow_id.clone());
319            }
320        }
321
322        Ok(executable)
323    }
324
325    /// Get the dependency graph.
326    pub fn get_graph(&self) -> parking_lot::RwLockReadGuard<'_, DependencyGraph> {
327        self.graph.read()
328    }
329
330    /// Get execution order for all workflows.
331    pub fn get_execution_order(&self) -> Result<Vec<String>> {
332        self.graph.read().get_execution_order()
333    }
334
335    /// Clear all dependencies.
336    pub fn clear(&self) {
337        self.dependencies.clear();
338        self.execution_status.clear();
339        let mut graph = self.graph.write();
340        *graph = DependencyGraph::new();
341    }
342}
343
344#[cfg(test)]
345mod tests {
346    use super::*;
347
348    #[test]
349    fn test_dependency_graph_creation() {
350        let mut graph = DependencyGraph::new();
351        graph.add_dependency("workflow1".to_string(), "workflow2".to_string());
352
353        assert!(graph.get_dependencies("workflow1").is_some());
354        assert_eq!(
355            graph
356                .get_dependencies("workflow1")
357                .expect("Missing deps")
358                .len(),
359            1
360        );
361    }
362
363    #[test]
364    fn test_dependency_graph_cycle_detection() {
365        let mut graph = DependencyGraph::new();
366        graph.add_dependency("workflow1".to_string(), "workflow2".to_string());
367        graph.add_dependency("workflow2".to_string(), "workflow3".to_string());
368        graph.add_dependency("workflow3".to_string(), "workflow1".to_string());
369
370        assert!(graph.has_cycle("workflow1"));
371    }
372
373    #[test]
374    fn test_dependency_graph_execution_order() {
375        let mut graph = DependencyGraph::new();
376        graph.add_dependency("workflow1".to_string(), "workflow2".to_string());
377        graph.add_dependency("workflow2".to_string(), "workflow3".to_string());
378
379        let order = graph.get_execution_order().expect("Failed to get order");
380        assert!(!order.is_empty());
381    }
382
383    #[test]
384    fn test_dependency_scheduler() {
385        let scheduler = DependencyScheduler::new(SchedulerConfig::default());
386
387        let dependency = WorkflowDependency {
388            workflow_id: "workflow1".to_string(),
389            dependencies: vec![DependencyRule {
390                workflow_id: "workflow2".to_string(),
391                required_status: ExecutionStatus::Success,
392                time_window_secs: None,
393                version_requirement: None,
394            }],
395            strategy: DependencyStrategy::All,
396            description: None,
397        };
398
399        scheduler
400            .add_dependency(dependency)
401            .expect("Failed to add dependency");
402
403        // Initially not satisfied
404        assert!(
405            !scheduler
406                .are_dependencies_satisfied("workflow1")
407                .expect("Check failed")
408        );
409
410        // Update status
411        scheduler.update_status("workflow2".to_string(), ExecutionStatus::Success);
412
413        // Now satisfied
414        assert!(
415            scheduler
416                .are_dependencies_satisfied("workflow1")
417                .expect("Check failed")
418        );
419    }
420
421    #[test]
422    fn test_dependency_cycle_prevention() {
423        let scheduler = DependencyScheduler::new(SchedulerConfig::default());
424
425        let dep1 = WorkflowDependency {
426            workflow_id: "workflow1".to_string(),
427            dependencies: vec![DependencyRule {
428                workflow_id: "workflow2".to_string(),
429                required_status: ExecutionStatus::Success,
430                time_window_secs: None,
431                version_requirement: None,
432            }],
433            strategy: DependencyStrategy::All,
434            description: None,
435        };
436
437        scheduler.add_dependency(dep1).expect("Failed to add");
438
439        let dep2 = WorkflowDependency {
440            workflow_id: "workflow2".to_string(),
441            dependencies: vec![DependencyRule {
442                workflow_id: "workflow1".to_string(),
443                required_status: ExecutionStatus::Success,
444                time_window_secs: None,
445                version_requirement: None,
446            }],
447            strategy: DependencyStrategy::All,
448            description: None,
449        };
450
451        // Should fail due to cycle
452        assert!(scheduler.add_dependency(dep2).is_err());
453    }
454}