use std::collections::VecDeque;
use zeph_common::fidelity::PlannedToolHint;
use super::error::OrchestrationError;
use super::graph::{FailureStrategy, GraphStatus, TaskGraph, TaskId, TaskNode, TaskStatus};
use super::verify_predicate::PredicateOutcome;
pub fn validate(tasks: &[TaskNode], max_tasks: usize) -> Result<(), OrchestrationError> {
if tasks.len() > max_tasks {
return Err(OrchestrationError::InvalidGraph(format!(
"graph has {} tasks, exceeding the limit of {max_tasks}",
tasks.len()
)));
}
if tasks.is_empty() {
return Err(OrchestrationError::InvalidGraph(
"graph has no tasks".to_string(),
));
}
for (i, task) in tasks.iter().enumerate() {
let expected = u32::try_from(i).map_err(|_| {
OrchestrationError::InvalidGraph(format!("task index {i} overflows u32"))
})?;
if task.id != TaskId(expected) {
return Err(OrchestrationError::InvalidGraph(format!(
"task at index {i} has id {task_id} (expected {i})",
task_id = task.id
)));
}
for dep in &task.depends_on {
if *dep == task.id {
return Err(OrchestrationError::InvalidGraph(format!(
"task {i} has a self-reference"
)));
}
if dep.index() >= tasks.len() {
return Err(OrchestrationError::InvalidGraph(format!(
"task {i} references non-existent task {dep}"
)));
}
}
}
let sorted = toposort(tasks)?;
let has_root = tasks.iter().any(|t| t.depends_on.is_empty());
if !has_root {
return Err(OrchestrationError::CycleDetected);
}
let _ = sorted;
Ok(())
}
pub fn toposort(tasks: &[TaskNode]) -> Result<Vec<TaskId>, OrchestrationError> {
let n = tasks.len();
let mut in_degree = vec![0u32; n];
for task in tasks {
in_degree[task.id.index()] = u32::try_from(task.depends_on.len()).map_err(|_| {
OrchestrationError::InvalidGraph("dependency count overflows u32".to_string())
})?;
}
let mut queue: VecDeque<TaskId> = in_degree
.iter()
.enumerate()
.filter(|(_, d)| **d == 0)
.map(|(i, _)| u32::try_from(i).map(TaskId))
.collect::<Result<_, _>>()
.map_err(|_| OrchestrationError::InvalidGraph("task index overflows u32".to_string()))?;
let mut dependents: Vec<Vec<TaskId>> = vec![Vec::new(); n];
for task in tasks {
for dep in &task.depends_on {
dependents[dep.index()].push(task.id);
}
}
let mut order = Vec::with_capacity(n);
while let Some(id) = queue.pop_front() {
order.push(id);
for &dep_id in &dependents[id.index()] {
in_degree[dep_id.index()] -= 1;
if in_degree[dep_id.index()] == 0 {
queue.push_back(dep_id);
}
}
}
if order.len() != n {
return Err(OrchestrationError::CycleDetected);
}
Ok(order)
}
fn all_parents_predicate_clear(task: &TaskNode, graph: &TaskGraph) -> bool {
task.depends_on.iter().all(|parent_id| {
let parent = &graph.tasks[parent_id.index()];
matches!(
(&parent.verify_predicate, &parent.predicate_outcome),
(None, _)
| (Some(_), Some(PredicateOutcome { passed: true, .. }))
)
})
}
#[must_use]
pub fn ready_tasks(graph: &TaskGraph) -> Vec<TaskId> {
graph
.tasks
.iter()
.filter_map(|task| {
match task.status {
TaskStatus::Ready => {
if all_parents_predicate_clear(task, graph) {
Some(task.id)
} else {
None
}
}
TaskStatus::Pending => {
let all_deps_done = task
.depends_on
.iter()
.all(|dep_id| graph.tasks[dep_id.index()].status == TaskStatus::Completed);
if all_deps_done && all_parents_predicate_clear(task, graph) {
Some(task.id)
} else {
None
}
}
_ => None,
}
})
.collect()
}
pub fn propagate_failure(
graph: &mut TaskGraph,
failed_id: TaskId,
rev_adj: &[Vec<TaskId>],
) -> Vec<TaskId> {
if graph.tasks[failed_id.index()].status != TaskStatus::Failed {
return Vec::new();
}
let strategy = graph.tasks[failed_id.index()]
.failure_strategy
.unwrap_or(graph.default_failure_strategy);
let max_retries = graph.tasks[failed_id.index()]
.max_retries
.unwrap_or(graph.default_max_retries);
match strategy {
FailureStrategy::Abort => {
graph.status = GraphStatus::Failed;
graph
.tasks
.iter()
.filter(|t| t.status == TaskStatus::Running)
.map(|t| t.id)
.collect()
}
FailureStrategy::Skip => {
graph.tasks[failed_id.index()].status = TaskStatus::Skipped;
let mut to_cancel = Vec::new();
let mut queue: VecDeque<TaskId> = VecDeque::new();
queue.push_back(failed_id);
while let Some(current) = queue.pop_front() {
let dependents = rev_adj.get(current.index()).map_or(&[] as &[TaskId], |v| v);
for &dep_id in dependents {
if !graph.tasks[dep_id.index()].status.is_terminal() {
if graph.tasks[dep_id.index()].status == TaskStatus::Running {
to_cancel.push(dep_id);
}
graph.tasks[dep_id.index()].status = TaskStatus::Skipped;
queue.push_back(dep_id);
}
}
}
to_cancel
}
FailureStrategy::Retry => {
let retry_count = graph.tasks[failed_id.index()].retry_count;
if retry_count < max_retries {
graph.tasks[failed_id.index()].retry_count += 1;
graph.tasks[failed_id.index()].status = TaskStatus::Ready;
Vec::new()
} else {
graph.status = GraphStatus::Failed;
graph
.tasks
.iter()
.filter(|t| t.status == TaskStatus::Running)
.map(|t| t.id)
.collect()
}
}
FailureStrategy::Ask => {
graph.status = GraphStatus::Paused;
Vec::new()
}
_ => {
graph.status = GraphStatus::Failed;
Vec::new()
}
}
}
pub fn reset_for_retry(
graph: &mut TaskGraph,
rev_adj: &[Vec<TaskId>],
) -> Result<(), OrchestrationError> {
use super::graph::GraphStatus;
if graph.status != GraphStatus::Failed && graph.status != GraphStatus::Paused {
return Err(OrchestrationError::InvalidGraph(format!(
"cannot retry graph in status {}; only Failed or Paused graphs can be retried",
graph.status
)));
}
let mut seeds: Vec<TaskId> = Vec::new();
for task in &mut graph.tasks {
if task.status == TaskStatus::Failed {
task.status = TaskStatus::Ready;
task.retry_count = 0;
seeds.push(task.id);
}
}
for task in &mut graph.tasks {
if task.status == TaskStatus::Canceled {
task.status = TaskStatus::Pending;
}
}
if seeds.is_empty() {
graph.status = GraphStatus::Running;
return Ok(());
}
let mut queue: std::collections::VecDeque<TaskId> = seeds.into_iter().collect();
while let Some(current) = queue.pop_front() {
let dependents = rev_adj.get(current.index()).map_or(&[] as &[TaskId], |v| v);
for &dep_id in dependents {
if graph.tasks[dep_id.index()].status == TaskStatus::Skipped {
graph.tasks[dep_id.index()].status = TaskStatus::Pending;
queue.push_back(dep_id);
}
}
}
graph.status = GraphStatus::Running;
Ok(())
}
const KEYWORD_STOPWORDS: &[&str] = &["the", "a", "an", "in", "of", "for", "to", "from", "with"];
#[must_use]
pub fn lookahead_tools(graph: &TaskGraph, depth: u8) -> Vec<PlannedToolHint> {
let _span = tracing::debug_span!("orch.dag.lookahead", depth = depth).entered();
if depth == 0 {
return vec![];
}
let tasks = &graph.tasks;
let n = tasks.len();
let mut forward_adj: Vec<Vec<usize>> = vec![Vec::new(); n];
for task in tasks {
for dep in &task.depends_on {
forward_adj[dep.index()].push(task.id.index());
}
}
let mut visited = vec![false; n];
let mut queue: VecDeque<(usize, u8)> = VecDeque::new();
for task in tasks {
if matches!(task.status, TaskStatus::Running | TaskStatus::Ready) {
visited[task.id.index()] = true;
queue.push_back((task.id.index(), 0));
}
}
if queue.is_empty() {
return vec![];
}
let mut hints: Vec<PlannedToolHint> = Vec::new();
while let Some((idx, dist)) = queue.pop_front() {
for &child_idx in &forward_adj[idx] {
if visited[child_idx] {
continue;
}
visited[child_idx] = true;
let child_dist = dist + 1;
if child_dist <= depth {
let child = &tasks[child_idx];
let tool_name = child.agent_hint.as_deref().unwrap_or(&child.title);
hints.push(PlannedToolHint::new(
tool_name,
extract_keywords(tool_name, &child.description),
child_dist,
));
queue.push_back((child_idx, child_dist));
}
}
}
hints.sort_by_key(|h| h.distance_from_current);
hints
}
fn extract_keywords(tool_name: &str, description: &str) -> Vec<String> {
let end = description.floor_char_boundary(200);
let desc_prefix = &description[..end];
let combined = format!("{tool_name} {desc_prefix}");
let mut seen = std::collections::HashSet::new();
let mut keywords: Vec<String> = Vec::new();
let full = tool_name.to_lowercase();
seen.insert(full.clone());
keywords.push(full);
for token in combined.split(|c: char| !c.is_alphanumeric()) {
if keywords.len() == 10 {
break;
}
if token.len() < 3 {
continue;
}
let lower = token.to_lowercase();
if KEYWORD_STOPWORDS.contains(&lower.as_str()) {
continue;
}
if seen.insert(lower.clone()) {
keywords.push(lower);
}
}
keywords
}
#[cfg(test)]
mod tests {
use super::*;
use crate::graph::{FailureStrategy, GraphStatus, TaskGraph, TaskNode, TaskStatus};
use crate::topology::build_rev_adj;
fn make_node(id: u32, deps: &[u32]) -> TaskNode {
let mut n = TaskNode::new(id, format!("task-{id}"), "desc");
n.depends_on = deps.iter().map(|&d| TaskId(d)).collect();
n
}
fn graph_from_nodes(nodes: Vec<TaskNode>) -> TaskGraph {
let mut g = TaskGraph::new("test");
g.tasks = nodes;
g
}
fn make_rev_adj(graph: &TaskGraph) -> Vec<Vec<TaskId>> {
build_rev_adj(&graph.tasks)
}
#[test]
fn test_validate_empty_graph() {
let err = validate(&[], 20).unwrap_err();
assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
}
#[test]
fn test_validate_exceeds_max_tasks() {
let tasks: Vec<TaskNode> = (0..5).map(|i| make_node(i, &[])).collect();
let err = validate(&tasks, 3).unwrap_err();
assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
}
#[test]
fn test_validate_single_task_no_deps() {
let tasks = vec![make_node(0, &[])];
assert!(validate(&tasks, 20).is_ok());
}
#[test]
fn test_validate_self_reference() {
let mut tasks = vec![make_node(0, &[])];
tasks[0].depends_on = vec![TaskId(0)];
let err = validate(&tasks, 20).unwrap_err();
assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
}
#[test]
fn test_validate_invalid_taskid_reference() {
let mut tasks = vec![make_node(0, &[])];
tasks[0].depends_on = vec![TaskId(99)];
let err = validate(&tasks, 20).unwrap_err();
assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
}
#[test]
fn test_validate_linear_chain() {
let tasks = vec![make_node(0, &[]), make_node(1, &[0]), make_node(2, &[1])];
assert!(validate(&tasks, 20).is_ok());
}
#[test]
fn test_validate_diamond() {
let tasks = vec![
make_node(0, &[]),
make_node(1, &[0]),
make_node(2, &[0]),
make_node(3, &[1, 2]),
];
assert!(validate(&tasks, 20).is_ok());
}
#[test]
fn test_validate_cycle_two_nodes() {
let tasks = vec![make_node(0, &[1]), make_node(1, &[0])];
let err = validate(&tasks, 20).unwrap_err();
assert!(matches!(err, OrchestrationError::CycleDetected));
}
#[test]
fn test_validate_cycle_three_nodes() {
let tasks = vec![make_node(0, &[2]), make_node(1, &[0]), make_node(2, &[1])];
let err = validate(&tasks, 20).unwrap_err();
assert!(matches!(err, OrchestrationError::CycleDetected));
}
#[test]
fn test_validate_taskid_invariant() {
let mut tasks = vec![make_node(0, &[]), make_node(1, &[0])];
tasks[1].id = TaskId(5);
let err = validate(&tasks, 20).unwrap_err();
assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
}
#[test]
fn test_toposort_linear() {
let tasks = vec![make_node(0, &[]), make_node(1, &[0]), make_node(2, &[1])];
let order = toposort(&tasks).expect("should succeed");
assert_eq!(order, vec![TaskId(0), TaskId(1), TaskId(2)]);
}
#[test]
fn test_toposort_diamond() {
let tasks = vec![
make_node(0, &[]),
make_node(1, &[0]),
make_node(2, &[0]),
make_node(3, &[1, 2]),
];
let order = toposort(&tasks).expect("should succeed");
assert_eq!(order[0], TaskId(0));
assert_eq!(order[3], TaskId(3));
}
#[test]
fn test_toposort_wide_parallel() {
let tasks = vec![make_node(0, &[]), make_node(1, &[]), make_node(2, &[])];
let order = toposort(&tasks).expect("should succeed");
assert_eq!(order.len(), 3);
}
#[test]
fn test_toposort_single_node() {
let tasks = vec![make_node(0, &[])];
let order = toposort(&tasks).expect("should succeed");
assert_eq!(order, vec![TaskId(0)]);
}
#[test]
fn test_ready_tasks_initial_roots() {
let mut graph = graph_from_nodes(vec![
make_node(0, &[]),
make_node(1, &[]),
make_node(2, &[0, 1]),
]);
graph.tasks[0].status = TaskStatus::Pending;
graph.tasks[1].status = TaskStatus::Pending;
graph.tasks[2].status = TaskStatus::Pending;
let ready = ready_tasks(&graph);
assert!(ready.contains(&TaskId(0)));
assert!(ready.contains(&TaskId(1)));
assert!(!ready.contains(&TaskId(2)));
}
#[test]
fn test_ready_tasks_after_completion() {
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.tasks[0].status = TaskStatus::Completed;
graph.tasks[1].status = TaskStatus::Pending;
let ready = ready_tasks(&graph);
assert!(ready.contains(&TaskId(1)));
}
#[test]
fn test_ready_tasks_skipped_does_not_unblock() {
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.tasks[0].status = TaskStatus::Skipped;
graph.tasks[1].status = TaskStatus::Pending;
let ready = ready_tasks(&graph);
assert!(!ready.contains(&TaskId(1)));
}
#[test]
fn test_ready_tasks_partial_deps_completed() {
let mut graph = graph_from_nodes(vec![
make_node(0, &[]),
make_node(1, &[]),
make_node(2, &[0, 1]),
]);
graph.tasks[0].status = TaskStatus::Completed;
graph.tasks[1].status = TaskStatus::Running;
graph.tasks[2].status = TaskStatus::Pending;
let ready = ready_tasks(&graph);
assert!(!ready.contains(&TaskId(2)));
}
#[test]
fn test_ready_tasks_all_terminal() {
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.tasks[0].status = TaskStatus::Completed;
graph.tasks[1].status = TaskStatus::Completed;
let ready = ready_tasks(&graph);
assert!(ready.is_empty());
}
#[test]
fn test_ready_tasks_already_ready_included() {
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.tasks[0].status = TaskStatus::Ready; graph.tasks[1].status = TaskStatus::Pending;
let ready = ready_tasks(&graph);
assert!(ready.contains(&TaskId(0)));
}
#[test]
fn test_ready_tasks_predicate_gate_blocks_downstream() {
use crate::verify_predicate::VerifyPredicate;
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.tasks[0].status = TaskStatus::Completed;
graph.tasks[0].verify_predicate = Some(VerifyPredicate::Natural(
"output must be non-empty".to_string(),
));
graph.tasks[0].predicate_outcome = None;
graph.tasks[1].status = TaskStatus::Pending;
let ready = ready_tasks(&graph);
assert!(
!ready.contains(&TaskId(1)),
"task 1 must be blocked by uncleared predicate on task 0"
);
}
#[test]
fn test_ready_tasks_predicate_gate_unblocks_on_pass() {
use crate::verify_predicate::{PredicateOutcome, VerifyPredicate};
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.tasks[0].status = TaskStatus::Completed;
graph.tasks[0].verify_predicate = Some(VerifyPredicate::Natural("criterion".to_string()));
graph.tasks[0].predicate_outcome = Some(PredicateOutcome {
passed: true,
confidence: 0.9,
reason: "ok".to_string(),
});
graph.tasks[1].status = TaskStatus::Pending;
let ready = ready_tasks(&graph);
assert!(
ready.contains(&TaskId(1)),
"task 1 must be unblocked when predicate passed"
);
}
#[test]
fn test_ready_tasks_predicate_gate_remains_closed_on_fail() {
use crate::verify_predicate::{PredicateOutcome, VerifyPredicate};
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.tasks[0].status = TaskStatus::Completed;
graph.tasks[0].verify_predicate = Some(VerifyPredicate::Natural("criterion".to_string()));
graph.tasks[0].predicate_outcome = Some(PredicateOutcome {
passed: false,
confidence: 0.1,
reason: "criterion not met".to_string(),
});
graph.tasks[1].status = TaskStatus::Pending;
let ready = ready_tasks(&graph);
assert!(
!ready.contains(&TaskId(1)),
"task 1 must remain blocked when predicate failed"
);
}
#[test]
fn test_ready_tasks_no_predicate_unblocks_normally() {
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.tasks[0].status = TaskStatus::Completed;
graph.tasks[1].status = TaskStatus::Pending;
let ready = ready_tasks(&graph);
assert!(
ready.contains(&TaskId(1)),
"no predicate = gate always clear"
);
}
#[test]
fn test_propagate_failure_abort() {
let mut graph = graph_from_nodes(vec![
make_node(0, &[]),
make_node(1, &[0]),
make_node(2, &[0]),
]);
graph.tasks[0].status = TaskStatus::Failed;
graph.tasks[1].status = TaskStatus::Running;
graph.tasks[2].status = TaskStatus::Pending;
graph.default_failure_strategy = FailureStrategy::Abort;
let __ra = make_rev_adj(&graph);
let to_cancel = propagate_failure(&mut graph, TaskId(0), &__ra);
assert_eq!(graph.status, GraphStatus::Failed);
assert!(to_cancel.contains(&TaskId(1)));
assert!(!to_cancel.contains(&TaskId(2)));
}
#[test]
fn test_propagate_failure_skip_single() {
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.tasks[0].status = TaskStatus::Failed;
graph.tasks[0].failure_strategy = Some(FailureStrategy::Skip);
graph.tasks[1].status = TaskStatus::Pending;
let __ra = make_rev_adj(&graph);
let to_cancel = propagate_failure(&mut graph, TaskId(0), &__ra);
assert!(to_cancel.is_empty());
assert_eq!(graph.tasks[0].status, TaskStatus::Skipped);
assert_eq!(graph.tasks[1].status, TaskStatus::Skipped);
}
#[test]
fn test_propagate_failure_skip_transitive() {
let mut graph = graph_from_nodes(vec![
make_node(0, &[]),
make_node(1, &[0]),
make_node(2, &[1]),
]);
graph.tasks[0].status = TaskStatus::Failed;
graph.tasks[0].failure_strategy = Some(FailureStrategy::Skip);
graph.tasks[1].status = TaskStatus::Pending;
graph.tasks[2].status = TaskStatus::Pending;
let __ra = make_rev_adj(&graph);
propagate_failure(&mut graph, TaskId(0), &__ra);
assert_eq!(graph.tasks[0].status, TaskStatus::Skipped);
assert_eq!(graph.tasks[1].status, TaskStatus::Skipped);
assert_eq!(graph.tasks[2].status, TaskStatus::Skipped);
}
#[test]
fn test_propagate_failure_skip_running_dependent_returned() {
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.tasks[0].status = TaskStatus::Failed;
graph.tasks[0].failure_strategy = Some(FailureStrategy::Skip);
graph.tasks[1].status = TaskStatus::Running;
let __ra = make_rev_adj(&graph);
let to_cancel = propagate_failure(&mut graph, TaskId(0), &__ra);
assert!(
to_cancel.contains(&TaskId(1)),
"Running dependent must be returned for cancellation"
);
assert_eq!(graph.tasks[1].status, TaskStatus::Skipped);
}
#[test]
fn test_propagate_failure_retry_under_max() {
let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
graph.tasks[0].status = TaskStatus::Failed;
graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
graph.tasks[0].max_retries = Some(3);
graph.tasks[0].retry_count = 1;
let __ra = make_rev_adj(&graph);
let to_cancel = propagate_failure(&mut graph, TaskId(0), &__ra);
assert!(to_cancel.is_empty());
assert_eq!(graph.tasks[0].status, TaskStatus::Ready);
assert_eq!(graph.tasks[0].retry_count, 2);
}
#[test]
fn test_propagate_failure_retry_exhausted() {
let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
graph.tasks[0].status = TaskStatus::Failed;
graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
graph.tasks[0].max_retries = Some(3);
graph.tasks[0].retry_count = 3;
let __ra = make_rev_adj(&graph);
propagate_failure(&mut graph, TaskId(0), &__ra);
assert_eq!(graph.status, GraphStatus::Failed);
}
#[test]
fn test_propagate_failure_ask() {
let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
graph.tasks[0].status = TaskStatus::Failed;
graph.tasks[0].failure_strategy = Some(FailureStrategy::Ask);
let __ra = make_rev_adj(&graph);
let to_cancel = propagate_failure(&mut graph, TaskId(0), &__ra);
assert!(to_cancel.is_empty());
assert_eq!(graph.status, GraphStatus::Paused);
}
#[test]
fn test_propagate_failure_per_task_override() {
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.default_failure_strategy = FailureStrategy::Abort;
graph.tasks[0].status = TaskStatus::Failed;
graph.tasks[0].failure_strategy = Some(FailureStrategy::Skip);
graph.tasks[1].status = TaskStatus::Pending;
let __ra = make_rev_adj(&graph);
propagate_failure(&mut graph, TaskId(0), &__ra);
assert_eq!(graph.tasks[0].status, TaskStatus::Skipped);
assert_ne!(graph.status, GraphStatus::Failed);
}
#[test]
fn test_propagate_failure_already_terminal() {
let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
graph.tasks[0].status = TaskStatus::Completed;
let __ra = make_rev_adj(&graph);
let to_cancel = propagate_failure(&mut graph, TaskId(0), &__ra);
assert!(to_cancel.is_empty());
assert_eq!(graph.status, GraphStatus::Created);
}
#[test]
fn test_reset_for_retry_resets_failed_to_ready() {
let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
graph.tasks[0].status = TaskStatus::Failed;
graph.status = GraphStatus::Failed;
let __ra = make_rev_adj(&graph);
reset_for_retry(&mut graph, &__ra).unwrap();
assert_eq!(graph.tasks[0].status, TaskStatus::Ready);
assert_eq!(graph.status, GraphStatus::Running);
}
#[test]
fn test_reset_for_retry_resets_skipped_dependents_to_pending() {
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.tasks[0].status = TaskStatus::Failed;
graph.tasks[1].status = TaskStatus::Skipped;
graph.status = GraphStatus::Failed;
let __ra = make_rev_adj(&graph);
reset_for_retry(&mut graph, &__ra).unwrap();
assert_eq!(graph.tasks[0].status, TaskStatus::Ready);
assert_eq!(graph.tasks[1].status, TaskStatus::Pending);
}
#[test]
fn test_reset_for_retry_transitive_skipped_reset() {
let mut graph = graph_from_nodes(vec![
make_node(0, &[]),
make_node(1, &[0]),
make_node(2, &[1]),
]);
graph.tasks[0].status = TaskStatus::Failed;
graph.tasks[1].status = TaskStatus::Skipped;
graph.tasks[2].status = TaskStatus::Skipped;
graph.status = GraphStatus::Failed;
let __ra = make_rev_adj(&graph);
reset_for_retry(&mut graph, &__ra).unwrap();
assert_eq!(graph.tasks[0].status, TaskStatus::Ready);
assert_eq!(graph.tasks[1].status, TaskStatus::Pending);
assert_eq!(graph.tasks[2].status, TaskStatus::Pending);
}
#[test]
fn test_reset_for_retry_completed_tasks_unchanged() {
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.tasks[0].status = TaskStatus::Completed;
graph.tasks[1].status = TaskStatus::Failed;
graph.status = GraphStatus::Failed;
let __ra = make_rev_adj(&graph);
reset_for_retry(&mut graph, &__ra).unwrap();
assert_eq!(graph.tasks[0].status, TaskStatus::Completed);
assert_eq!(graph.tasks[1].status, TaskStatus::Ready);
}
#[test]
fn test_reset_for_retry_rejects_running_graph() {
let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
graph.tasks[0].status = TaskStatus::Running;
graph.status = GraphStatus::Running;
let __ra = make_rev_adj(&graph);
let err = reset_for_retry(&mut graph, &__ra).unwrap_err();
assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
}
#[test]
fn test_reset_for_retry_paused_graph_ok() {
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.tasks[0].status = TaskStatus::Failed;
graph.tasks[1].status = TaskStatus::Skipped;
graph.status = GraphStatus::Paused;
let __ra = make_rev_adj(&graph);
reset_for_retry(&mut graph, &__ra).unwrap();
assert_eq!(graph.status, GraphStatus::Running);
}
#[test]
fn test_reset_for_retry_clears_retry_count() {
let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
graph.tasks[0].status = TaskStatus::Failed;
graph.tasks[0].retry_count = 5;
graph.status = GraphStatus::Failed;
let __ra = make_rev_adj(&graph);
reset_for_retry(&mut graph, &__ra).unwrap();
assert_eq!(graph.tasks[0].retry_count, 0);
}
#[test]
fn test_reset_for_retry_paused_no_failed_tasks() {
let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
graph.tasks[0].status = TaskStatus::Completed;
graph.status = GraphStatus::Paused;
let __ra = make_rev_adj(&graph);
reset_for_retry(&mut graph, &__ra).unwrap();
assert_eq!(graph.status, GraphStatus::Running);
assert_eq!(graph.tasks[0].status, TaskStatus::Completed);
}
#[test]
fn test_reset_for_retry_canceled_tasks_reset_to_pending() {
let mut graph = graph_from_nodes(vec![
make_node(0, &[]),
make_node(1, &[]),
make_node(2, &[0, 1]),
]);
graph.tasks[0].status = TaskStatus::Failed;
graph.tasks[1].status = TaskStatus::Canceled; graph.tasks[2].status = TaskStatus::Pending;
graph.status = GraphStatus::Failed;
let __ra = make_rev_adj(&graph);
reset_for_retry(&mut graph, &__ra).unwrap();
assert_eq!(graph.tasks[0].status, TaskStatus::Ready);
assert_eq!(
graph.tasks[1].status,
TaskStatus::Pending,
"Canceled task must be reset to Pending (IC2)"
);
assert_eq!(graph.tasks[2].status, TaskStatus::Pending);
}
#[test]
fn test_reset_for_retry_canceled_unblocks_dependents() {
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.tasks[0].status = TaskStatus::Failed;
graph.tasks[1].status = TaskStatus::Canceled;
graph.status = GraphStatus::Failed;
let __ra = make_rev_adj(&graph);
reset_for_retry(&mut graph, &__ra).unwrap();
assert_eq!(graph.tasks[0].status, TaskStatus::Ready);
assert_eq!(graph.tasks[1].status, TaskStatus::Pending);
}
fn make_node_titled(id: u32, deps: &[u32], title: &str, desc: &str) -> TaskNode {
let mut n = TaskNode::new(id, title, desc);
n.depends_on = deps.iter().map(|&d| TaskId(d)).collect();
n
}
#[test]
fn lookahead_depth_zero_returns_empty() {
let mut graph = graph_from_nodes(vec![
make_node_titled(0, &[], "web_search", "Search the web for results"),
make_node_titled(1, &[0], "summarize", "Summarize findings"),
]);
graph.tasks[0].status = TaskStatus::Running;
graph.tasks[1].status = TaskStatus::Pending;
let hints = lookahead_tools(&graph, 0);
assert!(hints.is_empty(), "depth=0 must return empty vec");
}
#[test]
fn lookahead_depth_one_emits_only_direct_child() {
let mut graph = graph_from_nodes(vec![
make_node_titled(0, &[], "task-a", "Root task"),
make_node_titled(1, &[0], "web_search", "Search the web"),
make_node_titled(2, &[1], "summarize", "Summarize search results"),
]);
graph.tasks[0].status = TaskStatus::Running;
graph.tasks[1].status = TaskStatus::Pending;
graph.tasks[2].status = TaskStatus::Pending;
let hints = lookahead_tools(&graph, 1);
assert_eq!(hints.len(), 1, "depth=1 should emit only B");
assert_eq!(hints[0].tool_name, "web_search");
assert_eq!(hints[0].distance_from_current, 1);
}
#[test]
fn lookahead_depth_two_emits_both_children() {
let mut graph = graph_from_nodes(vec![
make_node_titled(0, &[], "task-a", "Root task"),
make_node_titled(1, &[0], "web_search", "Search the web"),
make_node_titled(2, &[1], "summarize", "Summarize search results"),
]);
graph.tasks[0].status = TaskStatus::Running;
graph.tasks[1].status = TaskStatus::Pending;
graph.tasks[2].status = TaskStatus::Pending;
let hints = lookahead_tools(&graph, 2);
assert_eq!(hints.len(), 2, "depth=2 should emit B and C");
assert_eq!(hints[0].tool_name, "web_search");
assert_eq!(hints[0].distance_from_current, 1);
assert_eq!(hints[1].tool_name, "summarize");
assert_eq!(hints[1].distance_from_current, 2);
}
#[test]
fn lookahead_no_frontier_returns_empty() {
let mut graph = graph_from_nodes(vec![
make_node_titled(0, &[], "task-a", "Root"),
make_node_titled(1, &[0], "task-b", "Child"),
]);
graph.tasks[0].status = TaskStatus::Pending;
graph.tasks[1].status = TaskStatus::Pending;
let hints = lookahead_tools(&graph, 2);
assert!(hints.is_empty(), "no frontier → empty");
}
#[test]
fn lookahead_frontier_not_emitted() {
let mut graph = graph_from_nodes(vec![
make_node_titled(0, &[], "running-tool", "Currently executing"),
make_node_titled(1, &[0], "next-tool", "Next step"),
]);
graph.tasks[0].status = TaskStatus::Running;
graph.tasks[1].status = TaskStatus::Pending;
let hints = lookahead_tools(&graph, 3);
assert!(
hints.iter().all(|h| h.tool_name != "running-tool"),
"frontier task must not be emitted"
);
assert_eq!(hints.len(), 1);
}
#[test]
fn lookahead_uses_agent_hint_as_tool_name() {
let mut graph = graph_from_nodes(vec![
make_node_titled(0, &[], "dispatch", "Root"),
make_node_titled(1, &[0], "raw-title", "Execute shell command"),
]);
graph.tasks[0].status = TaskStatus::Running;
graph.tasks[1].status = TaskStatus::Pending;
graph.tasks[1].agent_hint = Some("shell_executor".to_string());
let hints = lookahead_tools(&graph, 1);
assert_eq!(hints.len(), 1);
assert_eq!(
hints[0].tool_name, "shell_executor",
"agent_hint should take precedence over title"
);
}
#[test]
fn lookahead_results_sorted_by_distance() {
let mut graph = graph_from_nodes(vec![
make_node_titled(0, &[], "root", "Root"),
make_node_titled(1, &[0], "step-one", "Step one"),
make_node_titled(2, &[1], "step-two", "Step two"),
]);
graph.tasks[0].status = TaskStatus::Running;
graph.tasks[1].status = TaskStatus::Pending;
graph.tasks[2].status = TaskStatus::Pending;
let hints = lookahead_tools(&graph, 2);
for w in hints.windows(2) {
assert!(
w[0].distance_from_current <= w[1].distance_from_current,
"hints must be sorted by distance"
);
}
}
#[test]
fn lookahead_keywords_extracted_and_deduped() {
let mut graph = graph_from_nodes(vec![
make_node_titled(0, &[], "root", "Root task"),
make_node_titled(1, &[0], "search", "search search search results web"),
]);
graph.tasks[0].status = TaskStatus::Running;
graph.tasks[1].status = TaskStatus::Pending;
let hints = lookahead_tools(&graph, 1);
assert_eq!(hints.len(), 1);
let count = hints[0]
.keywords
.iter()
.filter(|k| k.as_str() == "search")
.count();
assert_eq!(count, 1, "duplicate keywords must be deduplicated");
}
#[test]
fn lookahead_stopwords_filtered() {
let mut graph = graph_from_nodes(vec![
make_node_titled(0, &[], "root", "Root"),
make_node_titled(
1,
&[0],
"task",
"the result of the operation from the source",
),
]);
graph.tasks[0].status = TaskStatus::Running;
graph.tasks[1].status = TaskStatus::Pending;
let hints = lookahead_tools(&graph, 1);
assert_eq!(hints.len(), 1);
for kw in &hints[0].keywords {
assert!(
!KEYWORD_STOPWORDS.contains(&kw.as_str()),
"stopword '{kw}' must not appear in keywords"
);
}
}
}