// oxigdal_workflow/scheduler/dependency.rs
use crate::error::{Result, WorkflowError};
4use crate::scheduler::{ExecutionStatus, SchedulerConfig};
5use dashmap::DashMap;
6use serde::{Deserialize, Serialize};
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct WorkflowDependency {
13 pub workflow_id: String,
15 pub dependencies: Vec<DependencyRule>,
17 pub strategy: DependencyStrategy,
19 pub description: Option<String>,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct DependencyRule {
26 pub workflow_id: String,
28 pub required_status: ExecutionStatus,
30 pub time_window_secs: Option<u64>,
32 pub version_requirement: Option<String>,
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
38pub enum DependencyStrategy {
39 All,
41 Any,
43 AtLeast {
45 count: usize,
47 },
48 Majority,
50}
51
52#[derive(Debug)]
54pub struct DependencyGraph {
55 dependencies: HashMap<String, HashSet<String>>,
57 dependents: HashMap<String, HashSet<String>>,
59}
60
61impl DependencyGraph {
62 pub fn new() -> Self {
64 Self {
65 dependencies: HashMap::new(),
66 dependents: HashMap::new(),
67 }
68 }
69
70 pub fn add_dependency(&mut self, workflow_id: String, dependency_id: String) {
72 self.dependencies
73 .entry(workflow_id.clone())
74 .or_default()
75 .insert(dependency_id.clone());
76
77 self.dependents
78 .entry(dependency_id)
79 .or_default()
80 .insert(workflow_id);
81 }
82
83 pub fn remove_dependency(&mut self, workflow_id: &str, dependency_id: &str) {
85 if let Some(deps) = self.dependencies.get_mut(workflow_id) {
86 deps.remove(dependency_id);
87 }
88
89 if let Some(dependents) = self.dependents.get_mut(dependency_id) {
90 dependents.remove(workflow_id);
91 }
92 }
93
94 pub fn get_dependencies(&self, workflow_id: &str) -> Option<&HashSet<String>> {
96 self.dependencies.get(workflow_id)
97 }
98
99 pub fn get_dependents(&self, workflow_id: &str) -> Option<&HashSet<String>> {
101 self.dependents.get(workflow_id)
102 }
103
104 pub fn has_cycle(&self, start_id: &str) -> bool {
106 let mut visited = HashSet::new();
107 let mut rec_stack = HashSet::new();
108 self.has_cycle_util(start_id, &mut visited, &mut rec_stack)
109 }
110
111 fn has_cycle_util(
113 &self,
114 current: &str,
115 visited: &mut HashSet<String>,
116 rec_stack: &mut HashSet<String>,
117 ) -> bool {
118 if rec_stack.contains(current) {
119 return true;
120 }
121
122 if visited.contains(current) {
123 return false;
124 }
125
126 visited.insert(current.to_string());
127 rec_stack.insert(current.to_string());
128
129 if let Some(deps) = self.dependencies.get(current) {
130 for dep in deps {
131 if self.has_cycle_util(dep, visited, rec_stack) {
132 return true;
133 }
134 }
135 }
136
137 rec_stack.remove(current);
138 false
139 }
140
141 pub fn get_execution_order(&self) -> Result<Vec<String>> {
143 let mut in_degree: HashMap<String, usize> = HashMap::new();
144 let mut zero_in_degree = Vec::new();
145 let mut result = Vec::new();
146
147 for workflow_id in self.dependencies.keys() {
149 in_degree.entry(workflow_id.clone()).or_insert(0);
150 }
151 for deps in self.dependencies.values() {
152 for dep in deps {
153 in_degree.entry(dep.clone()).or_insert(0);
154 }
155 }
156
157 for (workflow_id, deps) in &self.dependencies {
160 for _ in deps {
161 *in_degree.entry(workflow_id.clone()).or_insert(0) += 1;
162 }
163 }
164
165 for (id, °ree) in &in_degree {
167 if degree == 0 {
168 zero_in_degree.push(id.clone());
169 }
170 }
171
172 while let Some(current) = zero_in_degree.pop() {
174 result.push(current.clone());
175
176 if let Some(dependents) = self.dependents.get(¤t) {
177 for dependent in dependents {
178 if let Some(degree) = in_degree.get_mut(dependent) {
179 *degree -= 1;
180 if *degree == 0 {
181 zero_in_degree.push(dependent.clone());
182 }
183 }
184 }
185 }
186 }
187
188 if result.len() != in_degree.len() {
190 return Err(WorkflowError::validation("Circular dependency detected"));
191 }
192
193 Ok(result)
194 }
195}
196
197impl Default for DependencyGraph {
198 fn default() -> Self {
199 Self::new()
200 }
201}
202
203pub struct DependencyScheduler {
205 _config: SchedulerConfig,
207 dependencies: Arc<DashMap<String, WorkflowDependency>>,
208 graph: Arc<parking_lot::RwLock<DependencyGraph>>,
209 execution_status: Arc<DashMap<String, ExecutionStatus>>,
210}
211
212impl DependencyScheduler {
213 pub fn new(config: SchedulerConfig) -> Self {
215 Self {
216 _config: config,
217 dependencies: Arc::new(DashMap::new()),
218 graph: Arc::new(parking_lot::RwLock::new(DependencyGraph::new())),
219 execution_status: Arc::new(DashMap::new()),
220 }
221 }
222
223 pub fn add_dependency(&self, dependency: WorkflowDependency) -> Result<()> {
225 let workflow_id = dependency.workflow_id.clone();
226
227 let mut graph = self.graph.write();
229 for rule in &dependency.dependencies {
230 graph.add_dependency(workflow_id.clone(), rule.workflow_id.clone());
231 }
232
233 if graph.has_cycle(&workflow_id) {
235 for rule in &dependency.dependencies {
237 graph.remove_dependency(&workflow_id, &rule.workflow_id);
238 }
239 return Err(WorkflowError::validation(format!(
240 "Adding dependency would create a cycle for workflow '{}'",
241 workflow_id
242 )));
243 }
244
245 drop(graph);
246
247 self.dependencies.insert(workflow_id, dependency);
248 Ok(())
249 }
250
251 pub fn remove_dependency(&self, workflow_id: &str) -> Result<()> {
253 let entry = self
254 .dependencies
255 .remove(workflow_id)
256 .ok_or_else(|| WorkflowError::not_found(workflow_id))?;
257
258 let dependency = entry.1;
259
260 let mut graph = self.graph.write();
262 for rule in &dependency.dependencies {
263 graph.remove_dependency(workflow_id, &rule.workflow_id);
264 }
265
266 Ok(())
267 }
268
269 pub fn are_dependencies_satisfied(&self, workflow_id: &str) -> Result<bool> {
271 let dependency = self
272 .dependencies
273 .get(workflow_id)
274 .ok_or_else(|| WorkflowError::not_found(workflow_id))?;
275
276 let mut satisfied_count = 0;
277 let total_count = dependency.dependencies.len();
278
279 for rule in &dependency.dependencies {
280 if self.is_dependency_satisfied(rule)? {
281 satisfied_count += 1;
282 }
283 }
284
285 let result = match dependency.strategy {
286 DependencyStrategy::All => satisfied_count == total_count,
287 DependencyStrategy::Any => satisfied_count > 0,
288 DependencyStrategy::AtLeast { count } => satisfied_count >= count,
289 DependencyStrategy::Majority => satisfied_count > total_count / 2,
290 };
291
292 Ok(result)
293 }
294
295 fn is_dependency_satisfied(&self, rule: &DependencyRule) -> Result<bool> {
297 let status = self
298 .execution_status
299 .get(&rule.workflow_id)
300 .map(|entry| *entry.value())
301 .unwrap_or(ExecutionStatus::Pending);
302
303 Ok(status == rule.required_status)
304 }
305
306 pub fn update_status(&self, workflow_id: String, status: ExecutionStatus) {
308 self.execution_status.insert(workflow_id, status);
309 }
310
311 pub fn get_executable_workflows(&self) -> Result<Vec<String>> {
313 let mut executable = Vec::new();
314
315 for entry in self.dependencies.iter() {
316 let workflow_id = entry.key();
317 if self.are_dependencies_satisfied(workflow_id)? {
318 executable.push(workflow_id.clone());
319 }
320 }
321
322 Ok(executable)
323 }
324
325 pub fn get_graph(&self) -> parking_lot::RwLockReadGuard<'_, DependencyGraph> {
327 self.graph.read()
328 }
329
330 pub fn get_execution_order(&self) -> Result<Vec<String>> {
332 self.graph.read().get_execution_order()
333 }
334
335 pub fn clear(&self) {
337 self.dependencies.clear();
338 self.execution_status.clear();
339 let mut graph = self.graph.write();
340 *graph = DependencyGraph::new();
341 }
342}
343
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a declaration saying `workflow_id` requires `upstream_id` to have
    /// succeeded, combined with the `All` strategy.
    fn requires_success(workflow_id: &str, upstream_id: &str) -> WorkflowDependency {
        WorkflowDependency {
            workflow_id: workflow_id.to_string(),
            dependencies: vec![DependencyRule {
                workflow_id: upstream_id.to_string(),
                required_status: ExecutionStatus::Success,
                time_window_secs: None,
                version_requirement: None,
            }],
            strategy: DependencyStrategy::All,
            description: None,
        }
    }

    #[test]
    fn test_dependency_graph_creation() {
        let mut graph = DependencyGraph::new();
        graph.add_dependency("workflow1".to_string(), "workflow2".to_string());

        assert!(graph.get_dependencies("workflow1").is_some());
        assert_eq!(
            graph
                .get_dependencies("workflow1")
                .expect("Missing deps")
                .len(),
            1
        );
    }

    #[test]
    fn test_dependency_graph_cycle_detection() {
        let mut graph = DependencyGraph::new();
        graph.add_dependency("workflow1".to_string(), "workflow2".to_string());
        graph.add_dependency("workflow2".to_string(), "workflow3".to_string());
        graph.add_dependency("workflow3".to_string(), "workflow1".to_string());

        assert!(graph.has_cycle("workflow1"));
    }

    #[test]
    fn test_dependency_graph_execution_order() {
        let mut graph = DependencyGraph::new();
        graph.add_dependency("workflow1".to_string(), "workflow2".to_string());
        graph.add_dependency("workflow2".to_string(), "workflow3".to_string());

        // A simple chain has exactly one valid order: dependencies first.
        let order = graph.get_execution_order().expect("Failed to get order");
        assert_eq!(order, ["workflow3", "workflow2", "workflow1"]);
    }

    #[test]
    fn test_dependency_scheduler() {
        let scheduler = DependencyScheduler::new(SchedulerConfig::default());

        scheduler
            .add_dependency(requires_success("workflow1", "workflow2"))
            .expect("Failed to add dependency");

        // No status recorded for workflow2 yet, so the rule is not satisfied.
        assert!(
            !scheduler
                .are_dependencies_satisfied("workflow1")
                .expect("Check failed")
        );

        scheduler.update_status("workflow2".to_string(), ExecutionStatus::Success);

        assert!(
            scheduler
                .are_dependencies_satisfied("workflow1")
                .expect("Check failed")
        );
    }

    #[test]
    fn test_dependency_cycle_prevention() {
        let scheduler = DependencyScheduler::new(SchedulerConfig::default());

        scheduler
            .add_dependency(requires_success("workflow1", "workflow2"))
            .expect("Failed to add");

        // workflow2 -> workflow1 would close the loop and must be rejected.
        assert!(
            scheduler
                .add_dependency(requires_success("workflow2", "workflow1"))
                .is_err()
        );
    }
}