1use crate::batch::BatchOperation;
8use crate::error::Error;
9use std::collections::{HashMap, HashSet, VecDeque};
10
/// Indices into the `operations` slice passed to [`resolve_execution_order`],
/// listed in the order in which the operations should be executed.
pub type ExecutionOrder = Vec<usize>;
13
14pub fn resolve_execution_order(operations: &[BatchOperation]) -> Result<ExecutionOrder, Error> {
27 validate_ids(operations)?;
28
29 let id_to_index = build_id_index(operations)?;
30 let capture_var_to_op = build_capture_index(operations, &id_to_index);
31
32 let adjacency = build_adjacency(operations, &id_to_index, &capture_var_to_op)?;
33 topological_sort(operations, &adjacency)
34}
35
36#[must_use]
41pub fn has_dependencies(operations: &[BatchOperation]) -> bool {
42 operations.iter().any(|op| {
43 op.depends_on.as_ref().is_some_and(|d| !d.is_empty())
44 || op.capture.as_ref().is_some_and(|c| !c.is_empty())
45 || op.capture_append.as_ref().is_some_and(|c| !c.is_empty())
46 })
47}
48
49fn validate_ids(operations: &[BatchOperation]) -> Result<(), Error> {
53 for (i, op) in operations.iter().enumerate() {
54 let Some(context) = id_requirement_context(op) else {
55 continue;
56 };
57
58 if op.id.is_none() {
59 return Err(Error::batch_missing_id(format!(
60 "operation at index {i} uses {context} but has no id"
61 )));
62 }
63 }
64 Ok(())
65}
66
67fn id_requirement_context(op: &BatchOperation) -> Option<&'static str> {
77 let has_capture = op.capture.as_ref().is_some_and(|c| !c.is_empty());
78 let has_append = op.capture_append.as_ref().is_some_and(|c| !c.is_empty());
79 if has_capture || has_append {
80 return Some("capture");
81 }
82 if op.depends_on.as_ref().is_some_and(|d| !d.is_empty()) {
83 return Some("depends_on");
84 }
85 None
86}
87
88fn build_id_index(operations: &[BatchOperation]) -> Result<HashMap<&str, usize>, Error> {
94 let mut map = HashMap::new();
95 for (i, op) in operations.iter().enumerate() {
96 let Some(id) = op.id.as_deref() else {
97 continue;
98 };
99 if let Some(existing_idx) = map.insert(id, i) {
100 return Err(Error::validation_error(format!(
101 "Duplicate operation id '{id}': found at index {existing_idx} and {i}"
102 )));
103 }
104 }
105 Ok(map)
106}
107
108fn build_capture_index<'a>(
114 operations: &'a [BatchOperation],
115 id_to_index: &HashMap<&'a str, usize>,
116) -> HashMap<&'a str, Vec<usize>> {
117 let mut map: HashMap<&str, Vec<usize>> = HashMap::new();
118 for op in operations {
119 let Some(id) = op.id.as_deref() else {
120 continue;
121 };
122 let Some(&idx) = id_to_index.get(id) else {
123 continue;
124 };
125 if let Some(captures) = &op.capture {
126 for var_name in captures.keys() {
127 map.entry(var_name.as_str()).or_default().push(idx);
128 }
129 }
130 if let Some(appends) = &op.capture_append {
131 for var_name in appends.keys() {
132 map.entry(var_name.as_str()).or_default().push(idx);
133 }
134 }
135 }
136 map
137}
138
/// Returns the names used as `{{name}}` placeholders in `s`, in order of appearance.
///
/// Matching is non-nested: each `{{` pairs with the first `}}` after it, and
/// the text between them is returned verbatim (no trimming). Empty
/// placeholders (`{{}}`) are skipped, and scanning stops at a `{{` that has no
/// closing `}}`.
fn extract_variable_references(s: &str) -> Vec<&str> {
    let mut names = Vec::new();
    let mut rest = s;
    while let Some((_, after_open)) = rest.split_once("{{") {
        match after_open.split_once("}}") {
            Some((name, after_close)) => {
                if !name.is_empty() {
                    names.push(name);
                }
                rest = after_close;
            }
            None => break,
        }
    }
    names
}
156
157fn build_adjacency(
161 operations: &[BatchOperation],
162 id_to_index: &HashMap<&str, usize>,
163 capture_var_to_op: &HashMap<&str, Vec<usize>>,
164) -> Result<Vec<Vec<usize>>, Error> {
165 let n = operations.len();
166 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); n];
167
168 for (i, op) in operations.iter().enumerate() {
169 let mut deps: HashSet<usize> = HashSet::new();
170
171 if let Some(dep_ids) = &op.depends_on {
173 for dep_id in dep_ids {
174 let &dep_idx = id_to_index.get(dep_id.as_str()).ok_or_else(|| {
175 Error::batch_missing_dependency(op.id.as_deref().unwrap_or("<unnamed>"), dep_id)
176 })?;
177 deps.insert(dep_idx);
178 }
179 }
180
181 let implicit_deps = op
185 .args
186 .iter()
187 .flat_map(|arg| extract_variable_references(arg))
188 .filter_map(|var| capture_var_to_op.get(var))
189 .flat_map(|indices| indices.iter().copied())
190 .filter(|&idx| idx != i);
191 deps.extend(implicit_deps);
192
193 for dep_idx in deps {
194 adj[dep_idx].push(i);
195 }
196 }
197
198 Ok(adj)
199}
200
201fn topological_sort(
206 operations: &[BatchOperation],
207 adj: &[Vec<usize>],
208) -> Result<ExecutionOrder, Error> {
209 let n = operations.len();
210 let mut in_degree = vec![0usize; n];
211 for successors in adj {
212 for &succ in successors {
213 in_degree[succ] += 1;
214 }
215 }
216
217 let mut queue: VecDeque<usize> = (0..n).filter(|&i| in_degree[i] == 0).collect();
219
220 let mut order = Vec::with_capacity(n);
221 while let Some(node) = queue.pop_front() {
222 order.push(node);
223 let mut successors = adj[node].clone();
225 successors.sort_unstable();
226 for succ in successors {
227 in_degree[succ] -= 1;
228 if in_degree[succ] == 0 {
229 queue.push_back(succ);
230 }
231 }
232 }
233
234 if order.len() != n {
235 let unresolved: Vec<bool> = in_degree.iter().map(|&d| d > 0).collect();
239
240 let cycle_indices = find_cycle_path(adj, &unresolved)
241 .unwrap_or_else(|| (0..n).filter(|&i| unresolved[i]).collect());
242
243 let cycle_ids: Vec<String> = cycle_indices
244 .into_iter()
245 .map(|i| {
246 operations[i]
247 .id
248 .clone()
249 .unwrap_or_else(|| format!("index {i}"))
250 })
251 .collect();
252
253 return Err(Error::batch_cycle_detected(&cycle_ids));
254 }
255
256 Ok(order)
257}
258
259fn find_cycle_path(adj: &[Vec<usize>], unresolved: &[bool]) -> Option<Vec<usize>> {
264 let n = adj.len();
265 let mut color = vec![0u8; n]; let mut stack = Vec::new();
267 let mut stack_pos: HashMap<usize, usize> = HashMap::new();
268
269 for start in 0..n {
270 if !unresolved[start] || color[start] != 0 {
271 continue;
272 }
273
274 if let Some(cycle) = dfs_cycle(
275 start,
276 adj,
277 unresolved,
278 &mut color,
279 &mut stack,
280 &mut stack_pos,
281 ) {
282 return Some(cycle);
283 }
284 }
285
286 None
287}
288
/// Depth-first search from `node` for a cycle among `unresolved` nodes.
///
/// The traversal is iterative (it uses an explicit frame stack), so long
/// dependency chains cannot overflow the thread's call stack. A recursive
/// version would recurse once per node on the current path, and that depth is
/// controlled by the input. Nodes are visited in the same order as a recursive
/// DFS: each node's successors are examined in ascending index order, and
/// successors that are not `unresolved` are skipped.
///
/// `color` tracks per-node state: `0` = unvisited, `1` = on the current path,
/// `2` = fully explored. `stack` holds the current path, and `stack_pos` maps
/// each node on that path to its position in `stack`.
///
/// Returns `Some(cycle)` on the first edge back into the current path. `cycle`
/// is the path from the edge's target to the current node, followed by the
/// target again, so its first and last elements are equal. In that case
/// `stack` and `stack_pos` are left mid-search.
///
/// Returns `None` when no cycle is reachable from `node`. In that case every
/// node visited is marked `2`, and `stack` and `stack_pos` are restored to
/// their state on entry.
fn dfs_cycle(
    node: usize,
    adj: &[Vec<usize>],
    unresolved: &[bool],
    color: &mut [u8],
    stack: &mut Vec<usize>,
    stack_pos: &mut HashMap<usize, usize>,
) -> Option<Vec<usize>> {
    let sorted_successors = |n: usize| -> Vec<usize> {
        let mut successors = adj[n].clone();
        successors.sort_unstable();
        successors
    };

    // Each frame: (node, its successors sorted ascending, index of the next successor to examine).
    let mut frames: Vec<(usize, Vec<usize>, usize)> = Vec::new();

    color[node] = 1;
    stack_pos.insert(node, stack.len());
    stack.push(node);
    frames.push((node, sorted_successors(node), 0));

    while let Some((current, successors, cursor)) = frames.last_mut() {
        let current = *current;
        let next = successors.get(*cursor).copied();
        *cursor += 1;

        let Some(succ) = next else {
            // Every successor of `current` was explored without finding a cycle: retreat.
            frames.pop();
            stack.pop();
            stack_pos.remove(&current);
            color[current] = 2;
            continue;
        };

        if !unresolved[succ] {
            continue;
        }

        match color[succ] {
            0 => {
                // Descend into an unvisited node (the recursive call in a recursive DFS).
                color[succ] = 1;
                stack_pos.insert(succ, stack.len());
                stack.push(succ);
                frames.push((succ, sorted_successors(succ), 0));
            }
            1 => {
                // Back edge into the current path: the path from `succ` onward is a cycle.
                let start = *stack_pos
                    .get(&succ)
                    .expect("visiting node must be present in DFS stack position map");
                let mut cycle = stack[start..].to_vec();
                cycle.push(succ);
                return Some(cycle);
            }
            _ => {}
        }
    }

    None
}
333
#[cfg(test)]
mod tests {
    use super::*;
    use crate::batch::BatchOperation;
    use std::collections::HashMap;

    fn op(id: &str) -> BatchOperation {
        BatchOperation {
            id: Some(id.to_string()),
            args: vec![],
            ..Default::default()
        }
    }

    fn op_with_deps(id: &str, deps: &[&str]) -> BatchOperation {
        BatchOperation {
            id: Some(id.to_string()),
            args: vec![],
            depends_on: Some(deps.iter().map(|s| (*s).to_string()).collect()),
            ..Default::default()
        }
    }

    fn op_with_capture(id: &str, captures: &[(&str, &str)]) -> BatchOperation {
        let mut map = HashMap::new();
        for &(k, v) in captures {
            map.insert(k.to_string(), v.to_string());
        }
        BatchOperation {
            id: Some(id.to_string()),
            args: vec![],
            capture: Some(map),
            ..Default::default()
        }
    }

    fn op_with_var_ref(id: &str, arg_template: &str) -> BatchOperation {
        BatchOperation {
            id: Some(id.to_string()),
            args: vec![arg_template.to_string()],
            ..Default::default()
        }
    }

    /// Splits the operation ids out of a cycle error message.
    ///
    /// Assumes the cycle path is the text after the last `:`, with ids
    /// separated by `→`. The cycle tests below all rely on this rendering.
    fn reported_cycle(err: &str) -> Vec<&str> {
        err.rsplit(':')
            .next()
            .unwrap_or_default()
            .split('→')
            .map(str::trim)
            .collect()
    }

    #[test]
    fn no_dependencies_preserves_original_order() {
        let ops = vec![op("a"), op("b"), op("c")];
        let order = resolve_execution_order(&ops).unwrap();
        assert_eq!(order, vec![0, 1, 2]);
    }

    #[test]
    fn explicit_linear_chain() {
        let ops = vec![
            op("create"),
            op_with_deps("get", &["create"]),
            op_with_deps("delete", &["get"]),
        ];
        let order = resolve_execution_order(&ops).unwrap();
        assert_eq!(order, vec![0, 1, 2]);
    }

    #[test]
    fn explicit_fan_in() {
        let ops = vec![op("a"), op("b"), op_with_deps("c", &["a", "b"])];
        let order = resolve_execution_order(&ops).unwrap();
        assert!(
            order.iter().position(|&x| x == 0).unwrap()
                < order.iter().position(|&x| x == 2).unwrap()
        );
        assert!(
            order.iter().position(|&x| x == 1).unwrap()
                < order.iter().position(|&x| x == 2).unwrap()
        );
    }

    #[test]
    fn implicit_dependency_from_variable_ref() {
        let ops = vec![
            op_with_capture("create", &[("user_id", ".id")]),
            op_with_var_ref("get", "--user-id={{user_id}}"),
        ];
        let order = resolve_execution_order(&ops).unwrap();
        assert_eq!(order, vec![0, 1]);
    }

    #[test]
    fn cycle_detection_two_nodes() {
        let ops = vec![op_with_deps("a", &["b"]), op_with_deps("b", &["a"])];
        let result = resolve_execution_order(&ops);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("cycle"), "expected cycle error, got: {err}");
        // Compare whole ids: a single-character `contains('a')` also matches
        // letters in the surrounding message text and so can never fail.
        let cycle = reported_cycle(&err);
        assert!(
            cycle.contains(&"a") && cycle.contains(&"b"),
            "expected operation IDs in cycle error, got: {err}"
        );
    }

    #[test]
    fn cycle_detection_three_nodes() {
        let ops = vec![
            op_with_deps("a", &["c"]),
            op_with_deps("b", &["a"]),
            op_with_deps("c", &["b"]),
        ];
        let result = resolve_execution_order(&ops);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("cycle"), "expected cycle error, got: {err}");
        let cycle = reported_cycle(&err);
        assert!(
            ["a", "b", "c"].iter().all(|id| cycle.contains(id)),
            "expected all three operation IDs in cycle error, got: {err}"
        );
    }

    #[test]
    fn cycle_error_excludes_downstream_non_cycle_nodes() {
        let ops = vec![
            op_with_deps("a", &["b"]),
            op_with_deps("b", &["a"]),
            op_with_deps("c", &["a"]),
        ];
        let result = resolve_execution_order(&ops);
        assert!(result.is_err());

        let err = result.unwrap_err().to_string();
        let cycle = reported_cycle(&err);

        assert!(
            cycle.contains(&"a") && cycle.contains(&"b"),
            "expected cycle members a and b, got: {err}"
        );
        assert!(
            !cycle.contains(&"c"),
            "expected downstream node c to be excluded from cycle report, got: {err}"
        );
    }

    #[test]
    fn self_dependency_is_reported_as_cycle() {
        let ops = vec![op_with_deps("a", &["a"])];
        let err = resolve_execution_order(&ops).unwrap_err().to_string();
        assert!(err.contains("cycle"), "expected cycle error, got: {err}");
        assert!(
            reported_cycle(&err).contains(&"a"),
            "expected self-dependent id in cycle error, got: {err}"
        );
    }

    #[test]
    fn missing_dependency_reference() {
        let ops = vec![op("a"), op_with_deps("b", &["nonexistent"])];
        let result = resolve_execution_order(&ops);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("nonexistent"),
            "expected missing dep error, got: {err}"
        );
    }

    #[test]
    fn duplicate_ids_rejected() {
        let ops = vec![op("dup"), op("dup")];
        let result = resolve_execution_order(&ops);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("Duplicate operation id 'dup'"),
            "expected duplicate id error, got: {err}"
        );
    }

    #[test]
    fn missing_id_on_capture_operation() {
        let op = BatchOperation {
            capture: Some(HashMap::from([("x".into(), ".id".into())])),
            ..Default::default()
        };
        let ops = vec![op];
        let result = resolve_execution_order(&ops);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("no id"),
            "expected missing id error, got: {err}"
        );
    }

    #[test]
    fn missing_id_on_depends_on_operation() {
        let op = BatchOperation {
            depends_on: Some(vec!["other".into()]),
            ..Default::default()
        };
        let ops = vec![op];
        let result = resolve_execution_order(&ops);
        assert!(result.is_err());
        // The id check runs before dependency lookup, so the unknown "other"
        // must not be what gets reported.
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("no id"),
            "expected missing id error, got: {err}"
        );
    }

    #[test]
    fn variable_consumer_without_id_is_valid() {
        let ops = vec![
            op_with_capture("producer", &[("uid", ".id")]),
            BatchOperation {
                args: vec!["--id".into(), "{{uid}}".into()],
                ..Default::default()
            },
        ];
        let order = resolve_execution_order(&ops).unwrap();
        assert!(
            order.iter().position(|&x| x == 0).unwrap()
                < order.iter().position(|&x| x == 1).unwrap()
        );
    }

    #[test]
    fn unknown_variable_reference_is_ignored() {
        let ops = vec![op_with_var_ref("a", "{{never_captured}}")];
        let order = resolve_execution_order(&ops).unwrap();
        assert_eq!(order, vec![0]);
    }

    #[test]
    fn referencing_own_capture_does_not_create_cycle() {
        let ops = vec![BatchOperation {
            id: Some("a".into()),
            args: vec!["{{x}}".into()],
            capture: Some(HashMap::from([("x".into(), ".id".into())])),
            ..Default::default()
        }];
        let order = resolve_execution_order(&ops).unwrap();
        assert_eq!(order, vec![0]);
    }

    #[test]
    fn has_dependencies_returns_false_for_simple_batch() {
        let ops = vec![op("a"), op("b")];
        assert!(!has_dependencies(&ops));
    }

    #[test]
    fn has_dependencies_returns_true_for_capture() {
        let ops = vec![op_with_capture("a", &[("x", ".id")])];
        assert!(has_dependencies(&ops));
    }

    #[test]
    fn has_dependencies_returns_true_for_depends_on() {
        let ops = vec![op_with_deps("a", &["b"])];
        assert!(has_dependencies(&ops));
    }

    #[test]
    fn has_dependencies_returns_false_for_bare_variable_ref() {
        let ops = vec![op_with_var_ref("a", "{{some_var}}")];
        assert!(!has_dependencies(&ops));
    }

    #[test]
    fn has_dependencies_returns_false_for_empty_maps_and_vecs() {
        let ops = vec![BatchOperation {
            id: Some("a".into()),
            args: vec![],
            capture: Some(HashMap::new()),
            capture_append: Some(HashMap::new()),
            depends_on: Some(vec![]),
            ..Default::default()
        }];
        assert!(!has_dependencies(&ops));
    }

    #[test]
    fn empty_capture_does_not_require_id() {
        let ops = vec![BatchOperation {
            args: vec![],
            capture: Some(HashMap::new()),
            ..Default::default()
        }];
        let order = resolve_execution_order(&ops).unwrap();
        assert_eq!(order, vec![0]);
    }

    #[test]
    fn has_dependencies_returns_true_for_variable_ref_with_capture_provider() {
        let ops = vec![
            op_with_capture("producer", &[("some_var", ".id")]),
            op_with_var_ref("consumer", "{{some_var}}"),
        ];
        assert!(has_dependencies(&ops));
    }

    #[test]
    fn extract_variable_references_basic() {
        let vars = extract_variable_references("--id={{user_id}}");
        assert_eq!(vars, vec!["user_id"]);
    }

    #[test]
    fn extract_variable_references_multiple() {
        let vars = extract_variable_references("{{a}} and {{b}}");
        assert_eq!(vars, vec!["a", "b"]);
    }

    #[test]
    fn extract_variable_references_none() {
        let vars = extract_variable_references("no variables here");
        assert!(vars.is_empty());
    }

    #[test]
    fn extract_variable_references_unclosed() {
        let vars = extract_variable_references("{{unclosed");
        assert!(vars.is_empty());
    }

    #[test]
    fn capture_append_creates_implicit_dependency() {
        let append_op = BatchOperation {
            id: Some("beat-1".into()),
            args: vec![],
            capture_append: Some(HashMap::from([("ids".into(), ".id".into())])),
            ..Default::default()
        };
        let consumer = op_with_var_ref("final", "{{ids}}");
        let ops = vec![append_op, consumer];
        let order = resolve_execution_order(&ops).unwrap();
        assert_eq!(order, vec![0, 1]);
    }

    #[test]
    fn capture_append_multiple_providers_all_become_implicit_deps() {
        let beat_1 = BatchOperation {
            id: Some("beat-1".into()),
            args: vec![],
            capture_append: Some(HashMap::from([("ids".into(), ".id".into())])),
            ..Default::default()
        };
        let beat_2 = BatchOperation {
            id: Some("beat-2".into()),
            args: vec![],
            capture_append: Some(HashMap::from([("ids".into(), ".id".into())])),
            ..Default::default()
        };
        let consumer = op_with_var_ref("aggregate", "{{ids}}");
        let ops = vec![beat_1, beat_2, consumer];
        let order = resolve_execution_order(&ops).unwrap();
        let pos = |idx: usize| order.iter().position(|&x| x == idx).unwrap();
        assert!(pos(0) < pos(2), "beat-1 should precede aggregate");
        assert!(pos(1) < pos(2), "beat-2 should precede aggregate");
    }

    #[test]
    fn diamond_dependency() {
        let ops = vec![
            op("a"),
            op_with_deps("b", &["a"]),
            op_with_deps("c", &["a"]),
            op_with_deps("d", &["b", "c"]),
        ];
        let order = resolve_execution_order(&ops).unwrap();
        let pos = |id: usize| order.iter().position(|&x| x == id).unwrap();
        assert!(pos(0) < pos(1));
        assert!(pos(0) < pos(2));
        assert!(pos(1) < pos(3));
        assert!(pos(2) < pos(3));
    }
}