mod orchestration_integration {
use zeph_core::config::{OrchestrationConfig, ProviderName};
use zeph_llm::mock::MockProvider;
use zeph_orchestration::{
AgentRouter, Aggregator, DagScheduler, FailureStrategy, GraphStatus, LlmAggregator,
SchedulerAction, TaskEvent, TaskGraph, TaskId, TaskNode, TaskOutcome, TaskStatus,
};
use zeph_subagent::{SkillFilter, SubAgentDef, SubAgentPermissions, SubagentHooks, ToolPolicy};
/// Builds a minimal [`SubAgentDef`] fixture for the scheduler tests.
///
/// The agent is called `name` and described as `"<name> agent"`. It inherits
/// all tools (`ToolPolicy::InheritAll`), disallows none, has an empty system
/// prompt, and leaves `model`, `memory`, `source` and `file_path` unset.
/// Permissions, skills and hooks use their `Default` values.
fn make_agent(name: &str) -> SubAgentDef {
    let description = format!("{name} agent");
    let name = name.to_owned();

    SubAgentDef {
        // Identity.
        name,
        description,
        system_prompt: String::new(),
        // Tooling and capabilities.
        tools: ToolPolicy::InheritAll,
        disallowed_tools: Vec::new(),
        permissions: SubAgentPermissions::default(),
        skills: SkillFilter::default(),
        hooks: SubagentHooks::default(),
        // Optional settings left unset for tests.
        model: None,
        memory: None,
        source: None,
        file_path: None,
    }
}
/// Returns the [`OrchestrationConfig`] shared by the tests in this module.
///
/// The fields listed in the body are pinned explicitly so the tests do not
/// drift with library defaults; every remaining field is taken from
/// `OrchestrationConfig::default()`.
fn default_config() -> OrchestrationConfig {
    OrchestrationConfig {
        // Top-level switches.
        enabled: true,
        confirm_before_execute: false,

        // Scheduling limits.
        max_tasks: 20,
        max_parallel: 4,
        // NOTE(review): presumably 0 means "no per-task timeout" — confirm
        // against the scheduler implementation.
        task_timeout_secs: 0,
        deferral_backoff_ms: 250,

        // Failure handling defaults (individual tests override per task).
        default_failure_strategy: FailureStrategy::Abort,
        default_max_retries: 3,

        // Planner / aggregator settings.
        planner_provider: ProviderName::default(),
        planner_max_tokens: 4096,
        aggregator_max_tokens: 1024,
        dependency_context_budget: 16_384,

        ..OrchestrationConfig::default()
    }
}
/// Test router that always selects the first agent in the available list.
struct FirstRouter;

impl AgentRouter for FirstRouter {
    /// Returns the name of the first entry in `available`, ignoring the task.
    ///
    /// Yields `None` when `available` is empty.
    fn route(&self, _task: &TaskNode, available: &[SubAgentDef]) -> Option<String> {
        let chosen = available.first()?;
        Some(chosen.name.clone())
    }
}
/// Builds a two-node linear graph (`task-0` -> `task-1`) with goal `"test goal"`.
///
/// `task-1` depends on `task-0`. When `failure_strategy` is `Some`, the same
/// strategy is applied to both tasks; when it is `None`, the tasks keep
/// whatever `TaskNode::new` assigned.
///
/// # Panics
///
/// Panics if the dependency list `[0]` cannot be deserialized.
fn linear_2_task_graph(failure_strategy: Option<FailureStrategy>) -> TaskGraph {
    let mut graph = TaskGraph::new("test goal");
    let mut upstream = TaskNode::new(0, "task-0", "do first thing");
    let mut downstream = TaskNode::new(1, "task-1", "do second thing");

    // The dependency list is produced by deserializing JSON.
    // NOTE(review): presumably because `TaskId` has no constructor usable
    // from this test crate — confirm.
    downstream.depends_on = serde_json::from_str("[0]").expect("valid TaskId JSON");

    if let Some(strategy) = failure_strategy {
        for node in [&mut upstream, &mut downstream] {
            node.failure_strategy = Some(strategy);
        }
    }

    graph.tasks = vec![upstream, downstream];
    graph
}
/// Drives `scheduler` until it reports `Done`, answering every spawn request
/// with a synthetic outcome.
///
/// Each iteration calls `tick()` and handles the returned actions:
///
/// - `Spawn`: registers the spawn through `record_spawn` under the handle id
///   `handle-<task index>` and queues the task for an outcome event.
/// - `Done`: remembers the reported status.
/// - `Cancel`, `RunInline`, `Verify`, `VerifyPredicate`: ignored by this driver.
///
/// Once all actions of the tick are handled, one [`TaskEvent`] per queued spawn
/// is sent on the scheduler's event sender, carrying the outcome produced by
/// `outcome_fn`. If a `Done` action was seen, its status is returned; otherwise
/// the driver awaits `wait_event()` and starts the next iteration.
///
/// # Panics
///
/// Panics if sending an event fails.
async fn drive_scheduler<F>(scheduler: &mut DagScheduler, mut outcome_fn: F) -> GraphStatus
where
    F: FnMut(TaskId) -> TaskOutcome,
{
    let events = scheduler.event_sender();

    loop {
        let pending = scheduler.tick();
        let mut finished = None;
        let mut launched = Vec::new();

        // Variants are listed explicitly (no catch-all) so that a newly added
        // `SchedulerAction` forces this driver to be revisited.
        for action in pending {
            match action {
                SchedulerAction::Done { status } => finished = Some(status),
                SchedulerAction::Spawn {
                    task_id,
                    agent_def_name,
                    ..
                } => {
                    let handle = format!("handle-{}", task_id.index());
                    scheduler.record_spawn(task_id, handle.clone(), agent_def_name);
                    launched.push((task_id, handle));
                }
                SchedulerAction::Cancel { .. }
                | SchedulerAction::RunInline { .. }
                | SchedulerAction::Verify { .. }
                | SchedulerAction::VerifyPredicate { .. } => {}
            }
        }

        // Outcomes are delivered only after every action of this tick was handled.
        for (task_id, agent_handle_id) in launched {
            let event = TaskEvent {
                task_id,
                agent_handle_id,
                outcome: outcome_fn(task_id),
            };
            events.send(event).await.expect("event channel open");
        }

        match finished {
            Some(status) => return status,
            None => {
                scheduler.wait_event().await;
            }
        }
    }
}
#[tokio::test]
async fn test_plan_execute_aggregate_happy_path() {
let graph = linear_2_task_graph(None);
let config = default_config();
let agents = vec![make_agent("worker")];
let mut scheduler =
DagScheduler::new(graph, &config, Box::new(FirstRouter), agents, None).unwrap();
let final_status = drive_scheduler(&mut scheduler, |task_id| TaskOutcome::Completed {
output: format!("output from task {}", task_id.index()),
artifacts: vec![],
})
.await;
assert_eq!(final_status, GraphStatus::Completed);
let graph = scheduler.into_graph();
assert_eq!(graph.status, GraphStatus::Completed);
assert_eq!(graph.tasks[0].status, TaskStatus::Completed);
assert_eq!(graph.tasks[1].status, TaskStatus::Completed);
let provider = MockProvider::with_responses(vec!["synthesis result".to_string()]);
let aggregator = LlmAggregator::new(provider, &config);
let (result, _usage) = aggregator.aggregate(&graph).await.unwrap();
assert!(
!result.is_empty(),
"aggregator must produce non-empty output"
);
assert_eq!(result, "synthesis result");
}
#[tokio::test]
async fn test_single_task_graph() {
let mut graph = TaskGraph::new("single task goal");
graph.tasks = vec![TaskNode::new(0, "solo-task", "do the thing")];
let config = default_config();
let agents = vec![make_agent("worker")];
let mut scheduler =
DagScheduler::new(graph, &config, Box::new(FirstRouter), agents, None).unwrap();
let final_status = drive_scheduler(&mut scheduler, |_| TaskOutcome::Completed {
output: "done".to_string(),
artifacts: vec![],
})
.await;
assert_eq!(final_status, GraphStatus::Completed);
let graph = scheduler.into_graph();
assert_eq!(graph.tasks.len(), 1);
assert_eq!(graph.tasks[0].status, TaskStatus::Completed);
}
#[tokio::test]
async fn test_failure_abort_strategy() {
let graph = linear_2_task_graph(Some(FailureStrategy::Abort));
let config = default_config();
let agents = vec![make_agent("worker")];
let mut scheduler =
DagScheduler::new(graph, &config, Box::new(FirstRouter), agents, None).unwrap();
let final_status = drive_scheduler(&mut scheduler, |task_id| {
if task_id.index() == 0 {
TaskOutcome::Failed {
error: "task 0 failed".to_string(),
}
} else {
TaskOutcome::Completed {
output: "unreachable".to_string(),
artifacts: vec![],
}
}
})
.await;
assert_eq!(final_status, GraphStatus::Failed);
let graph = scheduler.into_graph();
assert_eq!(graph.status, GraphStatus::Failed);
assert_eq!(graph.tasks[0].status, TaskStatus::Failed);
assert_ne!(
graph.tasks[1].status,
TaskStatus::Running,
"task 1 must not be running after abort"
);
assert_ne!(
graph.tasks[1].status,
TaskStatus::Completed,
"task 1 must not complete after abort"
);
}
#[tokio::test]
async fn test_failure_skip_strategy() {
let graph = linear_2_task_graph(Some(FailureStrategy::Skip));
let config = default_config();
let agents = vec![make_agent("worker")];
let mut scheduler =
DagScheduler::new(graph, &config, Box::new(FirstRouter), agents, None).unwrap();
let final_status = drive_scheduler(&mut scheduler, |task_id| {
if task_id.index() == 0 {
TaskOutcome::Failed {
error: "task 0 failed with skip".to_string(),
}
} else {
TaskOutcome::Completed {
output: "unreachable".to_string(),
artifacts: vec![],
}
}
})
.await;
assert_eq!(
final_status,
GraphStatus::Completed,
"Skip strategy: all tasks Skipped → graph reaches Completed, not Failed"
);
let graph = scheduler.into_graph();
assert_eq!(graph.status, GraphStatus::Completed);
assert_eq!(
graph.tasks[0].status,
TaskStatus::Skipped,
"task 0 (failed with Skip) must be Skipped"
);
assert_eq!(
graph.tasks[1].status,
TaskStatus::Skipped,
"task 1 (dependent of skipped task 0) must be Skipped"
);
}
/// A single task with `FailureStrategy::Retry` and `max_retries = 1` that
/// fails on every attempt must leave the graph `Failed` after being spawned
/// at least twice.
///
/// The loop mirrors the generic driver but gives every spawn its own handle
/// id (`handle-<attempt>`) and counts spawns so the retry can be asserted.
#[tokio::test]
async fn test_retry_exhausted() {
    let mut graph = TaskGraph::new("retry test goal");
    let mut task = TaskNode::new(0, "retryable-task", "this task will fail");
    task.failure_strategy = Some(FailureStrategy::Retry);
    task.max_retries = Some(1);
    graph.tasks = vec![task];

    let config = default_config();
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_agent("worker")],
        None,
    )
    .unwrap();
    let tx = scheduler.event_sender();

    // Number of spawns seen so far; also used to build per-attempt handle ids.
    let mut attempt = 0u32;
    let final_status = loop {
        let pending = scheduler.tick();
        let mut finished = None;
        let mut to_fail = Vec::new();

        for action in pending {
            match action {
                SchedulerAction::Done { status } => finished = Some(status),
                SchedulerAction::Spawn {
                    task_id,
                    agent_def_name,
                    ..
                } => {
                    let handle_id = format!("handle-{attempt}");
                    attempt += 1;
                    scheduler.record_spawn(task_id, handle_id.clone(), agent_def_name);
                    to_fail.push((task_id, handle_id));
                }
                SchedulerAction::Cancel { .. }
                | SchedulerAction::RunInline { .. }
                | SchedulerAction::Verify { .. }
                | SchedulerAction::VerifyPredicate { .. } => {}
            }
        }

        // Every spawned attempt is reported as failed. `attempt` was already
        // incremented above, so the message is numbered from 1.
        for (task_id, agent_handle_id) in to_fail {
            let outcome = TaskOutcome::Failed {
                error: format!("attempt {attempt} failed"),
            };
            let event = TaskEvent {
                task_id,
                agent_handle_id,
                outcome,
            };
            tx.send(event).await.expect("channel open");
        }

        match finished {
            Some(status) => break status,
            None => {
                scheduler.wait_event().await;
            }
        }
    };

    assert_eq!(
        final_status,
        GraphStatus::Failed,
        "graph must be Failed after retries exhausted"
    );
    assert_eq!(scheduler.into_graph().status, GraphStatus::Failed);
    assert!(
        attempt >= 2,
        "task must be attempted at least twice (initial + 1 retry), got {attempt}"
    );
}
}