Skip to main content

aperture_cli/batch/
graph.rs

1//! Dependency graph construction, validation, and topological sorting for batch operations.
2//!
3//! Builds a directed acyclic graph from `depends_on` edges and implicit variable
4//! references, validates structure, detects cycles, and produces a topological
5//! execution order via Kahn's algorithm.
6
7use crate::batch::BatchOperation;
8use crate::error::Error;
9use std::collections::{HashMap, HashSet, VecDeque};
10
/// Outcome of dependency resolution.
///
/// Each element is an index into the operations slice that was resolved; the
/// sequence lists every operation once, dependencies before their dependents.
pub type ExecutionOrder = Vec<usize>;
13
14/// Validates the batch operations and returns a topological execution order.
15///
16/// # Validation rules
17///
18/// 1. Every operation that uses `capture`, `capture_append`, or `depends_on` must have an `id`.
19/// 2. Every `depends_on` reference must point to an existing operation `id`.
20/// 3. Implicit dependencies from `{{variable}}` usage are inferred from `capture`/`capture_append`.
21/// 4. The resulting dependency graph must be acyclic.
22///
23/// # Errors
24///
25/// Returns an error if any validation rule is violated or a cycle is detected.
26pub fn resolve_execution_order(operations: &[BatchOperation]) -> Result<ExecutionOrder, Error> {
27    validate_ids(operations)?;
28
29    let id_to_index = build_id_index(operations)?;
30    let capture_var_to_op = build_capture_index(operations, &id_to_index);
31
32    let adjacency = build_adjacency(operations, &id_to_index, &capture_var_to_op)?;
33    topological_sort(operations, &adjacency)
34}
35
36/// Checks whether the batch uses any dependency features.
37///
38/// Empty maps/vecs (e.g. `capture: {}`, `depends_on: []`) are treated as absent
39/// so they don't accidentally trigger dependent mode.
40#[must_use]
41pub fn has_dependencies(operations: &[BatchOperation]) -> bool {
42    operations.iter().any(|op| {
43        op.depends_on.as_ref().is_some_and(|d| !d.is_empty())
44            || op.capture.as_ref().is_some_and(|c| !c.is_empty())
45            || op.capture_append.as_ref().is_some_and(|c| !c.is_empty())
46    })
47}
48
49// ── Validation ──────────────────────────────────────────────────────
50
51/// Ensures every operation that requires an `id` has one.
52fn validate_ids(operations: &[BatchOperation]) -> Result<(), Error> {
53    for (i, op) in operations.iter().enumerate() {
54        let Some(context) = id_requirement_context(op) else {
55            continue;
56        };
57
58        if op.id.is_none() {
59            return Err(Error::batch_missing_id(format!(
60                "operation at index {i} uses {context} but has no id"
61            )));
62        }
63    }
64    Ok(())
65}
66
67/// Returns the reason an operation needs an `id`, or `None` if it doesn't.
68///
69/// Only operations that *produce* captured values or declare explicit dependencies
70/// require an ID. Operations that merely *consume* variables via `{{var}}` in args
71/// do not need an ID — implicit dependency edges use the operation's index, and
72/// omitting the ID requirement here avoids false positives when literal `{{`
73/// appears in argument strings (e.g. JSON bodies with template syntax).
74///
75/// Empty maps/vecs are treated as absent, consistent with [`has_dependencies`].
76fn id_requirement_context(op: &BatchOperation) -> Option<&'static str> {
77    let has_capture = op.capture.as_ref().is_some_and(|c| !c.is_empty());
78    let has_append = op.capture_append.as_ref().is_some_and(|c| !c.is_empty());
79    if has_capture || has_append {
80        return Some("capture");
81    }
82    if op.depends_on.as_ref().is_some_and(|d| !d.is_empty()) {
83        return Some("depends_on");
84    }
85    None
86}
87
88/// Builds a map from operation `id` → index in the operations slice.
89///
90/// # Errors
91///
92/// Returns an error if two or more operations share the same `id`.
93fn build_id_index(operations: &[BatchOperation]) -> Result<HashMap<&str, usize>, Error> {
94    let mut map = HashMap::new();
95    for (i, op) in operations.iter().enumerate() {
96        let Some(id) = op.id.as_deref() else {
97            continue;
98        };
99        if let Some(existing_idx) = map.insert(id, i) {
100            return Err(Error::validation_error(format!(
101                "Duplicate operation id '{id}': found at index {existing_idx} and {i}"
102            )));
103        }
104    }
105    Ok(map)
106}
107
108/// Builds a map from captured variable name → indices of all operations that capture it.
109///
110/// For `capture` (scalar) variables there is typically one provider.
111/// For `capture_append` (list) variables there may be many — all must complete
112/// before a consumer can safely interpolate the accumulated list.
113fn build_capture_index<'a>(
114    operations: &'a [BatchOperation],
115    id_to_index: &HashMap<&'a str, usize>,
116) -> HashMap<&'a str, Vec<usize>> {
117    let mut map: HashMap<&str, Vec<usize>> = HashMap::new();
118    for op in operations {
119        let Some(id) = op.id.as_deref() else {
120            continue;
121        };
122        let Some(&idx) = id_to_index.get(id) else {
123            continue;
124        };
125        if let Some(captures) = &op.capture {
126            for var_name in captures.keys() {
127                map.entry(var_name.as_str()).or_default().push(idx);
128            }
129        }
130        if let Some(appends) = &op.capture_append {
131            for var_name in appends.keys() {
132                map.entry(var_name.as_str()).or_default().push(idx);
133            }
134        }
135    }
136    map
137}
138
/// Collects the names found between `{{` and `}}` markers in `s`, left to right.
///
/// Scanning pairs each `{{` with the first `}}` that follows it and resumes
/// after that closing marker. Empty names (`{{}}`) are dropped, and scanning
/// stops at an opening marker that is never closed. The text between the
/// markers is returned verbatim (no trimming).
fn extract_variable_references(s: &str) -> Vec<&str> {
    let mut names = Vec::new();
    let mut rest = s;

    while let Some((_, tail)) = rest.split_once("{{") {
        let Some((candidate, after_close)) = tail.split_once("}}") else {
            // Unterminated reference: nothing further can match.
            break;
        };
        if !candidate.is_empty() {
            names.push(candidate);
        }
        rest = after_close;
    }

    names
}
156
157/// Builds the adjacency list (edges: dependency → dependent).
158///
159/// An edge from `a` to `b` means `a` must execute before `b`.
160fn build_adjacency(
161    operations: &[BatchOperation],
162    id_to_index: &HashMap<&str, usize>,
163    capture_var_to_op: &HashMap<&str, Vec<usize>>,
164) -> Result<Vec<Vec<usize>>, Error> {
165    let n = operations.len();
166    let mut adj: Vec<Vec<usize>> = vec![Vec::new(); n];
167
168    for (i, op) in operations.iter().enumerate() {
169        let mut deps: HashSet<usize> = HashSet::new();
170
171        // Explicit depends_on
172        if let Some(dep_ids) = &op.depends_on {
173            for dep_id in dep_ids {
174                let &dep_idx = id_to_index.get(dep_id.as_str()).ok_or_else(|| {
175                    Error::batch_missing_dependency(op.id.as_deref().unwrap_or("<unnamed>"), dep_id)
176                })?;
177                deps.insert(dep_idx);
178            }
179        }
180
181        // Implicit dependencies from variable references in args.
182        // For capture_append variables with multiple providers, this
183        // correctly adds edges from ALL providers to the consumer.
184        let implicit_deps = op
185            .args
186            .iter()
187            .flat_map(|arg| extract_variable_references(arg))
188            .filter_map(|var| capture_var_to_op.get(var))
189            .flat_map(|indices| indices.iter().copied())
190            .filter(|&idx| idx != i);
191        deps.extend(implicit_deps);
192
193        for dep_idx in deps {
194            adj[dep_idx].push(i);
195        }
196    }
197
198    Ok(adj)
199}
200
201/// Kahn's algorithm for topological sorting with cycle detection.
202///
203/// Returns indices in execution order. Operations with no dependencies
204/// preserve their original relative order.
205fn topological_sort(
206    operations: &[BatchOperation],
207    adj: &[Vec<usize>],
208) -> Result<ExecutionOrder, Error> {
209    let n = operations.len();
210    let mut in_degree = vec![0usize; n];
211    for successors in adj {
212        for &succ in successors {
213            in_degree[succ] += 1;
214        }
215    }
216
217    // Seed queue with zero-in-degree nodes in original order
218    let mut queue: VecDeque<usize> = (0..n).filter(|&i| in_degree[i] == 0).collect();
219
220    let mut order = Vec::with_capacity(n);
221    while let Some(node) = queue.pop_front() {
222        order.push(node);
223        // Sort successors to preserve original order among siblings
224        let mut successors = adj[node].clone();
225        successors.sort_unstable();
226        for succ in successors {
227            in_degree[succ] -= 1;
228            if in_degree[succ] == 0 {
229                queue.push_back(succ);
230            }
231        }
232    }
233
234    if order.len() != n {
235        // Build the unresolved subgraph (nodes with in-degree > 0 after Kahn).
236        // This includes cycle members and possibly downstream nodes blocked by
237        // those cycles; we then extract one concrete cycle path from that subgraph.
238        let unresolved: Vec<bool> = in_degree.iter().map(|&d| d > 0).collect();
239
240        let cycle_indices = find_cycle_path(adj, &unresolved)
241            .unwrap_or_else(|| (0..n).filter(|&i| unresolved[i]).collect());
242
243        let cycle_ids: Vec<String> = cycle_indices
244            .into_iter()
245            .map(|i| {
246                operations[i]
247                    .id
248                    .clone()
249                    .unwrap_or_else(|| format!("index {i}"))
250            })
251            .collect();
252
253        return Err(Error::batch_cycle_detected(&cycle_ids));
254    }
255
256    Ok(order)
257}
258
259/// Finds one concrete directed cycle in the unresolved subgraph.
260///
261/// Returns node indices describing the cycle path. The start node is repeated
262/// at the end (e.g., `[a, b, a]`) to make the loop explicit in error messages.
263fn find_cycle_path(adj: &[Vec<usize>], unresolved: &[bool]) -> Option<Vec<usize>> {
264    let n = adj.len();
265    let mut color = vec![0u8; n]; // 0 = unvisited, 1 = visiting, 2 = done
266    let mut stack = Vec::new();
267    let mut stack_pos: HashMap<usize, usize> = HashMap::new();
268
269    for start in 0..n {
270        if !unresolved[start] || color[start] != 0 {
271            continue;
272        }
273
274        if let Some(cycle) = dfs_cycle(
275            start,
276            adj,
277            unresolved,
278            &mut color,
279            &mut stack,
280            &mut stack_pos,
281        ) {
282            return Some(cycle);
283        }
284    }
285
286    None
287}
288
/// Depth-first search from `node` that returns the first directed cycle it meets.
///
/// The traversal is iterative: it keeps its own frame stack instead of
/// recursing, so the depth of the dependency chain cannot overflow the thread's
/// call stack.
///
/// # Parameters
///
/// - `node`: start node; expected to be unvisited (`color[node] == 0`).
/// - `adj`: successor lists; successors are explored in ascending index order.
/// - `unresolved`: only successors flagged `true` are followed.
/// - `color`: visit state per node — 0 = unvisited, 1 = on the current path, 2 = done.
/// - `stack`: nodes on the current DFS path, in order.
/// - `stack_pos`: position of each on-path node within `stack`.
///
/// # Returns
///
/// `Some(path)` with the cycle's first node repeated at the end (e.g.
/// `[a, b, a]`) when a back-edge to an on-path node is found. Otherwise `None`;
/// in that case every node visited is marked done and `stack`/`stack_pos` are
/// restored to their contents on entry.
fn dfs_cycle(
    node: usize,
    adj: &[Vec<usize>],
    unresolved: &[bool],
    color: &mut [u8],
    stack: &mut Vec<usize>,
    stack_pos: &mut HashMap<usize, usize>,
) -> Option<Vec<usize>> {
    // Successors still worth following, in ascending order for a deterministic path.
    let pending_successors = |v: usize| {
        let mut next: Vec<usize> = adj[v].iter().copied().filter(|&s| unresolved[s]).collect();
        next.sort_unstable();
        next.into_iter()
    };

    // One frame per node on the current path: the node and its unexplored successors.
    let mut frames: Vec<(usize, std::vec::IntoIter<usize>)> = Vec::new();

    color[node] = 1;
    stack_pos.insert(node, stack.len());
    stack.push(node);
    frames.push((node, pending_successors(node)));

    while let Some(frame) = frames.last_mut() {
        let current = frame.0;

        let Some(succ) = frame.1.next() else {
            // All successors explored without finding a cycle: retire this node.
            stack.pop();
            stack_pos.remove(&current);
            color[current] = 2;
            frames.pop();
            continue;
        };

        match color[succ] {
            0 => {
                color[succ] = 1;
                stack_pos.insert(succ, stack.len());
                stack.push(succ);
                frames.push((succ, pending_successors(succ)));
            }
            1 => {
                // Back-edge to an ancestor in the DFS stack => concrete cycle.
                let start = *stack_pos
                    .get(&succ)
                    .expect("visiting node must be present in DFS stack position map");
                let mut cycle = stack[start..].to_vec();
                cycle.push(succ);
                return Some(cycle);
            }
            _ => {}
        }
    }

    None
}
333
/// Unit tests for dependency validation, implicit-edge inference, topological
/// ordering, and cycle reporting.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::batch::BatchOperation;
    use std::collections::HashMap;

    // ── Fixtures ────────────────────────────────────────────────────

    /// Operation that has an `id` and nothing else.
    fn op(id: &str) -> BatchOperation {
        BatchOperation {
            id: Some(id.to_string()),
            args: vec![],
            ..Default::default()
        }
    }

    /// Operation with explicit `depends_on` entries.
    fn op_with_deps(id: &str, deps: &[&str]) -> BatchOperation {
        BatchOperation {
            id: Some(id.to_string()),
            args: vec![],
            depends_on: Some(deps.iter().map(|s| (*s).to_string()).collect()),
            ..Default::default()
        }
    }

    /// Operation that captures the given `(variable, expression)` pairs.
    fn op_with_capture(id: &str, captures: &[(&str, &str)]) -> BatchOperation {
        let mut map = HashMap::new();
        for &(k, v) in captures {
            map.insert(k.to_string(), v.to_string());
        }
        BatchOperation {
            id: Some(id.to_string()),
            args: vec![],
            capture: Some(map),
            ..Default::default()
        }
    }

    /// Operation that appends one captured value to the list variable `var`.
    fn op_with_capture_append(id: &str, var: &str, expr: &str) -> BatchOperation {
        BatchOperation {
            id: Some(id.to_string()),
            args: vec![],
            capture_append: Some(HashMap::from([(var.to_string(), expr.to_string())])),
            ..Default::default()
        }
    }

    /// Operation whose single argument is `arg_template`.
    fn op_with_var_ref(id: &str, arg_template: &str) -> BatchOperation {
        BatchOperation {
            id: Some(id.to_string()),
            args: vec![arg_template.to_string()],
            ..Default::default()
        }
    }

    /// Position of operation `idx` within a resolved execution order.
    fn position_of(order: &[usize], idx: usize) -> usize {
        order
            .iter()
            .position(|&x| x == idx)
            .expect("operation index missing from execution order")
    }

    // ── Ordering ────────────────────────────────────────────────────

    #[test]
    fn no_dependencies_preserves_original_order() {
        let ops = vec![op("a"), op("b"), op("c")];
        let order = resolve_execution_order(&ops).unwrap();
        assert_eq!(order, vec![0, 1, 2]);
    }

    #[test]
    fn explicit_linear_chain() {
        let ops = vec![
            op("create"),
            op_with_deps("get", &["create"]),
            op_with_deps("delete", &["get"]),
        ];
        let order = resolve_execution_order(&ops).unwrap();
        assert_eq!(order, vec![0, 1, 2]);
    }

    #[test]
    fn explicit_fan_in() {
        // a and b are independent; c depends on both
        let ops = vec![op("a"), op("b"), op_with_deps("c", &["a", "b"])];
        let order = resolve_execution_order(&ops).unwrap();
        // a and b before c
        assert!(position_of(&order, 0) < position_of(&order, 2));
        assert!(position_of(&order, 1) < position_of(&order, 2));
    }

    #[test]
    fn implicit_dependency_from_variable_ref() {
        let ops = vec![
            op_with_capture("create", &[("user_id", ".id")]),
            op_with_var_ref("get", "--user-id={{user_id}}"),
        ];
        let order = resolve_execution_order(&ops).unwrap();
        assert_eq!(order, vec![0, 1]);
    }

    #[test]
    fn diamond_dependency() {
        // a -> b, a -> c, b -> d, c -> d
        let ops = vec![
            op("a"),
            op_with_deps("b", &["a"]),
            op_with_deps("c", &["a"]),
            op_with_deps("d", &["b", "c"]),
        ];
        let order = resolve_execution_order(&ops).unwrap();
        assert!(position_of(&order, 0) < position_of(&order, 1));
        assert!(position_of(&order, 0) < position_of(&order, 2));
        assert!(position_of(&order, 1) < position_of(&order, 3));
        assert!(position_of(&order, 2) < position_of(&order, 3));
    }

    // ── Cycles ──────────────────────────────────────────────────────

    #[test]
    fn cycle_detection_two_nodes() {
        let ops = vec![op_with_deps("a", &["b"]), op_with_deps("b", &["a"])];
        let result = resolve_execution_order(&ops);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("cycle"), "expected cycle error, got: {err}");
        // Error should reference operation IDs, not just indices
        assert!(
            err.contains('a') && err.contains('b'),
            "expected operation IDs in cycle error, got: {err}"
        );
    }

    #[test]
    fn cycle_detection_three_nodes() {
        let ops = vec![
            op_with_deps("a", &["c"]),
            op_with_deps("b", &["a"]),
            op_with_deps("c", &["b"]),
        ];
        let result = resolve_execution_order(&ops);
        assert!(result.is_err());
        // Make sure the failure really is the cycle check, not some other validation.
        let err = result.unwrap_err().to_string();
        assert!(err.contains("cycle"), "expected cycle error, got: {err}");
    }

    #[test]
    fn self_dependency_is_reported_as_cycle() {
        let ops = vec![op_with_deps("a", &["a"])];
        let result = resolve_execution_order(&ops);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("cycle"), "expected cycle error, got: {err}");
    }

    #[test]
    fn cycle_error_excludes_downstream_non_cycle_nodes() {
        // a <-> b is the cycle; c depends on a but is not part of the cycle.
        let ops = vec![
            op_with_deps("a", &["b"]),
            op_with_deps("b", &["a"]),
            op_with_deps("c", &["a"]),
        ];
        let result = resolve_execution_order(&ops);
        assert!(result.is_err());

        let err = result.unwrap_err().to_string();
        let reported_cycle: Vec<&str> = err
            .rsplit(':')
            .next()
            .unwrap_or_default()
            .split('→')
            .map(str::trim)
            .collect();

        assert!(
            reported_cycle.contains(&"a") && reported_cycle.contains(&"b"),
            "expected cycle members a and b, got: {err}"
        );
        assert!(
            !reported_cycle.contains(&"c"),
            "expected downstream node c to be excluded from cycle report, got: {err}"
        );
    }

    // ── Validation ──────────────────────────────────────────────────

    #[test]
    fn missing_dependency_reference() {
        let ops = vec![op("a"), op_with_deps("b", &["nonexistent"])];
        let result = resolve_execution_order(&ops);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("nonexistent"),
            "expected missing dep error, got: {err}"
        );
    }

    #[test]
    fn duplicate_ids_rejected() {
        let ops = vec![op("dup"), op("dup")];
        let result = resolve_execution_order(&ops);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("Duplicate operation id 'dup'"),
            "expected duplicate id error, got: {err}"
        );
    }

    #[test]
    fn missing_id_on_capture_operation() {
        let op = BatchOperation {
            capture: Some(HashMap::from([("x".into(), ".id".into())])),
            ..Default::default()
        };
        let ops = vec![op];
        let result = resolve_execution_order(&ops);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("no id"),
            "expected missing id error, got: {err}"
        );
    }

    #[test]
    fn missing_id_on_capture_append_operation() {
        let op = BatchOperation {
            capture_append: Some(HashMap::from([("xs".into(), ".id".into())])),
            ..Default::default()
        };
        let ops = vec![op];
        let result = resolve_execution_order(&ops);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("no id"),
            "expected missing id error, got: {err}"
        );
    }

    #[test]
    fn missing_id_on_depends_on_operation() {
        let op = BatchOperation {
            depends_on: Some(vec!["other".into()]),
            ..Default::default()
        };
        let ops = vec![op];
        let result = resolve_execution_order(&ops);
        assert!(result.is_err());
        // The id check runs before dependency lookup, so this must be the missing-id error.
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("no id"),
            "expected missing id error, got: {err}"
        );
    }

    #[test]
    fn variable_consumer_without_id_is_valid() {
        // An operation that references {{var}} without an id should pass
        // validation — only producers (capture/capture_append) and explicit
        // depends_on users need IDs.
        let ops = vec![
            op_with_capture("producer", &[("uid", ".id")]),
            BatchOperation {
                args: vec!["--id".into(), "{{uid}}".into()],
                ..Default::default()
            },
        ];
        let order = resolve_execution_order(&ops).unwrap();
        // Producer before consumer
        assert!(position_of(&order, 0) < position_of(&order, 1));
    }

    #[test]
    fn empty_capture_does_not_require_id() {
        // An operation with capture: {} (empty map) should not require an id.
        let ops = vec![BatchOperation {
            args: vec![],
            capture: Some(HashMap::new()),
            ..Default::default()
        }];
        // Should succeed — empty capture is treated as absent
        let order = resolve_execution_order(&ops).unwrap();
        assert_eq!(order, vec![0]);
    }

    // ── has_dependencies ────────────────────────────────────────────

    #[test]
    fn has_dependencies_returns_false_for_simple_batch() {
        let ops = vec![op("a"), op("b")];
        assert!(!has_dependencies(&ops));
    }

    #[test]
    fn has_dependencies_returns_true_for_capture() {
        let ops = vec![op_with_capture("a", &[("x", ".id")])];
        assert!(has_dependencies(&ops));
    }

    #[test]
    fn has_dependencies_returns_true_for_depends_on() {
        let ops = vec![op_with_deps("a", &["b"])];
        assert!(has_dependencies(&ops));
    }

    #[test]
    fn has_dependencies_returns_false_for_bare_variable_ref() {
        // A {{var}} reference without any capture/capture_append/depends_on
        // should NOT trigger the dependent path — preserves backward compat
        // for literal {{ in arguments.
        let ops = vec![op_with_var_ref("a", "{{some_var}}")];
        assert!(!has_dependencies(&ops));
    }

    #[test]
    fn has_dependencies_returns_false_for_empty_maps_and_vecs() {
        // Empty capture/capture_append/depends_on should not trigger dependent mode.
        // This prevents confusing "missing id" errors when users write e.g. `capture: {}`.
        let ops = vec![BatchOperation {
            id: Some("a".into()),
            args: vec![],
            capture: Some(HashMap::new()),
            capture_append: Some(HashMap::new()),
            depends_on: Some(vec![]),
            ..Default::default()
        }];
        assert!(!has_dependencies(&ops));
    }

    #[test]
    fn has_dependencies_returns_true_for_variable_ref_with_capture_provider() {
        let ops = vec![
            op_with_capture("producer", &[("some_var", ".id")]),
            op_with_var_ref("consumer", "{{some_var}}"),
        ];
        assert!(has_dependencies(&ops));
    }

    // ── extract_variable_references ─────────────────────────────────

    #[test]
    fn extract_variable_references_basic() {
        let vars = extract_variable_references("--id={{user_id}}");
        assert_eq!(vars, vec!["user_id"]);
    }

    #[test]
    fn extract_variable_references_multiple() {
        let vars = extract_variable_references("{{a}} and {{b}}");
        assert_eq!(vars, vec!["a", "b"]);
    }

    #[test]
    fn extract_variable_references_none() {
        let vars = extract_variable_references("no variables here");
        assert!(vars.is_empty());
    }

    #[test]
    fn extract_variable_references_unclosed() {
        let vars = extract_variable_references("{{unclosed");
        assert!(vars.is_empty());
    }

    // ── capture_append ──────────────────────────────────────────────

    #[test]
    fn capture_append_creates_implicit_dependency() {
        let ops = vec![
            op_with_capture_append("beat-1", "ids", ".id"),
            op_with_var_ref("final", "{{ids}}"),
        ];
        let order = resolve_execution_order(&ops).unwrap();
        assert_eq!(order, vec![0, 1]);
    }

    #[test]
    fn capture_append_multiple_providers_all_become_implicit_deps() {
        // Two providers capture_append into the same list; consumer references
        // {{ids}} without explicit depends_on. Both providers must run first.
        let ops = vec![
            op_with_capture_append("beat-1", "ids", ".id"),
            op_with_capture_append("beat-2", "ids", ".id"),
            op_with_var_ref("aggregate", "{{ids}}"),
        ];
        let order = resolve_execution_order(&ops).unwrap();
        // Both providers must appear before the consumer
        assert!(
            position_of(&order, 0) < position_of(&order, 2),
            "beat-1 should precede aggregate"
        );
        assert!(
            position_of(&order, 1) < position_of(&order, 2),
            "beat-2 should precede aggregate"
        );
    }
}