runtara_workflows/
dependency_analysis.rs

1// Copyright (C) 2025 SyncMyOrders Sp. z o.o.
2// SPDX-License-Identifier: AGPL-3.0-or-later
3//! Dependency analysis utilities for workflow compilation.
4//!
5//! This module analyzes workflow definitions to determine what features,
6//! operators, and code modules are required for compilation.
7
8use serde_json::Value;
9use std::collections::{HashMap, HashSet};
10
11// ============================================================================
12// StartScenario Dependency Analysis
13// ============================================================================
14
/// Identifies one concrete version of a scenario.
///
/// The `(scenario_id, version)` pair is the identity used when tracking
/// dependencies and detecting circular dependencies: two references compare
/// equal (and hash identically) only when both fields match.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct ScenarioReference {
    /// Unique identifier of the scenario.
    pub scenario_id: String,
    /// Version number of the scenario.
    pub version: i32,
}
25
/// Describes a single `StartScenario` step found in an execution graph.
///
/// Values of this type are produced while scanning a parent workflow's
/// execution graph during dependency analysis.
#[derive(Clone, Debug)]
pub struct StartScenarioStepInfo {
    /// ID of the step within the parent workflow.
    pub step_id: String,
    /// ID of the child scenario that the step starts.
    pub child_scenario_id: String,
    /// Requested child version, kept as text: `"latest"`, `"current"`, or an
    /// explicit number.
    pub child_version_requested: String,
}
38
39/// Extracts all StartScenario steps from a scenario definition
40pub fn extract_start_scenario_steps(
41    execution_graph: &Value,
42) -> Result<Vec<StartScenarioStepInfo>, String> {
43    let mut steps = Vec::new();
44
45    let steps_obj = execution_graph
46        .get("steps")
47        .and_then(|v| v.as_object())
48        .ok_or_else(|| "Missing 'steps' object in execution graph".to_string())?;
49
50    for (step_id, step_def) in steps_obj {
51        if step_def.get("stepType").and_then(|v| v.as_str()) == Some("StartScenario") {
52            let child_scenario_id = step_def
53                .get("childScenarioId")
54                .and_then(|v| v.as_str())
55                .ok_or_else(|| format!("StartScenario step '{}' missing childScenarioId", step_id))?
56                .to_string();
57
58            let child_version_requested = step_def
59                .get("childVersion")
60                .ok_or_else(|| format!("StartScenario step '{}' missing childVersion", step_id))?;
61
62            // Convert childVersion to string (might be number or string)
63            let child_version_str = match child_version_requested {
64                Value::String(s) => s.clone(),
65                Value::Number(n) => n.to_string(),
66                _ => {
67                    return Err(format!(
68                        "StartScenario step '{}' has invalid childVersion type",
69                        step_id
70                    ));
71                }
72            };
73
74            steps.push(StartScenarioStepInfo {
75                step_id: step_id.clone(),
76                child_scenario_id,
77                child_version_requested: child_version_str,
78            });
79        }
80    }
81
82    Ok(steps)
83}
84
/// Turns a requested version string into a concrete version number.
///
/// - `"latest"` resolves to `latest_version`.
/// - `"current"` resolves to `current_version`, which must be `Some`.
/// - Anything else must parse as an `i32` and is returned unchanged.
///
/// # Errors
///
/// Returns a message when `"current"` is requested but `current_version` is
/// `None`, or when the string is neither a keyword nor a valid `i32`.
pub fn resolve_version(
    version_str: &str,
    latest_version: i32,
    current_version: Option<i32>,
) -> Result<i32, String> {
    if version_str == "latest" {
        return Ok(latest_version);
    }

    if version_str == "current" {
        return match current_version {
            Some(version) => Ok(version),
            None => Err(
                "Cannot resolve 'current' version: scenario has no current_version set"
                    .to_string(),
            ),
        };
    }

    // Not a keyword: the only remaining valid form is an explicit number.
    match version_str.parse::<i32>() {
        Ok(explicit) => Ok(explicit),
        Err(_) => Err(format!(
            "Invalid version string '{}': must be 'latest', 'current', or a number",
            version_str
        )),
    }
}
104
105/// Represents the dependency graph for circular dependency detection
106pub struct DependencyGraph {
107    /// Map of (scenario_id, version) -> list of child (scenario_id, version) tuples
108    edges: HashMap<ScenarioReference, Vec<ScenarioReference>>,
109}
110
111impl DependencyGraph {
112    /// Create a new empty dependency graph.
113    pub fn new() -> Self {
114        Self {
115            edges: HashMap::new(),
116        }
117    }
118
119    /// Add a dependency edge from parent to child
120    pub fn add_edge(&mut self, parent: ScenarioReference, child: ScenarioReference) {
121        self.edges
122            .entry(parent)
123            .or_insert_with(Vec::new)
124            .push(child);
125    }
126
127    /// Detect circular dependencies using depth-first search
128    /// Returns Ok(()) if no cycles, or Err with the cycle path if a cycle is detected
129    pub fn detect_cycles(&self, start: &ScenarioReference) -> Result<(), Vec<ScenarioReference>> {
130        let mut visited = HashSet::new();
131        let mut path = Vec::new();
132
133        self.dfs(start, &mut visited, &mut path)
134    }
135
136    /// Depth-first search helper for cycle detection
137    fn dfs(
138        &self,
139        node: &ScenarioReference,
140        visited: &mut HashSet<ScenarioReference>,
141        path: &mut Vec<ScenarioReference>,
142    ) -> Result<(), Vec<ScenarioReference>> {
143        // Check if this node is already in the current path (cycle detected)
144        if path.contains(node) {
145            // Build the cycle path from where it starts repeating
146            let mut cycle = Vec::new();
147            let mut found_start = false;
148            for n in path.iter() {
149                if n == node {
150                    found_start = true;
151                }
152                if found_start {
153                    cycle.push(n.clone());
154                }
155            }
156            cycle.push(node.clone()); // Add the node again to show the cycle
157            return Err(cycle);
158        }
159
160        // If we've already fully explored this node, skip it
161        if visited.contains(node) {
162            return Ok(());
163        }
164
165        // Add to current path
166        path.push(node.clone());
167
168        // Visit all children
169        if let Some(children) = self.edges.get(node) {
170            for child in children {
171                self.dfs(child, visited, path)?;
172            }
173        }
174
175        // Remove from current path and mark as fully explored
176        path.pop();
177        visited.insert(node.clone());
178
179        Ok(())
180    }
181
182    /// Format a cycle path as a human-readable error message
183    pub fn format_cycle_error(cycle: &[ScenarioReference]) -> String {
184        let mut msg = String::from("Circular dependency detected:\n\nCycle path:\n");
185        for (i, node) in cycle.iter().enumerate() {
186            if i > 0 {
187                msg.push_str("  → ");
188            } else {
189                msg.push_str("  ");
190            }
191            msg.push_str(&format!("{} (v{})", node.scenario_id, node.version));
192            if i == cycle.len() - 1 {
193                msg.push_str("  ← Cycle!");
194            }
195            msg.push('\n');
196        }
197        msg.push_str("\nTo fix: Remove the StartScenario step that creates this cycle.\n");
198        msg
199    }
200}
201
202impl Default for DependencyGraph {
203    fn default() -> Self {
204        Self::new()
205    }
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a [`ScenarioReference`] in tests.
    fn scenario(id: &str, version: i32) -> ScenarioReference {
        ScenarioReference {
            scenario_id: id.to_string(),
            version,
        }
    }

    // ------------------------------------------------------------------
    // resolve_version
    // ------------------------------------------------------------------

    #[test]
    fn test_resolve_version_latest() {
        assert_eq!(resolve_version("latest", 5, Some(3)).unwrap(), 5);
    }

    #[test]
    fn test_resolve_version_current() {
        assert_eq!(resolve_version("current", 5, Some(3)).unwrap(), 3);
    }

    #[test]
    fn test_resolve_version_current_missing() {
        let result = resolve_version("current", 5, None);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .contains("scenario has no current_version set")
        );
    }

    #[test]
    fn test_resolve_version_explicit() {
        assert_eq!(resolve_version("42", 5, Some(3)).unwrap(), 42);
    }

    #[test]
    fn test_resolve_version_invalid() {
        let result = resolve_version("invalid", 5, Some(3));
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("Invalid version string 'invalid'"));
    }

    // ------------------------------------------------------------------
    // DependencyGraph
    // ------------------------------------------------------------------

    #[test]
    fn test_no_cycles() {
        // A → B → C (linear, no cycle)
        let mut graph = DependencyGraph::new();
        let (a, b, c) = (scenario("a", 1), scenario("b", 1), scenario("c", 1));

        graph.add_edge(a.clone(), b.clone());
        graph.add_edge(b, c);

        assert!(graph.detect_cycles(&a).is_ok());
    }

    #[test]
    fn test_simple_cycle() {
        // A → B → A (cycle)
        let mut graph = DependencyGraph::new();
        let (a, b) = (scenario("a", 1), scenario("b", 1));

        graph.add_edge(a.clone(), b.clone());
        graph.add_edge(b.clone(), a.clone());

        let cycle = graph.detect_cycles(&a).unwrap_err();
        assert_eq!(cycle, vec![a.clone(), b, a]); // A → B → A
    }

    #[test]
    fn test_self_reference() {
        // A → A (self-reference)
        let mut graph = DependencyGraph::new();
        let a = scenario("a", 1);

        graph.add_edge(a.clone(), a.clone());

        let cycle = graph.detect_cycles(&a).unwrap_err();
        assert_eq!(cycle, vec![a.clone(), a]);
    }

    #[test]
    fn test_cycle_not_through_start() {
        // A → B → C → B: the cycle is reachable from A but does not include it.
        let mut graph = DependencyGraph::new();
        let (a, b, c) = (scenario("a", 1), scenario("b", 1), scenario("c", 1));

        graph.add_edge(a.clone(), b.clone());
        graph.add_edge(b.clone(), c.clone());
        graph.add_edge(c.clone(), b.clone());

        let cycle = graph.detect_cycles(&a).unwrap_err();
        assert_eq!(cycle, vec![b.clone(), c, b]);
    }

    #[test]
    fn test_diamond_no_cycle() {
        //     A
        //    / \
        //   B   C
        //    \ /
        //     D
        let mut graph = DependencyGraph::new();
        let (a, b, c, d) = (
            scenario("a", 1),
            scenario("b", 1),
            scenario("c", 1),
            scenario("d", 1),
        );

        graph.add_edge(a.clone(), b.clone());
        graph.add_edge(a.clone(), c.clone());
        graph.add_edge(b, d.clone());
        graph.add_edge(c, d);

        assert!(graph.detect_cycles(&a).is_ok());
    }

    #[test]
    fn test_different_versions_no_cycle() {
        // A(v1) → B(v2), B(v3) → A(v1) is NOT a cycle (different versions)
        let mut graph = DependencyGraph::new();
        let (a_v1, b_v2, b_v3) = (scenario("a", 1), scenario("b", 2), scenario("b", 3));

        graph.add_edge(a_v1.clone(), b_v2);
        graph.add_edge(b_v3, a_v1.clone());

        // Starting from a_v1 only b_v2 is reachable; b_v3 is a different node
        // that is never visited, so no cycle is found.
        assert!(graph.detect_cycles(&a_v1).is_ok());
    }

    #[test]
    fn test_format_cycle_error() {
        let cycle = vec![scenario("a", 1), scenario("b", 2), scenario("a", 1)];

        assert_eq!(
            DependencyGraph::format_cycle_error(&cycle),
            "Circular dependency detected:\n\nCycle path:\n  a (v1)\n  → b (v2)\n  → a (v1)  ← Cycle!\n\nTo fix: Remove the StartScenario step that creates this cycle.\n"
        );
    }

    // ------------------------------------------------------------------
    // extract_start_scenario_steps
    // ------------------------------------------------------------------

    #[test]
    fn test_extract_start_scenario_steps() {
        let execution_graph = serde_json::json!({
            "steps": {
                "step1": {
                    "stepType": "Agent",
                    "operatorId": "utils"
                },
                "step2": {
                    "stepType": "StartScenario",
                    "childScenarioId": "child-scenario",
                    "childVersion": "latest"
                },
                "step3": {
                    "stepType": "StartScenario",
                    "childScenarioId": "another-child",
                    "childVersion": 42
                }
            }
        });

        let steps = extract_start_scenario_steps(&execution_graph).unwrap();
        assert_eq!(steps.len(), 2);

        // Do not rely on the iteration order of the steps map; look each step
        // up by its ID instead.
        let step2 = steps.iter().find(|s| s.step_id == "step2").unwrap();
        assert_eq!(step2.child_scenario_id, "child-scenario");
        assert_eq!(step2.child_version_requested, "latest");

        let step3 = steps.iter().find(|s| s.step_id == "step3").unwrap();
        assert_eq!(step3.child_scenario_id, "another-child");
        assert_eq!(step3.child_version_requested, "42");
    }

    #[test]
    fn test_extract_start_scenario_steps_missing_steps_object() {
        let execution_graph = serde_json::json!({});

        let result = extract_start_scenario_steps(&execution_graph);
        assert!(result.unwrap_err().contains("Missing 'steps' object"));
    }

    #[test]
    fn test_extract_start_scenario_steps_missing_child_id() {
        let execution_graph = serde_json::json!({
            "steps": {
                "step1": {
                    "stepType": "StartScenario",
                    "childVersion": "latest"
                }
            }
        });

        let result = extract_start_scenario_steps(&execution_graph);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("missing childScenarioId"));
    }

    #[test]
    fn test_extract_start_scenario_steps_missing_child_version() {
        let execution_graph = serde_json::json!({
            "steps": {
                "step1": {
                    "stepType": "StartScenario",
                    "childScenarioId": "child-scenario"
                }
            }
        });

        let result = extract_start_scenario_steps(&execution_graph);
        assert!(result.unwrap_err().contains("missing childVersion"));
    }

    #[test]
    fn test_extract_start_scenario_steps_invalid_child_version_type() {
        let execution_graph = serde_json::json!({
            "steps": {
                "step1": {
                    "stepType": "StartScenario",
                    "childScenarioId": "child-scenario",
                    "childVersion": true
                }
            }
        });

        let result = extract_start_scenario_steps(&execution_graph);
        assert!(result.unwrap_err().contains("invalid childVersion type"));
    }
}