use std::collections::{HashMap, VecDeque};
use std::fmt::Write as _;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use super::cascade::{CascadeConfig, CascadeDetector};
use super::dag;
use super::error::OrchestrationError;
use super::graph::{
ExecutionMode, GraphStatus, TaskGraph, TaskId, TaskNode, TaskResult, TaskStatus,
};
use super::router::AgentRouter;
use super::topology::{DispatchStrategy, Topology, TopologyAnalysis, TopologyClassifier};
use super::verifier::inject_tasks as verifier_inject_tasks;
use zeph_config::OrchestrationConfig;
use zeph_sanitizer::{ContentIsolationConfig, ContentSanitizer, ContentSource, ContentSourceKind};
use zeph_subagent::{SubAgentDef, SubAgentError};
/// Instruction emitted by the scheduler for the caller (the orchestrator
/// loop) to carry out; the scheduler itself performs no I/O.
#[derive(Debug)]
pub enum SchedulerAction {
    /// Spawn a sub-agent (by definition name) to execute the task with
    /// the given prompt.
    Spawn {
        task_id: TaskId,
        agent_def_name: String,
        prompt: String,
    },
    /// Cancel the running agent identified by this handle.
    Cancel { agent_handle_id: String },
    /// Run the task inline on the main agent (no sub-agent was routed).
    RunInline { task_id: TaskId, prompt: String },
    /// The graph reached a terminal status; scheduling is finished.
    Done { status: GraphStatus },
    /// Ask the caller to verify a completed task's output for completeness.
    Verify { task_id: TaskId, output: String },
}
/// Completion/failure report sent back to the scheduler over its event
/// channel. `agent_handle_id` is matched against the running task's handle
/// so stale events from a previous agent incarnation can be discarded.
#[derive(Debug)]
pub struct TaskEvent {
    pub task_id: TaskId,
    pub agent_handle_id: String,
    pub outcome: TaskOutcome,
}
/// Terminal result of a single task execution.
#[derive(Debug)]
pub enum TaskOutcome {
    /// Task finished successfully with its textual output and any
    /// artifact paths it produced.
    Completed {
        output: String,
        artifacts: Vec<PathBuf>,
    },
    /// Task failed with an error description.
    Failed { error: String },
}
/// Bookkeeping for a task currently executing: which agent handle owns it,
/// which definition it was routed to, and when it started (for timeouts).
struct RunningTask {
    agent_handle_id: String,
    agent_def_name: String,
    started_at: Instant,
}
/// Drives execution of a `TaskGraph`: each `tick()` drains completion
/// events, enforces timeouts, and emits `SchedulerAction`s for the caller
/// to perform (spawn/cancel/run-inline/verify/done).
#[allow(clippy::struct_excessive_bools)]
pub struct DagScheduler {
    graph: TaskGraph,
    // Effective concurrency limit (derived from topology analysis).
    max_parallel: usize,
    // Raw limit from config, kept for topology re-analysis after injection.
    config_max_parallel: usize,
    // Tasks currently executing, keyed by task id.
    running: HashMap<TaskId, RunningTask>,
    event_rx: mpsc::Receiver<TaskEvent>,
    event_tx: mpsc::Sender<TaskEvent>,
    task_timeout: Duration,
    router: Box<dyn AgentRouter>,
    available_agents: Vec<SubAgentDef>,
    // Char budget for dependency outputs embedded in downstream prompts.
    dependency_context_budget: usize,
    // Events captured by wait_event() but not yet processed by tick().
    buffered_events: VecDeque<TaskEvent>,
    sanitizer: ContentSanitizer,
    // Base sleep when nothing is running; grows with consecutive failures.
    deferral_backoff: Duration,
    consecutive_spawn_failures: u32,
    topology: TopologyAnalysis,
    // Set when tasks are injected; forces re-analysis on the next tick.
    topology_dirty: bool,
    // Current depth level for the LevelBarrier dispatch strategy.
    current_level: usize,
    verify_completeness: bool,
    // Trimmed provider name for completeness verification (may be empty).
    verify_provider: String,
    // Per-task replan counts (hard limit of 1 per task in inject_tasks).
    task_replan_counts: HashMap<TaskId, u32>,
    global_replan_count: u32,
    max_replans: u32,
    completeness_threshold_value: f32,
    // Present only when cascade routing is effectively enabled.
    cascade_detector: Option<CascadeDetector>,
    tree_optimized_dispatch: bool,
    cascade_routing: bool,
}
impl std::fmt::Debug for DagScheduler {
    /// Manual `Debug`: summarizes scheduler state (ids, counts, strategy)
    /// rather than dumping the full graph, channels, or router.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("DagScheduler");
        dbg.field("graph_id", &self.graph.id);
        dbg.field("graph_status", &self.graph.status);
        dbg.field("running_count", &self.running.len());
        dbg.field("max_parallel", &self.max_parallel);
        dbg.field("task_timeout_secs", &self.task_timeout.as_secs());
        dbg.field("topology", &self.topology.topology);
        dbg.field("strategy", &self.topology.strategy);
        dbg.field("current_level", &self.current_level);
        dbg.field("global_replan_count", &self.global_replan_count);
        dbg.field("cascade_routing", &self.cascade_routing);
        dbg.field("tree_optimized_dispatch", &self.tree_optimized_dispatch);
        dbg.finish_non_exhaustive()
    }
}
impl DagScheduler {
/// Creates a scheduler for a freshly planned graph.
///
/// Validates the DAG, moves the graph to `Running`, marks dependency-free
/// tasks `Ready`, and derives the concurrency limit and dispatch strategy
/// from the topology analysis.
///
/// # Errors
/// Returns `OrchestrationError::InvalidGraph` if the graph is not in
/// `Created` status, or whatever `dag::validate` reports.
pub fn new(
    mut graph: TaskGraph,
    config: &OrchestrationConfig,
    router: Box<dyn AgentRouter>,
    available_agents: Vec<SubAgentDef>,
) -> Result<Self, OrchestrationError> {
    if graph.status != GraphStatus::Created {
        return Err(OrchestrationError::InvalidGraph(format!(
            "graph must be in Created status, got {}",
            graph.status
        )));
    }
    dag::validate(&graph.tasks, config.max_tasks as usize)?;
    graph.status = GraphStatus::Running;
    // Roots (no dependencies) are immediately dispatchable.
    for task in &mut graph.tasks {
        if task.depends_on.is_empty() && task.status == TaskStatus::Pending {
            task.status = TaskStatus::Ready;
        }
    }
    let (event_tx, event_rx) = mpsc::channel(64);
    // A timeout of 0 means "unset"; fall back to 10 minutes.
    let task_timeout = if config.task_timeout_secs > 0 {
        Duration::from_secs(config.task_timeout_secs)
    } else {
        Duration::from_secs(600)
    };
    let topology = TopologyClassifier::analyze(&graph, config);
    let max_parallel = topology.max_parallel;
    let config_max_parallel = config.max_parallel as usize;
    if config.topology_selection {
        tracing::debug!(
            topology = ?topology.topology,
            strategy = ?topology.strategy,
            max_parallel,
            "topology-aware concurrency limit applied"
        );
    }
    // Cascade routing only takes effect when topology selection is on;
    // warn about the misconfiguration instead of silently ignoring it.
    if config.cascade_routing && !config.topology_selection {
        tracing::warn!(
            "cascade_routing = true requires topology_selection = true; \
            cascade routing is disabled (topology_selection is off)"
        );
    }
    let cascade_detector = if config.cascade_routing && config.topology_selection {
        Some(CascadeDetector::new(CascadeConfig {
            failure_threshold: config.cascade_failure_threshold,
        }))
    } else {
        None
    };
    Ok(Self {
        graph,
        max_parallel,
        config_max_parallel,
        running: HashMap::new(),
        event_rx,
        event_tx,
        task_timeout,
        router,
        available_agents,
        dependency_context_budget: config.dependency_context_budget,
        buffered_events: VecDeque::new(),
        sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
        deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
        consecutive_spawn_failures: 0,
        topology,
        topology_dirty: false,
        current_level: 0,
        verify_completeness: config.verify_completeness,
        verify_provider: config.verify_provider.as_str().trim().to_owned(),
        task_replan_counts: HashMap::new(),
        global_replan_count: 0,
        max_replans: config.max_replans,
        completeness_threshold_value: config.completeness_threshold,
        cascade_detector,
        tree_optimized_dispatch: config.tree_optimized_dispatch,
        cascade_routing: config.cascade_routing && config.topology_selection,
    })
}
/// Rebuilds a scheduler from a persisted graph (Paused, Failed, or
/// still-Running), reconstructing the running-task map from tasks whose
/// status is `Running` and that still carry an assigned agent handle.
///
/// Note: `started_at` is reset to now for resumed tasks, so their timeout
/// window restarts; the agent-definition name is recovered from
/// `agent_hint` (empty string when absent).
///
/// # Errors
/// Returns `OrchestrationError::InvalidGraph` for Completed or Canceled
/// graphs, which are not resumable.
pub fn resume_from(
    mut graph: TaskGraph,
    config: &OrchestrationConfig,
    router: Box<dyn AgentRouter>,
    available_agents: Vec<SubAgentDef>,
) -> Result<Self, OrchestrationError> {
    if graph.status == GraphStatus::Completed || graph.status == GraphStatus::Canceled {
        return Err(OrchestrationError::InvalidGraph(format!(
            "cannot resume a {} graph; only Paused, Failed, or Running graphs are resumable",
            graph.status
        )));
    }
    graph.status = GraphStatus::Running;
    // Tasks without an assigned agent handle are dropped from the running
    // map (filter_map) and will be rescheduled via the normal ready path.
    let running: HashMap<TaskId, RunningTask> = graph
        .tasks
        .iter()
        .filter(|t| t.status == TaskStatus::Running)
        .filter_map(|t| {
            let handle_id = t.assigned_agent.clone()?;
            let def_name = t.agent_hint.clone().unwrap_or_default();
            Some((
                t.id,
                RunningTask {
                    agent_handle_id: handle_id,
                    agent_def_name: def_name,
                    started_at: Instant::now(),
                },
            ))
        })
        .collect();
    let (event_tx, event_rx) = mpsc::channel(64);
    // A timeout of 0 means "unset"; fall back to 10 minutes.
    let task_timeout = if config.task_timeout_secs > 0 {
        Duration::from_secs(config.task_timeout_secs)
    } else {
        Duration::from_secs(600)
    };
    let topology = TopologyClassifier::analyze(&graph, config);
    let max_parallel = topology.max_parallel;
    let config_max_parallel = config.max_parallel as usize;
    let cascade_detector = if config.cascade_routing && config.topology_selection {
        Some(CascadeDetector::new(CascadeConfig {
            failure_threshold: config.cascade_failure_threshold,
        }))
    } else {
        None
    };
    Ok(Self {
        graph,
        max_parallel,
        config_max_parallel,
        running,
        event_rx,
        event_tx,
        task_timeout,
        router,
        available_agents,
        dependency_context_budget: config.dependency_context_budget,
        buffered_events: VecDeque::new(),
        sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
        deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
        consecutive_spawn_failures: 0,
        topology,
        topology_dirty: false,
        current_level: 0,
        verify_completeness: config.verify_completeness,
        verify_provider: config.verify_provider.as_str().trim().to_owned(),
        task_replan_counts: HashMap::new(),
        global_replan_count: 0,
        max_replans: config.max_replans,
        completeness_threshold_value: config.completeness_threshold,
        cascade_detector,
        tree_optimized_dispatch: config.tree_optimized_dispatch,
        cascade_routing: config.cascade_routing && config.topology_selection,
    })
}
/// Checks that the configured verify provider exists among the known
/// provider names.
///
/// Validation is skipped (returns `Ok`) when verification is disabled,
/// the configured name is empty, or no provider list was supplied.
///
/// # Errors
/// `OrchestrationError::InvalidConfig` when the provider name is not in
/// `provider_names`.
pub fn validate_verify_config(
    &self,
    provider_names: &[&str],
) -> Result<(), OrchestrationError> {
    if !self.verify_completeness {
        return Ok(());
    }
    let configured = self.verify_provider.as_str();
    if configured.is_empty() || provider_names.is_empty() {
        return Ok(());
    }
    if provider_names.contains(&configured) {
        Ok(())
    } else {
        Err(OrchestrationError::InvalidConfig(format!(
            "verify_provider \"{}\" not found in [[llm.providers]]; available: [{}]",
            configured,
            provider_names.join(", ")
        )))
    }
}
/// Returns a clone of the sender agents use to report task outcomes.
#[must_use]
pub fn event_sender(&self) -> mpsc::Sender<TaskEvent> {
    self.event_tx.clone()
}
/// Borrows the current task graph.
#[must_use]
pub fn graph(&self) -> &TaskGraph {
    &self.graph
}
/// Returns a clone of the current task graph.
// NOTE(review): takes `&self` and clones despite the `into_` prefix; by
// Rust naming convention this reads like `to_graph`. Renaming would break
// callers — confirm before changing.
#[must_use]
pub fn into_graph(&self) -> TaskGraph {
    self.graph.clone()
}
/// Borrows the topology analysis computed for the current graph.
#[must_use]
pub fn topology(&self) -> &TopologyAnalysis {
    &self.topology
}
/// Configured completeness-verification threshold.
#[must_use]
pub fn completeness_threshold(&self) -> f32 {
    self.completeness_threshold_value
}
/// Trimmed verify-provider name from config (empty when unset; an empty
/// name skips provider validation).
#[must_use]
pub fn verify_provider_name(&self) -> &str {
    &self.verify_provider
}
/// Remaining global replan budget, saturating at zero.
#[must_use]
pub fn max_replans_remaining(&self) -> u32 {
    self.max_replans.saturating_sub(self.global_replan_count)
}
/// Counts a whole-plan replan against the global replan budget.
pub fn record_whole_plan_replan(&mut self) {
    self.global_replan_count = self.global_replan_count.saturating_add(1);
}
/// Injects replan tasks produced after verifying `verified_task_id`.
///
/// Enforces a per-task limit of one replan and the global `max_replans`
/// budget; over-limit injections are logged and skipped (`Ok`), not
/// treated as errors. A successful injection marks the topology dirty
/// (re-analyzed on the next tick) and resets the cascade detector.
///
/// # Errors
/// Propagates validation failures from the verifier's task injection.
pub fn inject_tasks(
    &mut self,
    verified_task_id: TaskId,
    new_tasks: Vec<TaskNode>,
    max_tasks: usize,
) -> Result<(), OrchestrationError> {
    if new_tasks.is_empty() {
        return Ok(());
    }
    let task_replan_count = self.task_replan_counts.entry(verified_task_id).or_insert(0);
    if *task_replan_count >= 1 {
        tracing::warn!(
            task_id = %verified_task_id,
            "per-task replan limit (1) reached, skipping replan injection"
        );
        return Ok(());
    }
    if self.global_replan_count >= self.max_replans {
        tracing::warn!(
            global_replan_count = self.global_replan_count,
            max_replans = self.max_replans,
            "global replan limit reached, skipping replan injection"
        );
        return Ok(());
    }
    verifier_inject_tasks(&mut self.graph, new_tasks, max_tasks)?;
    *task_replan_count += 1;
    self.global_replan_count += 1;
    self.topology_dirty = true;
    // New tasks invalidate prior failure statistics.
    if let Some(ref mut det) = self.cascade_detector {
        det.reset();
    }
    Ok(())
}
}
impl Drop for DagScheduler {
    /// Warns when the scheduler is dropped while tasks are still running;
    /// the scheduler does not cancel agents on drop.
    fn drop(&mut self) {
        if !self.running.is_empty() {
            tracing::warn!(
                running_tasks = self.running.len(),
                "DagScheduler dropped with running tasks; agents may continue until their \
                CancellationToken fires or they complete naturally"
            );
        }
    }
}
impl DagScheduler {
/// Runs one scheduling step and returns the actions the caller must
/// perform.
///
/// Order of operations: (1) drain buffered then channel events, (2) fail
/// timed-out tasks, (3) order ready tasks per the dispatch strategy,
/// (4) dispatch up to the concurrency limit, (5) check for graph
/// completion or deadlock.
#[allow(clippy::too_many_lines)]
pub fn tick(&mut self) -> Vec<SchedulerAction> {
    if self.graph.status != GraphStatus::Running {
        return vec![SchedulerAction::Done {
            status: self.graph.status,
        }];
    }
    self.reanalyze_topology_if_dirty();
    let mut actions = Vec::new();
    // Events captured by wait_event() are processed before fresh channel
    // events to preserve arrival order.
    while let Some(event) = self.buffered_events.pop_front() {
        let cancel_actions = self.process_event(event);
        actions.extend(cancel_actions);
    }
    while let Ok(event) = self.event_rx.try_recv() {
        let cancel_actions = self.process_event(event);
        actions.extend(cancel_actions);
    }
    // Event processing can move the graph to a terminal status.
    if self.graph.status != GraphStatus::Running {
        return actions;
    }
    let timeout_actions = self.check_timeouts();
    actions.extend(timeout_actions);
    if self.graph.status != GraphStatus::Running {
        return actions;
    }
    let raw_ready = dag::ready_tasks(&self.graph);
    // CascadeAware: push detector-flagged tasks to the back of the queue;
    // Sequential tasks keep their priority regardless.
    let ready: Vec<TaskId> = if self.topology.strategy == DispatchStrategy::CascadeAware {
        if let Some(ref detector) = self.cascade_detector {
            let deprioritized = detector.deprioritized_tasks(&self.graph);
            if deprioritized.is_empty() {
                raw_ready
            } else {
                let (preferred, deferred): (Vec<_>, Vec<_>) =
                    raw_ready.into_iter().partition(|id| {
                        let is_sequential = self.graph.tasks[id.index()].execution_mode
                            == ExecutionMode::Sequential;
                        is_sequential || !deprioritized.contains(id)
                    });
                preferred.into_iter().chain(deferred).collect()
            }
        } else {
            raw_ready
        }
    } else {
        raw_ready
    };
    // TreeOptimized: sort deepest tasks first (key = max_depth - depth, so
    // deeper tasks get smaller keys).
    let ready: Vec<TaskId> = if self.topology.strategy == DispatchStrategy::TreeOptimized {
        let max_depth = self.topology.depth;
        let mut sortable = ready;
        sortable.sort_by_key(|id| {
            let task_depth = self.topology.depths.get(id).copied().unwrap_or(0);
            max_depth.saturating_sub(task_depth)
        });
        sortable
    } else {
        ready
    };
    self.advance_level_barrier_if_needed();
    let mut slots = self.max_parallel.saturating_sub(self.running.len());
    // At most one Sequential task may execute at a time, across ticks.
    let mut sequential_spawned_this_tick = false;
    let has_running_sequential = self
        .running
        .keys()
        .any(|tid| self.graph.tasks[tid.index()].execution_mode == ExecutionMode::Sequential);
    for task_id in ready {
        if slots == 0 {
            break;
        }
        // LevelBarrier: only dispatch tasks at the current depth level.
        if self.topology.strategy == DispatchStrategy::LevelBarrier {
            let task_depth = self
                .topology
                .depths
                .get(&task_id)
                .copied()
                .unwrap_or(usize::MAX);
            if task_depth != self.current_level {
                continue;
            }
        }
        let task = &self.graph.tasks[task_id.index()];
        if task.execution_mode == ExecutionMode::Sequential {
            if sequential_spawned_this_tick || has_running_sequential {
                continue;
            }
            sequential_spawned_this_tick = true;
        }
        // No matching sub-agent: run the task inline on the main agent.
        let Some(agent_def_name) = self.router.route(task, &self.available_agents) else {
            tracing::debug!(
                task_id = %task_id,
                title = %task.title,
                "no agent available, routing task to main agent inline"
            );
            let prompt = self.build_task_prompt(task);
            self.graph.tasks[task_id.index()].status = TaskStatus::Running;
            actions.push(SchedulerAction::RunInline { task_id, prompt });
            slots -= 1;
            continue;
        };
        let prompt = self.build_task_prompt(task);
        self.graph.tasks[task_id.index()].status = TaskStatus::Running;
        actions.push(SchedulerAction::Spawn {
            task_id,
            agent_def_name,
            prompt,
        });
        slots -= 1;
    }
    actions.extend(self.check_graph_completion());
    actions
}
/// Recomputes topology, strategy, and concurrency limit after task
/// injection (flagged by `topology_dirty`); no-op otherwise.
fn reanalyze_topology_if_dirty(&mut self) {
    if !self.topology_dirty {
        return;
    }
    let new_analysis = {
        let n = self.graph.tasks.len();
        if n == 0 {
            // Degenerate empty graph: everything-parallel placeholder.
            TopologyAnalysis {
                topology: Topology::AllParallel,
                strategy: DispatchStrategy::FullParallel,
                max_parallel: self.config_max_parallel,
                depth: 0,
                depths: std::collections::HashMap::new(),
            }
        } else {
            let (depth, depths) = super::topology::compute_depths_for_scheduler(&self.graph);
            let topo = TopologyClassifier::classify_with_depths(&self.graph, depth, &depths);
            // Rebuild a minimal config carrying only the two flags the
            // strategy choice depends on; the rest uses defaults.
            let strategy_config = zeph_config::OrchestrationConfig {
                cascade_routing: self.cascade_routing,
                tree_optimized_dispatch: self.tree_optimized_dispatch,
                ..zeph_config::OrchestrationConfig::default()
            };
            let strategy = TopologyClassifier::strategy(topo, &strategy_config);
            let max_parallel =
                TopologyClassifier::compute_max_parallel(topo, self.config_max_parallel);
            TopologyAnalysis {
                topology: topo,
                strategy,
                max_parallel,
                depth,
                depths,
            }
        }
    };
    self.topology = new_analysis;
    self.max_parallel = self.topology.max_parallel;
    self.topology_dirty = false;
    // Injected tasks may sit at a shallower depth than the current
    // barrier level; pull the level back so they are not skipped.
    if self.topology.strategy == DispatchStrategy::LevelBarrier {
        let min_active = self
            .graph
            .tasks
            .iter()
            .filter(|t| !t.status.is_terminal())
            .filter_map(|t| self.topology.depths.get(&t.id).copied())
            .min();
        if let Some(min_depth) = min_active {
            self.current_level = self.current_level.min(min_depth);
        }
    }
}
/// Under the LevelBarrier strategy, advances `current_level` past levels
/// whose tasks are all terminal, stopping at the first level with work
/// remaining; no-op for other strategies.
fn advance_level_barrier_if_needed(&mut self) {
    if self.topology.strategy != DispatchStrategy::LevelBarrier {
        return;
    }
    // A task at a different level, or any terminal task, does not block
    // the barrier; unknown depths (usize::MAX) never match a real level.
    let all_current_level_terminal = self.graph.tasks.iter().all(|t| {
        let task_depth = self
            .topology
            .depths
            .get(&t.id)
            .copied()
            .unwrap_or(usize::MAX);
        task_depth != self.current_level || t.status.is_terminal()
    });
    if all_current_level_terminal {
        let max_depth = self.topology.depth;
        // Skip over fully-terminal (or empty) levels in one pass.
        while self.current_level <= max_depth {
            let has_non_terminal = self.graph.tasks.iter().any(|t| {
                let d = self
                    .topology
                    .depths
                    .get(&t.id)
                    .copied()
                    .unwrap_or(usize::MAX);
                d == self.current_level && !t.status.is_terminal()
            });
            if has_non_terminal {
                break;
            }
            self.current_level += 1;
        }
    }
}
/// Detects graph completion or deadlock once nothing is running.
///
/// Returns `Done(Completed)` when every task is terminal; marks the graph
/// `Failed` (cancelling leftover non-terminal tasks) when nothing is
/// running or ready yet the graph is incomplete — a scheduler deadlock.
fn check_graph_completion(&mut self) -> Vec<SchedulerAction> {
    let running_in_graph_now = self
        .graph
        .tasks
        .iter()
        .filter(|t| t.status == TaskStatus::Running)
        .count();
    // Both the graph statuses and the running map must agree that nothing
    // is in flight before a completion verdict is possible.
    if running_in_graph_now != 0 || !self.running.is_empty() {
        return vec![];
    }
    let all_terminal = self.graph.tasks.iter().all(|t| t.status.is_terminal());
    if all_terminal {
        self.graph.status = GraphStatus::Completed;
        self.graph.finished_at = Some(super::graph::chrono_now());
        return vec![SchedulerAction::Done {
            status: GraphStatus::Completed,
        }];
    }
    if dag::ready_tasks(&self.graph).is_empty() {
        tracing::error!(
            "scheduler deadlock: no running or ready tasks, but graph not complete"
        );
        self.graph.status = GraphStatus::Failed;
        self.graph.finished_at = Some(super::graph::chrono_now());
        debug_assert!(
            self.running.is_empty(),
            "deadlock branch reached with non-empty running map"
        );
        // Close out stranded tasks so the graph is fully terminal.
        for task in &mut self.graph.tasks {
            if !task.status.is_terminal() {
                task.status = TaskStatus::Canceled;
            }
        }
        return vec![SchedulerAction::Done {
            status: GraphStatus::Failed,
        }];
    }
    vec![]
}
/// Exponential deferral backoff: the base backoff doubled once per
/// consecutive spawn failure (exponent capped at 10), clamped to a
/// 5-second ceiling.
fn current_deferral_backoff(&self) -> Duration {
    const MAX_BACKOFF: Duration = Duration::from_secs(5);
    // The exponent is capped at 10, well below u32's bit width, so the
    // checked shift always succeeds; unwrap_or is pure defensiveness.
    let exponent = self.consecutive_spawn_failures.min(10);
    let factor = 1u32.checked_shl(exponent).unwrap_or(u32::MAX);
    let scaled = self.deferral_backoff.saturating_mul(factor);
    scaled.min(MAX_BACKOFF)
}
/// Sleeps until there is something for the next `tick()` to do: a task
/// event arrives, the nearest running-task timeout elapses, or — when
/// nothing is running — the deferral backoff expires.
///
/// A received event is stashed in `buffered_events` for `tick()` to
/// process; when the buffer is saturated (2x task count) the oldest
/// event is dropped with an error log.
pub async fn wait_event(&mut self) {
    if self.running.is_empty() {
        tokio::time::sleep(self.current_deferral_backoff()).await;
        return;
    }
    // Sleep no longer than the soonest task timeout, floored at 100ms to
    // avoid a busy loop when a task is already overdue.
    let nearest_timeout = self
        .running
        .values()
        .map(|r| {
            self.task_timeout
                .checked_sub(r.started_at.elapsed())
                .unwrap_or(Duration::ZERO)
        })
        .min()
        .unwrap_or(Duration::from_secs(1));
    let wait_duration = nearest_timeout.max(Duration::from_millis(100));
    tokio::select! {
        Some(event) = self.event_rx.recv() => {
            if self.buffered_events.len() >= self.graph.tasks.len() * 2 {
                if let Some(dropped) = self.buffered_events.pop_front() {
                    tracing::error!(
                        task_id = %dropped.task_id,
                        buffer_len = self.buffered_events.len(),
                        "event buffer saturated; completion event dropped — task may \
                        remain Running until timeout"
                    );
                }
            }
            self.buffered_events.push_back(event);
        }
        () = tokio::time::sleep(wait_duration) => {}
    }
}
/// Records a successful agent spawn: resets the consecutive-failure
/// counter, stores the agent handle on the task, and adds the task to
/// the running map with a fresh start time.
pub fn record_spawn(
    &mut self,
    task_id: TaskId,
    agent_handle_id: String,
    agent_def_name: String,
) {
    self.consecutive_spawn_failures = 0;
    self.graph.tasks[task_id.index()].assigned_agent = Some(agent_handle_id.clone());
    self.running.insert(
        task_id,
        RunningTask {
            agent_handle_id,
            agent_def_name,
            started_at: Instant::now(),
        },
    );
}
/// Handles a failed spawn attempt.
///
/// A `ConcurrencyLimit` error is transient: the task is put back to
/// `Ready` to be retried next tick. Any other error marks the task
/// `Failed`, propagates the failure through the DAG (cancelling affected
/// running tasks), and emits `Done` if the graph left `Running`.
pub fn record_spawn_failure(
    &mut self,
    task_id: TaskId,
    error: &SubAgentError,
) -> Vec<SchedulerAction> {
    if let SubAgentError::ConcurrencyLimit { active, max } = error {
        tracing::warn!(
            task_id = %task_id,
            active,
            max,
            next_backoff_ms = self.current_deferral_backoff().as_millis(),
            "concurrency limit reached, deferring task to next tick"
        );
        self.graph.tasks[task_id.index()].status = TaskStatus::Ready;
        return Vec::new();
    }
    // Cap the logged error text to keep log lines bounded.
    let error_excerpt: String = error.to_string().chars().take(512).collect();
    tracing::warn!(
        task_id = %task_id,
        error = %error_excerpt,
        "spawn failed, marking task failed"
    );
    self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
    let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
    let mut actions = Vec::new();
    for cancel_task_id in cancel_ids {
        if let Some(running) = self.running.remove(&cancel_task_id) {
            actions.push(SchedulerAction::Cancel {
                agent_handle_id: running.agent_handle_id,
            });
        }
    }
    // propagate_failure may have moved the graph to a terminal status.
    if self.graph.status != GraphStatus::Running {
        self.graph.finished_at = Some(super::graph::chrono_now());
        actions.push(SchedulerAction::Done {
            status: self.graph.status,
        });
    }
    actions
}
/// Updates the spawn-failure counter after a batch of spawn attempts:
/// any success resets it; otherwise a concurrency failure increments it
/// (saturating). A batch with neither flag leaves the counter unchanged.
pub fn record_batch_backoff(&mut self, any_success: bool, any_concurrency_failure: bool) {
    match (any_success, any_concurrency_failure) {
        (true, _) => self.consecutive_spawn_failures = 0,
        (false, true) => {
            self.consecutive_spawn_failures =
                self.consecutive_spawn_failures.saturating_add(1);
        }
        (false, false) => {}
    }
}
/// Cancels the whole graph: marks it `Canceled`, drains the running map
/// into `Cancel` actions, cancels every remaining non-terminal task, and
/// appends a final `Done(Canceled)`.
pub fn cancel_all(&mut self) -> Vec<SchedulerAction> {
    self.graph.status = GraphStatus::Canceled;
    self.graph.finished_at = Some(super::graph::chrono_now());
    let running: Vec<(TaskId, RunningTask)> = self.running.drain().collect();
    let mut actions: Vec<SchedulerAction> = running
        .into_iter()
        .map(|(task_id, r)| {
            self.graph.tasks[task_id.index()].status = TaskStatus::Canceled;
            SchedulerAction::Cancel {
                agent_handle_id: r.agent_handle_id,
            }
        })
        .collect();
    // Also cancel tasks that were never started (Pending/Ready).
    for task in &mut self.graph.tasks {
        if !task.status.is_terminal() {
            task.status = TaskStatus::Canceled;
        }
    }
    actions.push(SchedulerAction::Done {
        status: GraphStatus::Canceled,
    });
    actions
}
}
impl DagScheduler {
/// Applies a single task event to the graph.
///
/// Events whose agent handle does not match the running record (stale
/// incarnations) and events for tasks not in the running map are
/// discarded. On completion the task's result is stored, the cascade
/// detector is updated, newly unblocked tasks are marked `Ready`, and a
/// `Verify` action is emitted when verification is enabled. On failure
/// the failure is propagated through the DAG, downstream running tasks
/// are cancelled, and `Done` is emitted if the graph left `Running`.
fn process_event(&mut self, event: TaskEvent) -> Vec<SchedulerAction> {
    let TaskEvent {
        task_id,
        agent_handle_id,
        outcome,
    } = event;
    match self.running.get(&task_id) {
        Some(running) if running.agent_handle_id != agent_handle_id => {
            tracing::warn!(
                task_id = %task_id,
                expected = %running.agent_handle_id,
                got = %agent_handle_id,
                "discarding stale event from previous agent incarnation"
            );
            return Vec::new();
        }
        None => {
            tracing::debug!(
                task_id = %task_id,
                agent_handle_id = %agent_handle_id,
                "ignoring event for task not in running map"
            );
            return Vec::new();
        }
        Some(_) => {}
    }
    // Capture timing/routing metadata before removing the running record.
    let duration_ms = self.running.get(&task_id).map_or(0, |r| {
        u64::try_from(r.started_at.elapsed().as_millis()).unwrap_or(u64::MAX)
    });
    let agent_def_name = self.running.get(&task_id).map(|r| r.agent_def_name.clone());
    self.running.remove(&task_id);
    match outcome {
        TaskOutcome::Completed { output, artifacts } => {
            self.graph.tasks[task_id.index()].status = TaskStatus::Completed;
            self.graph.tasks[task_id.index()].result = Some(TaskResult {
                output: output.clone(),
                artifacts,
                duration_ms,
                agent_id: Some(agent_handle_id),
                agent_def: agent_def_name,
            });
            if let Some(ref mut detector) = self.cascade_detector {
                detector.record_outcome(task_id, true, &self.graph);
            }
            // Promote any tasks this completion unblocked.
            let newly_ready = dag::ready_tasks(&self.graph);
            for ready_id in newly_ready {
                if self.graph.tasks[ready_id.index()].status == TaskStatus::Pending {
                    self.graph.tasks[ready_id.index()].status = TaskStatus::Ready;
                }
            }
            if self.verify_completeness {
                vec![SchedulerAction::Verify { task_id, output }]
            } else {
                Vec::new()
            }
        }
        TaskOutcome::Failed { error } => {
            // Cap logged error text to keep log lines bounded.
            let error_excerpt: String = error.chars().take(512).collect();
            tracing::warn!(
                task_id = %task_id,
                error = %error_excerpt,
                "task failed"
            );
            self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
            if let Some(ref mut detector) = self.cascade_detector {
                detector.record_outcome(task_id, false, &self.graph);
            }
            let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
            let mut actions = Vec::new();
            for cancel_task_id in cancel_ids {
                if let Some(running) = self.running.remove(&cancel_task_id) {
                    actions.push(SchedulerAction::Cancel {
                        agent_handle_id: running.agent_handle_id,
                    });
                }
            }
            if self.graph.status != GraphStatus::Running {
                self.graph.finished_at = Some(super::graph::chrono_now());
                actions.push(SchedulerAction::Done {
                    status: self.graph.status,
                });
            }
            actions
        }
    }
}
/// Fails every running task whose elapsed time exceeds `task_timeout`:
/// cancels its agent, propagates the failure (cancelling downstream
/// running tasks), and stops early with `Done` if the graph leaves
/// `Running`.
fn check_timeouts(&mut self) -> Vec<SchedulerAction> {
    // Collect first to end the immutable borrow before mutating.
    let timed_out: Vec<(TaskId, String)> = self
        .running
        .iter()
        .filter(|(_, r)| r.started_at.elapsed() > self.task_timeout)
        .map(|(id, r)| (*id, r.agent_handle_id.clone()))
        .collect();
    let mut actions = Vec::new();
    for (task_id, agent_handle_id) in timed_out {
        tracing::warn!(
            task_id = %task_id,
            timeout_secs = self.task_timeout.as_secs(),
            "task timed out"
        );
        self.running.remove(&task_id);
        self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
        actions.push(SchedulerAction::Cancel { agent_handle_id });
        let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
        for cancel_task_id in cancel_ids {
            if let Some(running) = self.running.remove(&cancel_task_id) {
                actions.push(SchedulerAction::Cancel {
                    agent_handle_id: running.agent_handle_id,
                });
            }
        }
        if self.graph.status != GraphStatus::Running {
            self.graph.finished_at = Some(super::graph::chrono_now());
            actions.push(SchedulerAction::Done {
                status: self.graph.status,
            });
            break;
        }
    }
    actions
}
/// Builds the prompt for a task, prepending a `<completed-dependencies>`
/// block containing the sanitized, budget-truncated outputs of completed
/// dependencies and notes for skipped ones. Tasks with no completed
/// dependencies get the bare description.
fn build_task_prompt(&self, task: &TaskNode) -> String {
    if task.depends_on.is_empty() {
        return task.description.clone();
    }
    let completed_deps: Vec<&TaskNode> = task
        .depends_on
        .iter()
        .filter_map(|dep_id| {
            let dep = &self.graph.tasks[dep_id.index()];
            if dep.status == TaskStatus::Completed {
                Some(dep)
            } else {
                None
            }
        })
        .collect();
    if completed_deps.is_empty() {
        return task.description.clone();
    }
    // Split the total char budget evenly across dependencies; a zero
    // divisor is impossible here but checked_div keeps it panic-free.
    let budget_per_dep = self
        .dependency_context_budget
        .checked_div(completed_deps.len())
        .unwrap_or(self.dependency_context_budget);
    let mut context_block = String::from("<completed-dependencies>\n");
    for dep in &completed_deps {
        let escaped_id = xml_escape(&dep.id.to_string());
        let escaped_title = xml_escape(&dep.title);
        let _ = writeln!(
            context_block,
            "## Task \"{escaped_id}\": \"{escaped_title}\" (completed)",
        );
        if let Some(ref result) = dep.result {
            // Sanitize dependency output before embedding it in a prompt.
            let source = ContentSource::new(ContentSourceKind::A2aMessage);
            let sanitized = self.sanitizer.sanitize(&result.output, source);
            let safe_output = sanitized.body;
            let char_count = safe_output.chars().count();
            if char_count > budget_per_dep {
                let truncated: String = safe_output.chars().take(budget_per_dep).collect();
                let _ = write!(
                    context_block,
                    "{truncated}...\n[truncated: {char_count} chars total]"
                );
            } else {
                context_block.push_str(&safe_output);
            }
        } else {
            context_block.push_str("[no output recorded]\n");
        }
        context_block.push('\n');
    }
    // Skipped dependencies are listed so the agent knows they exist but
    // produced nothing.
    for dep_id in &task.depends_on {
        let dep = &self.graph.tasks[dep_id.index()];
        if dep.status == TaskStatus::Skipped {
            let escaped_id = xml_escape(&dep.id.to_string());
            let escaped_title = xml_escape(&dep.title);
            let _ = writeln!(
                context_block,
                "## Task \"{escaped_id}\": \"{escaped_title}\" (skipped -- no output available)\n",
            );
        }
    }
    context_block.push_str("</completed-dependencies>\n\n");
    format!("{context_block}Your task: {}", task.description)
}
}
/// Escapes the five XML special characters so `s` can be embedded in the
/// XML-style prompt markup without altering its structure.
///
/// As previously written, every arm pushed the character back unchanged
/// (`'<' => push_str("<")`), making the function an identity and leaving
/// titles able to break out of the `<completed-dependencies>` block.
/// Each special character now maps to its XML character entity.
fn xml_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '&' => out.push_str("&amp;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&apos;"),
            other => out.push(other),
        }
    }
    out
}
#[cfg(test)]
mod tests {
#![allow(clippy::default_trait_access)]
use super::*;
use crate::graph::{FailureStrategy, GraphStatus, TaskGraph, TaskNode, TaskStatus};
/// Builds a `TaskNode` with a numeric id, generated title/description,
/// and the given dependency ids.
fn make_node(id: u32, deps: &[u32]) -> TaskNode {
    let mut n = TaskNode::new(
        id,
        format!("task-{id}"),
        format!("description for task {id}"),
    );
    n.depends_on = deps.iter().map(|&d| TaskId(d)).collect();
    n
}
/// Wraps a list of nodes in a fresh `TaskGraph` with a test goal.
fn graph_from_nodes(nodes: Vec<TaskNode>) -> TaskGraph {
    let mut g = TaskGraph::new("test goal");
    g.tasks = nodes;
    g
}
/// Builds a minimal `SubAgentDef` with default policies for the given name.
fn make_def(name: &str) -> SubAgentDef {
    use zeph_subagent::{SkillFilter, SubAgentPermissions, SubagentHooks, ToolPolicy};
    SubAgentDef {
        name: name.to_string(),
        description: format!("{name} agent"),
        model: None,
        tools: ToolPolicy::InheritAll,
        disallowed_tools: vec![],
        permissions: SubAgentPermissions::default(),
        skills: SkillFilter::default(),
        system_prompt: String::new(),
        hooks: SubagentHooks::default(),
        memory: None,
        source: None,
        file_path: None,
    }
}
/// Baseline orchestration config used by most tests: 4-way parallelism,
/// abort-on-failure, verification and topology features disabled.
fn make_config() -> zeph_config::OrchestrationConfig {
    zeph_config::OrchestrationConfig {
        enabled: true,
        max_tasks: 20,
        max_parallel: 4,
        default_failure_strategy: "abort".to_string(),
        default_max_retries: 3,
        task_timeout_secs: 300,
        planner_provider: Default::default(),
        planner_max_tokens: 4096,
        dependency_context_budget: 16384,
        confirm_before_execute: true,
        aggregator_max_tokens: 4096,
        deferral_backoff_ms: 250,
        plan_cache: zeph_config::PlanCacheConfig::default(),
        topology_selection: false,
        verify_provider: Default::default(),
        verify_max_tokens: 1024,
        max_replans: 2,
        verify_completeness: false,
        completeness_threshold: 0.7,
        tool_provider: Default::default(),
        cascade_routing: false,
        cascade_failure_threshold: 0.5,
        tree_optimized_dispatch: false,
    }
}
/// Router that always picks the first available agent definition.
struct FirstRouter;
impl AgentRouter for FirstRouter {
    fn route(&self, _task: &TaskNode, available: &[SubAgentDef]) -> Option<String> {
        available.first().map(|d| d.name.clone())
    }
}
/// Router that never matches, forcing tasks down the inline path.
struct NoneRouter;
impl AgentRouter for NoneRouter {
    fn route(&self, _task: &TaskNode, _available: &[SubAgentDef]) -> Option<String> {
        None
    }
}
/// Builds a scheduler over `graph` with the default config, one "worker"
/// agent definition, and the provided router.
fn make_scheduler_with_router(graph: TaskGraph, router: Box<dyn AgentRouter>) -> DagScheduler {
    let config = make_config();
    let defs = vec![make_def("worker")];
    DagScheduler::new(graph, &config, router, defs).unwrap()
}
fn make_scheduler(graph: TaskGraph) -> DagScheduler {
let config = make_config();
let defs = vec![make_def("worker")];
DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap()
}
/// `DagScheduler::new` must reject any graph not in `Created` status.
#[test]
fn test_new_validates_graph_status() {
    let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
    graph.status = GraphStatus::Running;
    let config = make_config();
    let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
    assert!(result.is_err());
    let err = result.unwrap_err();
    assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
}
/// Construction marks dependency-free tasks Ready, leaves dependent tasks
/// Pending, and moves the graph to Running.
#[test]
fn test_new_marks_roots_ready() {
    let graph = graph_from_nodes(vec![
        make_node(0, &[]),
        make_node(1, &[]),
        make_node(2, &[0, 1]),
    ]);
    let scheduler = make_scheduler(graph);
    assert_eq!(scheduler.graph().tasks[0].status, TaskStatus::Ready);
    assert_eq!(scheduler.graph().tasks[1].status, TaskStatus::Ready);
    assert_eq!(scheduler.graph().tasks[2].status, TaskStatus::Pending);
    assert_eq!(scheduler.graph().status, GraphStatus::Running);
}
/// An empty task list fails DAG validation at construction time.
#[test]
fn test_new_validates_empty_graph() {
    let graph = graph_from_nodes(vec![]);
    let config = make_config();
    let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
    assert!(result.is_err());
}
/// The first tick emits one Spawn action per ready root task.
#[test]
fn test_tick_produces_spawn_for_ready() {
    let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
    let mut scheduler = make_scheduler(graph);
    let actions = scheduler.tick();
    let spawns: Vec<_> = actions
        .iter()
        .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
        .collect();
    assert_eq!(spawns.len(), 2);
}
/// One tick dispatches at most `max_parallel` tasks even when more are
/// ready.
///
/// Renamed from `test_tick_dispatches_all_regardless_of_max_parallel`,
/// which contradicted both the assertion (`spawn_count == 2`) and its own
/// failure message ("max_parallel=2 caps dispatched tasks per tick").
#[test]
fn test_tick_caps_spawns_at_max_parallel() {
    let graph = graph_from_nodes(vec![
        make_node(0, &[]),
        make_node(1, &[]),
        make_node(2, &[]),
        make_node(3, &[]),
        make_node(4, &[]),
    ]);
    let mut config = make_config();
    config.max_parallel = 2;
    let defs = vec![make_def("worker")];
    let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
    let actions = scheduler.tick();
    let spawn_count = actions
        .iter()
        .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
        .count();
    assert_eq!(
        spawn_count, 2,
        "max_parallel=2 caps dispatched tasks per tick"
    );
}
/// A graph whose only task is already Completed yields Done(Completed)
/// on the first tick.
#[test]
fn test_tick_detects_completion() {
    let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
    graph.tasks[0].status = TaskStatus::Completed;
    let config = make_config();
    let defs = vec![make_def("worker")];
    let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
    let actions = scheduler.tick();
    let has_done = actions.iter().any(|a| {
        matches!(
            a,
            SchedulerAction::Done {
                status: GraphStatus::Completed
            }
        )
    });
    assert!(
        has_done,
        "should emit Done(Completed) when all tasks are terminal"
    );
}
/// Completing task 0 unblocks its dependent: task 1 is either spawned on
/// the same tick or at least marked Ready.
#[test]
fn test_completion_event_marks_deps_ready() {
    let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
    let mut scheduler = make_scheduler(graph);
    // Simulate task 0 already running under handle-0.
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    scheduler.running.insert(
        TaskId(0),
        RunningTask {
            agent_handle_id: "handle-0".to_string(),
            agent_def_name: "worker".to_string(),
            started_at: Instant::now(),
        },
    );
    let event = TaskEvent {
        task_id: TaskId(0),
        agent_handle_id: "handle-0".to_string(),
        outcome: TaskOutcome::Completed {
            output: "done".to_string(),
            artifacts: vec![],
        },
    };
    scheduler.buffered_events.push_back(event);
    let actions = scheduler.tick();
    assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Completed);
    let has_spawn_1 = actions
        .iter()
        .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(1)));
    assert!(
        has_spawn_1 || scheduler.graph.tasks[1].status == TaskStatus::Ready,
        "task 1 should be spawned or marked Ready"
    );
}
/// With the default abort strategy, one task failing fails the graph,
/// cancels the other running task, and emits a Done action.
#[test]
fn test_failure_abort_cancels_running() {
    let graph = graph_from_nodes(vec![
        make_node(0, &[]),
        make_node(1, &[]),
        make_node(2, &[0, 1]),
    ]);
    let mut scheduler = make_scheduler(graph);
    // Simulate tasks 0 and 1 running under handles h0/h1.
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    scheduler.running.insert(
        TaskId(0),
        RunningTask {
            agent_handle_id: "h0".to_string(),
            agent_def_name: "worker".to_string(),
            started_at: Instant::now(),
        },
    );
    scheduler.graph.tasks[1].status = TaskStatus::Running;
    scheduler.running.insert(
        TaskId(1),
        RunningTask {
            agent_handle_id: "h1".to_string(),
            agent_def_name: "worker".to_string(),
            started_at: Instant::now(),
        },
    );
    let event = TaskEvent {
        task_id: TaskId(0),
        agent_handle_id: "h0".to_string(),
        outcome: TaskOutcome::Failed {
            error: "boom".to_string(),
        },
    };
    scheduler.buffered_events.push_back(event);
    let actions = scheduler.tick();
    assert_eq!(scheduler.graph.status, GraphStatus::Failed);
    let cancel_ids: Vec<_> = actions
        .iter()
        .filter_map(|a| {
            if let SchedulerAction::Cancel { agent_handle_id } = a {
                Some(agent_handle_id.as_str())
            } else {
                None
            }
        })
        .collect();
    assert!(cancel_ids.contains(&"h1"), "task 1 should be canceled");
    assert!(
        actions
            .iter()
            .any(|a| matches!(a, SchedulerAction::Done { .. }))
    );
}
/// A Skip-strategy failure marks the task Skipped and propagates Skipped
/// to its dependents instead of failing the graph.
#[test]
fn test_failure_skip_propagates() {
    let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
    let mut scheduler = make_scheduler(graph);
    scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Skip);
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    scheduler.running.insert(
        TaskId(0),
        RunningTask {
            agent_handle_id: "h0".to_string(),
            agent_def_name: "worker".to_string(),
            started_at: Instant::now(),
        },
    );
    let event = TaskEvent {
        task_id: TaskId(0),
        agent_handle_id: "h0".to_string(),
        outcome: TaskOutcome::Failed {
            error: "skip me".to_string(),
        },
    };
    scheduler.buffered_events.push_back(event);
    scheduler.tick();
    assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Skipped);
    assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Skipped);
}
// A failure with the Retry strategy and retries remaining must bump
// retry_count and reschedule the task (either an immediate Spawn or Ready).
#[test]
fn test_failure_retry_reschedules() {
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let mut scheduler = make_scheduler(graph);
    scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
    scheduler.graph.tasks[0].max_retries = Some(3);
    scheduler.graph.tasks[0].retry_count = 0;
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    scheduler.running.insert(
        TaskId(0),
        RunningTask {
            agent_handle_id: "h0".to_string(),
            agent_def_name: "worker".to_string(),
            started_at: Instant::now(),
        },
    );
    let event = TaskEvent {
        task_id: TaskId(0),
        agent_handle_id: "h0".to_string(),
        outcome: TaskOutcome::Failed {
            error: "transient".to_string(),
        },
    };
    scheduler.buffered_events.push_back(event);
    let actions = scheduler.tick();
    let has_spawn = actions
        .iter()
        .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
    // Either outcome is acceptable: re-spawned this tick, or parked as Ready.
    assert!(
        has_spawn || scheduler.graph.tasks[0].status == TaskStatus::Ready,
        "retry should produce spawn or Ready status"
    );
    assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
}
// First failure of a Retry task (max_retries=2) increments retry_count,
// reschedules the task, and keeps the graph Running.
#[test]
fn test_process_event_failed_retry() {
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let mut scheduler = make_scheduler(graph);
    scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
    scheduler.graph.tasks[0].max_retries = Some(2);
    scheduler.graph.tasks[0].retry_count = 0;
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    scheduler.running.insert(
        TaskId(0),
        RunningTask {
            agent_handle_id: "h0".to_string(),
            agent_def_name: "worker".to_string(),
            started_at: Instant::now(),
        },
    );
    let event = TaskEvent {
        task_id: TaskId(0),
        agent_handle_id: "h0".to_string(),
        outcome: TaskOutcome::Failed {
            error: "first failure".to_string(),
        },
    };
    scheduler.buffered_events.push_back(event);
    let actions = scheduler.tick();
    assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
    let spawned = actions
        .iter()
        .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
    assert!(
        spawned || scheduler.graph.tasks[0].status == TaskStatus::Ready,
        "retry should emit Spawn or set Ready"
    );
    // A retryable failure must not transition the graph out of Running.
    assert_eq!(scheduler.graph.status, GraphStatus::Running);
}
// A running task whose elapsed time exceeds `task_timeout_secs` must be
// marked Failed and a Cancel action emitted for its agent handle.
#[test]
fn test_timeout_cancels_stalled() {
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let mut config = make_config();
    config.task_timeout_secs = 1;
    let defs = vec![make_def("worker")];
    let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    scheduler.running.insert(
        TaskId(0),
        RunningTask {
            agent_handle_id: "h0".to_string(),
            agent_def_name: "worker".to_string(),
            // Backdate the start time past the 1s timeout so tick() sees a stall.
            started_at: Instant::now().checked_sub(Duration::from_secs(2)).unwrap(),
        },
    );
    let actions = scheduler.tick();
    let has_cancel = actions.iter().any(
        |a| matches!(a, SchedulerAction::Cancel { agent_handle_id } if agent_handle_id == "h0"),
    );
    assert!(has_cancel, "timed-out task should emit Cancel action");
    assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
}
// cancel_all() must cancel every running task, empty the running map, set the
// graph status to Canceled, and emit Done { Canceled }.
#[test]
fn test_cancel_all() {
    let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
    let mut scheduler = make_scheduler(graph);
    // Two independent tasks, both running with tracked handles.
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    scheduler.running.insert(
        TaskId(0),
        RunningTask {
            agent_handle_id: "h0".to_string(),
            agent_def_name: "worker".to_string(),
            started_at: Instant::now(),
        },
    );
    scheduler.graph.tasks[1].status = TaskStatus::Running;
    scheduler.running.insert(
        TaskId(1),
        RunningTask {
            agent_handle_id: "h1".to_string(),
            agent_def_name: "worker".to_string(),
            started_at: Instant::now(),
        },
    );
    let actions = scheduler.cancel_all();
    assert_eq!(scheduler.graph.status, GraphStatus::Canceled);
    assert!(scheduler.running.is_empty());
    // One Cancel per running task.
    let cancel_count = actions
        .iter()
        .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
        .count();
    assert_eq!(cancel_count, 2);
    assert!(actions.iter().any(|a| matches!(
        a,
        SchedulerAction::Done {
            status: GraphStatus::Canceled
        }
    )));
}
// A fatal spawn error (SubAgentError::Spawn) must fail the task and the
// graph, and emit a terminal Done action.
#[test]
fn test_record_spawn_failure() {
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let mut scheduler = make_scheduler(graph);
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    let error = SubAgentError::Spawn("spawn error".to_string());
    let actions = scheduler.record_spawn_failure(TaskId(0), &error);
    assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
    assert_eq!(scheduler.graph.status, GraphStatus::Failed);
    assert!(
        actions
            .iter()
            .any(|a| matches!(a, SchedulerAction::Done { .. }))
    );
}
// A ConcurrencyLimit spawn error is a transient deferral, not a failure:
// the task reverts to Ready, the graph stays Running, no actions emitted.
#[test]
fn test_record_spawn_failure_concurrency_limit_reverts_to_ready() {
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let mut scheduler = make_scheduler(graph);
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
    let actions = scheduler.record_spawn_failure(TaskId(0), &error);
    assert_eq!(
        scheduler.graph.tasks[0].status,
        TaskStatus::Ready,
        "task must revert to Ready so the next tick can retry"
    );
    assert_eq!(
        scheduler.graph.status,
        GraphStatus::Running,
        "graph must stay Running, not transition to Failed"
    );
    assert!(
        actions.is_empty(),
        "no cancel or done actions expected for a transient deferral"
    );
}
// Same deferral behavior holds for a different ConcurrencyLimit payload
// (active: 1, max: 1): task back to Ready, no actions produced.
#[test]
fn test_record_spawn_failure_concurrency_limit_variant_spawn_for_task() {
    let mut scheduler = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    let limit_err = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
    let emitted = scheduler.record_spawn_failure(TaskId(0), &limit_err);
    assert!(emitted.is_empty());
    assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
}
// Deferring one task on ConcurrencyLimit must not disturb a sibling task
// that is genuinely running.
#[test]
fn test_concurrency_deferral_does_not_affect_running_task() {
    let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
    let mut scheduler = make_scheduler(graph);
    // Task 0: really running, tracked in the running map.
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    scheduler.running.insert(
        TaskId(0),
        RunningTask {
            agent_handle_id: "h0".to_string(),
            agent_def_name: "worker".to_string(),
            started_at: Instant::now(),
        },
    );
    // Task 1: marked Running but its spawn will be reported as deferred.
    scheduler.graph.tasks[1].status = TaskStatus::Running;
    let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
    let actions = scheduler.record_spawn_failure(TaskId(1), &error);
    assert_eq!(
        scheduler.graph.tasks[0].status,
        TaskStatus::Running,
        "task 0 must remain Running"
    );
    assert_eq!(
        scheduler.graph.tasks[1].status,
        TaskStatus::Ready,
        "task 1 must revert to Ready"
    );
    assert_eq!(
        scheduler.graph.status,
        GraphStatus::Running,
        "graph must stay Running"
    );
    assert!(actions.is_empty(), "no cancel or done actions expected");
}
// Degenerate config max_parallel=0: nothing can be dispatched, but the
// scheduler must not misread the stall as a deadlock and fail the graph.
#[test]
fn test_max_concurrent_zero_no_infinite_loop() {
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let config = zeph_config::OrchestrationConfig {
        max_parallel: 0,
        ..make_config()
    };
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    let actions1 = scheduler.tick();
    assert!(
        actions1
            .iter()
            .all(|a| !matches!(a, SchedulerAction::Spawn { .. })),
        "no Spawn expected when max_parallel=0"
    );
    assert!(
        actions1
            .iter()
            .all(|a| !matches!(a, SchedulerAction::Done { .. })),
        "no Done(Failed) expected — ready tasks exist, so no deadlock"
    );
    assert_eq!(scheduler.graph.status, GraphStatus::Running);
    // A second tick must behave identically — no spurious terminal state.
    let actions2 = scheduler.tick();
    assert!(
        actions2
            .iter()
            .all(|a| !matches!(a, SchedulerAction::Done { .. })),
        "second tick must not emit Done(Failed) — ready tasks still exist"
    );
    assert_eq!(
        scheduler.graph.status,
        GraphStatus::Running,
        "graph must remain Running"
    );
}
// Full dispatch-defer-retry cycle: both tasks spawn, both get deferred on
// ConcurrencyLimit, and the next tick re-emits Spawn without failing.
#[test]
fn test_all_tasks_deferred_graph_stays_running() {
    let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
    let mut scheduler = make_scheduler(graph);
    let actions = scheduler.tick();
    assert_eq!(
        actions
            .iter()
            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
            .count(),
        2,
        "expected 2 Spawn actions on first tick"
    );
    assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
    assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Running);
    // Report both spawns as deferred by the concurrency limiter.
    let error = SubAgentError::ConcurrencyLimit { active: 2, max: 2 };
    let r0 = scheduler.record_spawn_failure(TaskId(0), &error);
    let r1 = scheduler.record_spawn_failure(TaskId(1), &error);
    assert!(r0.is_empty() && r1.is_empty(), "no cancel/done on deferral");
    assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
    assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Ready);
    assert_eq!(scheduler.graph.status, GraphStatus::Running);
    // Deferred tasks must be retried on the following tick.
    let retry_actions = scheduler.tick();
    let spawn_count = retry_actions
        .iter()
        .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
        .count();
    assert!(
        spawn_count > 0,
        "second tick must re-emit Spawn for deferred tasks"
    );
    assert!(
        retry_actions.iter().all(|a| !matches!(
            a,
            SchedulerAction::Done {
                status: GraphStatus::Failed,
                ..
            }
        )),
        "no Done(Failed) expected"
    );
}
// A task with no dependencies gets a prompt that is exactly its description,
// with no dependency-context preamble.
#[test]
fn test_build_prompt_no_deps() {
    let sched = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    let task = &sched.graph.tasks[0];
    assert_eq!(sched.build_task_prompt(task), "description for task 0");
}
// A dependent task's prompt includes completed-dependency context, truncated
// to the configured budget, plus the task's own instructions.
#[test]
fn test_build_prompt_with_deps_and_truncation() {
    let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
    // Give the dependency a 200-char result, well above the 50-char budget.
    graph.tasks[0].status = TaskStatus::Completed;
    graph.tasks[0].result = Some(TaskResult {
        output: "x".repeat(200),
        artifacts: vec![],
        duration_ms: 10,
        agent_id: None,
        agent_def: None,
    });
    let config = zeph_config::OrchestrationConfig {
        dependency_context_budget: 50,
        ..make_config()
    };
    let scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
    assert!(prompt.contains("<completed-dependencies>"));
    assert!(prompt.contains("[truncated:"));
    assert!(prompt.contains("Your task:"));
}
// TaskResult.duration_ms must reflect real elapsed time, computed from the
// RunningTask's started_at, not default to zero.
#[test]
fn test_duration_ms_computed_correctly() {
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let mut scheduler = make_scheduler(graph);
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    scheduler.running.insert(
        TaskId(0),
        RunningTask {
            agent_handle_id: "h0".to_string(),
            agent_def_name: "worker".to_string(),
            // Backdate the start by 50ms so the measured duration is nonzero.
            started_at: Instant::now()
                .checked_sub(Duration::from_millis(50))
                .unwrap(),
        },
    );
    let event = TaskEvent {
        task_id: TaskId(0),
        agent_handle_id: "h0".to_string(),
        outcome: TaskOutcome::Completed {
            output: "result".to_string(),
            artifacts: vec![],
        },
    };
    scheduler.buffered_events.push_back(event);
    scheduler.tick();
    let result = scheduler.graph.tasks[0].result.as_ref().unwrap();
    assert!(
        result.duration_ms > 0,
        "duration_ms should be > 0, got {}",
        result.duration_ms
    );
}
// Truncating multi-byte UTF-8 dependency output must not split a character
// (which would panic or corrupt the prompt); content must survive intact.
#[test]
fn test_utf8_safe_truncation() {
    let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
    graph.tasks[0].status = TaskStatus::Completed;
    // 600 chars of 3-byte Japanese characters; budget of 500 forces truncation
    // at a non-ASCII boundary.
    let unicode_output = "日本語テスト".repeat(100);
    graph.tasks[0].result = Some(TaskResult {
        output: unicode_output,
        artifacts: vec![],
        duration_ms: 10,
        agent_id: None,
        agent_def: None,
    });
    let config = zeph_config::OrchestrationConfig {
        dependency_context_budget: 500,
        ..make_config()
    };
    let scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
    assert!(
        prompt.contains("日"),
        "Japanese characters should be in the prompt after safe truncation"
    );
}
// When the router matches no agent definition, the scheduler must fall back
// to RunInline rather than spawning, and still mark the task Running.
#[test]
fn test_no_agent_routes_inline() {
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let mut scheduler = make_scheduler_with_router(graph, Box::new(NoneRouter));
    let actions = scheduler.tick();
    let inline_emitted = actions
        .iter()
        .any(|a| matches!(a, SchedulerAction::RunInline { .. }));
    assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
    assert!(inline_emitted);
}
// An event whose agent handle does not match the currently-tracked handle is
// stale (e.g. from a superseded attempt) and must be ignored entirely.
#[test]
fn test_stale_event_rejected() {
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let mut scheduler = make_scheduler(graph);
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    scheduler.running.insert(
        TaskId(0),
        RunningTask {
            agent_handle_id: "current-handle".to_string(),
            agent_def_name: "worker".to_string(),
            started_at: Instant::now(),
        },
    );
    // Completion event carrying an outdated handle id.
    let stale_event = TaskEvent {
        task_id: TaskId(0),
        agent_handle_id: "old-handle".to_string(),
        outcome: TaskOutcome::Completed {
            output: "stale output".to_string(),
            artifacts: vec![],
        },
    };
    scheduler.buffered_events.push_back(stale_event);
    let actions = scheduler.tick();
    assert_ne!(
        scheduler.graph.tasks[0].status,
        TaskStatus::Completed,
        "stale event must not complete the task"
    );
    let has_done = actions
        .iter()
        .any(|a| matches!(a, SchedulerAction::Done { .. }));
    assert!(
        !has_done,
        "no Done action should be emitted for a stale event"
    );
    assert!(
        scheduler.running.contains_key(&TaskId(0)),
        "running task must remain after stale event"
    );
}
// The truncation notice embedded in the prompt must report the original size
// in characters ("chars total"), not bytes.
#[test]
fn test_build_prompt_chars_count_in_truncation_message() {
    let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
    graph.tasks[0].status = TaskStatus::Completed;
    // 200 chars of output against a 10-char budget guarantees truncation.
    let output = "x".repeat(200);
    graph.tasks[0].result = Some(TaskResult {
        output,
        artifacts: vec![],
        duration_ms: 10,
        agent_id: None,
        agent_def: None,
    });
    let config = zeph_config::OrchestrationConfig {
        dependency_context_budget: 10,
        ..make_config()
    };
    let scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
    assert!(
        prompt.contains("chars total"),
        "truncation message must use 'chars total' label. Prompt: {prompt}"
    );
    assert!(
        prompt.contains("[truncated:"),
        "prompt must contain truncation notice. Prompt: {prompt}"
    );
}
// resume_from must accept a Paused graph and flip its status to Running.
#[test]
fn test_resume_from_accepts_paused_graph() {
    let mut g = graph_from_nodes(vec![make_node(0, &[])]);
    g.tasks[0].status = TaskStatus::Pending;
    g.status = GraphStatus::Paused;
    let resumed = DagScheduler::resume_from(g, &make_config(), Box::new(FirstRouter), vec![])
        .expect("resume_from should accept Paused graph");
    assert_eq!(resumed.graph.status, GraphStatus::Running);
}
// resume_from must also accept a Failed graph (retry-after-failure flow)
// and set it back to Running.
#[test]
fn test_resume_from_accepts_failed_graph() {
    let mut g = graph_from_nodes(vec![make_node(0, &[])]);
    g.tasks[0].status = TaskStatus::Failed;
    g.status = GraphStatus::Failed;
    let resumed = DagScheduler::resume_from(g, &make_config(), Box::new(FirstRouter), vec![])
        .expect("resume_from should accept Failed graph");
    assert_eq!(resumed.graph.status, GraphStatus::Running);
}
// A Completed graph has nothing to resume: resume_from must reject it
// with InvalidGraph.
#[test]
fn test_resume_from_rejects_completed_graph() {
    let mut g = graph_from_nodes(vec![make_node(0, &[])]);
    g.status = GraphStatus::Completed;
    let err = DagScheduler::resume_from(g, &make_config(), Box::new(FirstRouter), vec![])
        .unwrap_err();
    assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
}
// A Canceled graph is terminal: resume_from must reject it with InvalidGraph.
#[test]
fn test_resume_from_rejects_canceled_graph() {
    let mut g = graph_from_nodes(vec![make_node(0, &[])]);
    g.status = GraphStatus::Canceled;
    let err = DagScheduler::resume_from(g, &make_config(), Box::new(FirstRouter), vec![])
        .unwrap_err();
    assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
}
// Resuming a graph with a task in Running state must rebuild the in-memory
// running map from the task's persisted assigned_agent handle (invariant IC1).
#[test]
fn test_resume_from_reconstructs_running_tasks() {
    let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
    graph.status = GraphStatus::Paused;
    // Task 0 was mid-flight when the graph was persisted.
    graph.tasks[0].status = TaskStatus::Running;
    graph.tasks[0].assigned_agent = Some("handle-abc".to_string());
    graph.tasks[0].agent_hint = Some("worker".to_string());
    graph.tasks[1].status = TaskStatus::Pending;
    let scheduler =
        DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
            .expect("should succeed");
    assert!(
        scheduler.running.contains_key(&TaskId(0)),
        "Running task must be reconstructed in the running map (IC1)"
    );
    assert_eq!(scheduler.running[&TaskId(0)].agent_handle_id, "handle-abc");
    assert!(
        !scheduler.running.contains_key(&TaskId(1)),
        "Pending task must not appear in running map"
    );
}
// After a successful resume the graph status is always Running.
#[test]
fn test_resume_from_sets_status_running() {
    let mut g = graph_from_nodes(vec![make_node(0, &[])]);
    g.status = GraphStatus::Paused;
    let resumed =
        DagScheduler::resume_from(g, &make_config(), Box::new(FirstRouter), vec![]).unwrap();
    assert_eq!(resumed.graph.status, GraphStatus::Running);
}
// Each all-deferred tick (spawn deferred on ConcurrencyLimit, then
// record_batch_backoff(any_success=false, any_attempt=true)) must increment
// consecutive_spawn_failures by exactly one.
#[test]
fn test_consecutive_spawn_failures_increments_on_concurrency_limit() {
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let mut scheduler = make_scheduler(graph);
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    assert_eq!(scheduler.consecutive_spawn_failures, 0, "starts at zero");
    let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
    scheduler.record_spawn_failure(TaskId(0), &error);
    scheduler.record_batch_backoff(false, true);
    assert_eq!(
        scheduler.consecutive_spawn_failures, 1,
        "first deferral tick: consecutive_spawn_failures must be 1"
    );
    // Re-arm the task and defer again.
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    scheduler.record_spawn_failure(TaskId(0), &error);
    scheduler.record_batch_backoff(false, true);
    assert_eq!(
        scheduler.consecutive_spawn_failures, 2,
        "second deferral tick: consecutive_spawn_failures must be 2"
    );
    // And a third time — the counter keeps growing monotonically.
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    scheduler.record_spawn_failure(TaskId(0), &error);
    scheduler.record_batch_backoff(false, true);
    assert_eq!(
        scheduler.consecutive_spawn_failures, 3,
        "third deferral tick: consecutive_spawn_failures must be 3"
    );
}
// A successful record_spawn must reset the deferral-backoff counter to zero
// regardless of how many deferrals preceded it.
#[test]
fn test_consecutive_spawn_failures_resets_on_success() {
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let mut scheduler = make_scheduler(graph);
    // Accumulate two deferral ticks.
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
    scheduler.record_spawn_failure(TaskId(0), &error);
    scheduler.record_batch_backoff(false, true);
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    scheduler.record_spawn_failure(TaskId(0), &error);
    scheduler.record_batch_backoff(false, true);
    assert_eq!(scheduler.consecutive_spawn_failures, 2);
    // A real spawn succeeds — counter must drop back to zero.
    scheduler.record_spawn(TaskId(0), "handle-0".to_string(), "worker".to_string());
    assert_eq!(
        scheduler.consecutive_spawn_failures, 0,
        "record_spawn must reset consecutive_spawn_failures to 0"
    );
}
// wait_event's sleep must grow exponentially with consecutive_spawn_failures
// (base * 2^n) and be capped at 5s. Timing-based: uses real sleeps.
#[tokio::test]
async fn test_exponential_backoff_duration() {
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let config = zeph_config::OrchestrationConfig {
        deferral_backoff_ms: 50,
        ..make_config()
    };
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    assert_eq!(scheduler.consecutive_spawn_failures, 0);
    // Zero deferrals: sleep is at least the base backoff.
    let start = tokio::time::Instant::now();
    scheduler.wait_event().await;
    let elapsed0 = start.elapsed();
    assert!(
        elapsed0.as_millis() >= 50,
        "backoff with 0 deferrals must be >= base (50ms), got {}ms",
        elapsed0.as_millis()
    );
    // Three deferrals: 50ms * 2^3 = 400ms minimum.
    scheduler.consecutive_spawn_failures = 3;
    let start = tokio::time::Instant::now();
    scheduler.wait_event().await;
    let elapsed3 = start.elapsed();
    assert!(
        elapsed3.as_millis() >= 400,
        "backoff with 3 deferrals must be >= 400ms (50 * 8), got {}ms",
        elapsed3.as_millis()
    );
    // Extreme deferral count: growth must be capped at 5s, not overflow.
    scheduler.consecutive_spawn_failures = 20;
    let start = tokio::time::Instant::now();
    scheduler.wait_event().await;
    let elapsed_capped = start.elapsed();
    assert!(
        elapsed_capped.as_millis() >= 5000,
        "backoff must be capped at 5000ms with high deferrals, got {}ms",
        elapsed_capped.as_millis()
    );
}
// With no running tasks to wait on, wait_event must still sleep the deferral
// backoff instead of returning immediately (which would busy-loop the tick).
#[tokio::test]
async fn test_wait_event_sleeps_deferral_backoff_when_running_empty() {
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let config = zeph_config::OrchestrationConfig {
        deferral_backoff_ms: 50,
        ..make_config()
    };
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    assert!(scheduler.running.is_empty());
    let start = tokio::time::Instant::now();
    scheduler.wait_event().await;
    let elapsed = start.elapsed();
    assert!(
        elapsed.as_millis() >= 50,
        "wait_event must sleep at least deferral_backoff (50ms) when running is empty, but only slept {}ms",
        elapsed.as_millis()
    );
}
// Pure-function check of the backoff schedule: base * 2^failures,
// saturating at the 5s ceiling.
#[test]
fn test_current_deferral_backoff_exponential_growth() {
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let config = zeph_config::OrchestrationConfig {
        deferral_backoff_ms: 250,
        ..make_config()
    };
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    // 0 failures -> base (250ms).
    assert_eq!(
        scheduler.current_deferral_backoff(),
        Duration::from_millis(250)
    );
    scheduler.consecutive_spawn_failures = 1;
    assert_eq!(
        scheduler.current_deferral_backoff(),
        Duration::from_millis(500)
    );
    scheduler.consecutive_spawn_failures = 2;
    assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(1));
    scheduler.consecutive_spawn_failures = 3;
    assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(2));
    scheduler.consecutive_spawn_failures = 4;
    assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(4));
    // 250ms * 2^5 = 8s would exceed the cap; clamped to 5s.
    scheduler.consecutive_spawn_failures = 5;
    assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
    // Far past the cap: still 5s, no overflow.
    scheduler.consecutive_spawn_failures = 100;
    assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
}
// record_spawn on a Running task zeroes the consecutive-failure counter.
#[test]
fn test_record_spawn_resets_consecutive_failures() {
    let mut scheduler = DagScheduler::new(
        graph_from_nodes(vec![make_node(0, &[])]),
        &make_config(),
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    scheduler.consecutive_spawn_failures = 3;
    scheduler.record_spawn(TaskId(0), "handle-1".into(), "worker".into());
    assert_eq!(scheduler.consecutive_spawn_failures, 0);
}
// record_spawn_failure alone must not touch consecutive_spawn_failures —
// the counter is owned by record_batch_backoff; the task just reverts to Ready.
#[test]
fn test_record_spawn_failure_reverts_to_ready_no_counter_change() {
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let mut scheduler = DagScheduler::new(
        graph,
        &make_config(),
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    assert_eq!(scheduler.consecutive_spawn_failures, 0);
    let task_id = TaskId(0);
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
    scheduler.record_spawn_failure(task_id, &error);
    assert_eq!(scheduler.consecutive_spawn_failures, 0);
    assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
}
// With 6 ready tasks and max_parallel=2, a single tick dispatches exactly 2
// and marks exactly 2 as Running.
#[test]
fn test_parallel_dispatch_all_ready() {
    let nodes: Vec<_> = (0..6).map(|i| make_node(i, &[])).collect();
    let graph = graph_from_nodes(nodes);
    let config = zeph_config::OrchestrationConfig {
        max_parallel: 2,
        ..make_config()
    };
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    let actions = scheduler.tick();
    let spawn_count = actions
        .iter()
        .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
        .count();
    assert_eq!(
        spawn_count, 2,
        "only max_parallel=2 tasks dispatched per tick"
    );
    let running_count = scheduler
        .graph
        .tasks
        .iter()
        .filter(|t| t.status == TaskStatus::Running)
        .count();
    assert_eq!(running_count, 2, "only 2 tasks marked Running");
}
// record_batch_backoff(any_success=true, ...) resets the counter:
// one successful spawn in a batch clears accumulated deferrals.
#[test]
fn test_batch_backoff_partial_success() {
    let mut scheduler = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    scheduler.consecutive_spawn_failures = 3;
    scheduler.record_batch_backoff(true, true);
    let after = scheduler.consecutive_spawn_failures;
    assert_eq!(after, 0, "any success in batch must reset counter");
}
// record_batch_backoff(false, true): a batch where every spawn failed
// increments the counter by one.
#[test]
fn test_batch_backoff_all_failed() {
    let mut scheduler = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    scheduler.consecutive_spawn_failures = 2;
    scheduler.record_batch_backoff(false, true);
    let after = scheduler.consecutive_spawn_failures;
    assert_eq!(after, 3, "all-failure tick must increment counter");
}
// record_batch_backoff(false, false): a tick with no spawn attempts leaves
// the counter untouched.
#[test]
fn test_batch_backoff_no_spawns() {
    let mut scheduler = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    scheduler.consecutive_spawn_failures = 5;
    scheduler.record_batch_backoff(false, false);
    let after = scheduler.consecutive_spawn_failures;
    assert_eq!(after, 5, "no spawns must not change counter");
}
// Pins the two inputs to the event-buffer sizing guard: the guard must scale
// with the task count (tasks.len() * 2 = 20), not with max_parallel
// (max_parallel * 2 = 4), so a 10-task graph never overflows its buffer.
#[test]
fn test_buffer_guard_uses_task_count() {
    let nodes: Vec<_> = (0..10).map(|i| make_node(i, &[])).collect();
    let graph = graph_from_nodes(nodes);
    let config = zeph_config::OrchestrationConfig {
        max_parallel: 2,
        ..make_config()
    };
    let scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    assert_eq!(scheduler.graph.tasks.len() * 2, 20);
    assert_eq!(scheduler.max_parallel * 2, 4);
}
// Mixed batch outcomes: one spawn deferred on ConcurrencyLimit (task back to
// Ready) and one fatal spawn error on a Skip-strategy task (task Skipped);
// the batch still counts as all-failed for backoff purposes.
#[test]
fn test_batch_mixed_concurrency_and_fatal_failure() {
    let mut nodes = vec![make_node(0, &[]), make_node(1, &[])];
    nodes[1].failure_strategy = Some(FailureStrategy::Skip);
    let graph = graph_from_nodes(nodes);
    let mut scheduler = make_scheduler(graph);
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    scheduler.graph.tasks[1].status = TaskStatus::Running;
    // Task 0: transient deferral.
    let concurrency_err = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
    let actions0 = scheduler.record_spawn_failure(TaskId(0), &concurrency_err);
    assert!(
        actions0.is_empty(),
        "ConcurrencyLimit must produce no extra actions"
    );
    assert_eq!(
        scheduler.graph.tasks[0].status,
        TaskStatus::Ready,
        "task 0 must revert to Ready"
    );
    // Task 1: fatal spawn error on a Skip-strategy task.
    let fatal_err = SubAgentError::Spawn("provider unavailable".to_string());
    let actions1 = scheduler.record_spawn_failure(TaskId(1), &fatal_err);
    assert_eq!(
        scheduler.graph.tasks[1].status,
        TaskStatus::Skipped,
        "task 1: Skip strategy turns Failed into Skipped via propagate_failure"
    );
    assert!(
        actions1
            .iter()
            .all(|a| !matches!(a, SchedulerAction::Done { .. })),
        "no Done action expected: task 0 is still Ready"
    );
    // Backoff accounting: the batch had attempts but no successes.
    scheduler.consecutive_spawn_failures = 0;
    scheduler.record_batch_backoff(false, true);
    assert_eq!(
        scheduler.consecutive_spawn_failures, 1,
        "batch with only ConcurrencyLimit must increment counter"
    );
}
// Deadlock detection: nothing running, nothing dispatchable (all Pending
// tasks depend on a Failed task) — tick must emit Done(Failed) and cancel
// every non-terminal task.
#[test]
fn test_deadlock_marks_non_terminal_tasks_canceled() {
    let mut nodes = vec![make_node(0, &[]), make_node(1, &[0]), make_node(2, &[0])];
    nodes[0].status = TaskStatus::Failed;
    nodes[1].status = TaskStatus::Pending;
    nodes[2].status = TaskStatus::Pending;
    let mut graph = graph_from_nodes(nodes);
    graph.status = GraphStatus::Failed;
    // resume_from sets the graph Running, then tick() detects the deadlock.
    let mut scheduler = DagScheduler::resume_from(
        graph,
        &make_config(),
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    let actions = scheduler.tick();
    assert!(
        actions.iter().any(|a| matches!(
            a,
            SchedulerAction::Done {
                status: GraphStatus::Failed
            }
        )),
        "deadlock must emit Done(Failed); got: {actions:?}"
    );
    assert_eq!(scheduler.graph.status, GraphStatus::Failed);
    // The originally-failed task keeps its status; blocked tasks are Canceled.
    assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
    assert_eq!(
        scheduler.graph.tasks[1].status,
        TaskStatus::Canceled,
        "Pending task must be Canceled on deadlock"
    );
    assert_eq!(
        scheduler.graph.tasks[2].status,
        TaskStatus::Canceled,
        "Pending task must be Canceled on deadlock"
    );
}
// A graph with a task still Running is not deadlocked, even if no new task
// can be dispatched yet — tick must not emit Done.
#[test]
fn test_deadlock_not_triggered_when_task_running() {
    let mut nodes = vec![make_node(0, &[]), make_node(1, &[0])];
    // Task 0 is mid-flight (assigned_agent lets resume_from rebuild running).
    nodes[0].status = TaskStatus::Running;
    nodes[0].assigned_agent = Some("handle-1".into());
    nodes[1].status = TaskStatus::Pending;
    let mut graph = graph_from_nodes(nodes);
    graph.status = GraphStatus::Failed;
    let mut scheduler = DagScheduler::resume_from(
        graph,
        &make_config(),
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    let actions = scheduler.tick();
    assert!(
        actions
            .iter()
            .all(|a| !matches!(a, SchedulerAction::Done { .. })),
        "no Done action expected when a task is running; got: {actions:?}"
    );
    assert_eq!(scheduler.graph.status, GraphStatus::Running);
}
// Topology selection: a 0 -> 1 -> 2 chain is classified LinearChain, which
// overrides configured max_parallel=4 down to 1 dispatch per tick.
#[test]
fn topology_linear_chain_limits_parallelism_to_one() {
    let graph = graph_from_nodes(vec![
        make_node(0, &[]),
        make_node(1, &[0]),
        make_node(2, &[1]),
    ]);
    let config = zeph_config::OrchestrationConfig {
        topology_selection: true,
        max_parallel: 4,
        ..make_config()
    };
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    assert_eq!(
        scheduler.topology().topology,
        crate::topology::Topology::LinearChain
    );
    assert_eq!(scheduler.max_parallel, 1);
    let actions = scheduler.tick();
    let spawn_count = actions
        .iter()
        .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
        .count();
    assert_eq!(spawn_count, 1, "linear chain: only 1 task dispatched");
}
// Topology selection: four independent tasks classify as AllParallel, and all
// four are dispatched in a single tick under max_parallel=4.
#[test]
fn topology_all_parallel_dispatches_all_ready() {
    let graph = graph_from_nodes(vec![
        make_node(0, &[]),
        make_node(1, &[]),
        make_node(2, &[]),
        make_node(3, &[]),
    ]);
    let config = zeph_config::OrchestrationConfig {
        topology_selection: true,
        max_parallel: 4,
        ..make_config()
    };
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    assert_eq!(
        scheduler.topology().topology,
        crate::topology::Topology::AllParallel
    );
    let actions = scheduler.tick();
    let spawn_count = actions
        .iter()
        .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
        .count();
    assert_eq!(spawn_count, 4, "all-parallel: all 4 tasks dispatched");
}
// ExecutionMode::Sequential tasks dispatch one at a time (only the first of
// A, B goes out), while a Parallel sibling C is not held back.
#[test]
fn sequential_dispatch_one_at_a_time_parallel_unblocked() {
    use crate::graph::ExecutionMode;
    let mut a = make_node(0, &[]);
    a.execution_mode = ExecutionMode::Sequential;
    let mut b = make_node(1, &[]);
    b.execution_mode = ExecutionMode::Sequential;
    let mut c = make_node(2, &[]);
    c.execution_mode = ExecutionMode::Parallel;
    let graph = graph_from_nodes(vec![a, b, c]);
    let config = zeph_config::OrchestrationConfig {
        max_parallel: 4,
        ..make_config()
    };
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    let actions = scheduler.tick();
    // Collect the ids of everything dispatched this tick.
    let spawned: Vec<TaskId> = actions
        .iter()
        .filter_map(|a| {
            if let SchedulerAction::Spawn { task_id, .. } = a {
                Some(*task_id)
            } else {
                None
            }
        })
        .collect();
    assert!(
        spawned.contains(&TaskId(0)),
        "A(sequential) must be dispatched"
    );
    assert!(
        spawned.contains(&TaskId(2)),
        "C(parallel) must be dispatched"
    );
    assert!(!spawned.contains(&TaskId(1)), "B(sequential) must be held");
    assert_eq!(spawned.len(), 2);
}
// Replanning cap per source task: the first inject_tasks for a given task
// succeeds; a second inject for the same task is silently skipped and does
// not advance the global replan counter.
#[test]
fn test_inject_tasks_per_task_cap_skips_second() {
    let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
    let mut scheduler = make_scheduler(graph);
    let first = make_node(2, &[]);
    scheduler.inject_tasks(TaskId(0), vec![first], 20).unwrap();
    assert_eq!(
        scheduler.graph.tasks.len(),
        3,
        "first inject must append the task"
    );
    assert_eq!(scheduler.global_replan_count, 1);
    let second = make_node(3, &[]);
    scheduler.inject_tasks(TaskId(0), vec![second], 20).unwrap();
    assert_eq!(
        scheduler.graph.tasks.len(),
        3,
        "second inject must be silently skipped (per-task cap)"
    );
    assert_eq!(
        scheduler.global_replan_count, 1,
        "global counter must not increment on skipped inject"
    );
}
// Global replanning cap (max_replans=1): once exhausted, a later inject —
// even from a different source task — is skipped without error.
#[test]
fn test_inject_tasks_global_cap_skips_when_exhausted() {
    let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
    let mut config = make_config();
    config.max_replans = 1;
    let defs = vec![make_def("worker")];
    let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
    let new1 = make_node(2, &[]);
    scheduler.inject_tasks(TaskId(0), vec![new1], 20).unwrap();
    assert_eq!(scheduler.global_replan_count, 1);
    // Second inject comes from a different task but the global cap is spent.
    let new2 = make_node(3, &[]);
    scheduler.inject_tasks(TaskId(1), vec![new2], 20).unwrap();
    assert_eq!(
        scheduler.graph.tasks.len(),
        3,
        "global cap must prevent the second inject"
    );
    assert_eq!(
        scheduler.global_replan_count, 1,
        "global counter must not increment past cap"
    );
}
// Injecting tasks changes the graph shape, so inject_tasks must flag the
// topology for re-analysis; the next tick() clears the flag.
#[test]
fn test_inject_tasks_sets_topology_dirty() {
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let mut scheduler = make_scheduler(graph);
    assert!(
        !scheduler.topology_dirty,
        "topology_dirty must be false initially"
    );
    let new_task = make_node(1, &[]);
    scheduler
        .inject_tasks(TaskId(0), vec![new_task], 20)
        .unwrap();
    assert!(
        scheduler.topology_dirty,
        "inject_tasks must set topology_dirty=true"
    );
    scheduler.tick();
    assert!(
        !scheduler.topology_dirty,
        "tick() must clear topology_dirty after re-analysis"
    );
}
#[test]
fn test_inject_tasks_rejects_cycle() {
    // A task depending on its own id would form a cycle; the injection must
    // fail atomically, leaving the counter and dirty flag untouched.
    let mut scheduler = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    let self_dependent = make_node(1, &[1]);
    let result = scheduler.inject_tasks(TaskId(0), vec![self_dependent], 20);
    assert!(result.is_err(), "cyclic injection must return an error");
    assert!(
        matches!(
            result.unwrap_err(),
            OrchestrationError::VerificationFailed(_)
        ),
        "must return VerificationFailed for cycle"
    );
    assert_eq!(scheduler.global_replan_count, 0);
    assert!(
        !scheduler.topology_dirty,
        "topology_dirty must not be set when inject fails"
    );
}
/// Shared test defaults with topology selection enabled and headroom for
/// four parallel agents.
fn make_hierarchical_config() -> zeph_config::OrchestrationConfig {
    let mut cfg = make_config();
    cfg.topology_selection = true;
    cfg.max_parallel = 4;
    cfg
}
/// Two-level hierarchy: 0 -> {1, 2} and 1 -> 3.
fn make_hierarchical_graph() -> TaskGraph {
    let nodes = vec![
        make_node(0, &[]),
        make_node(1, &[0]),
        make_node(2, &[0]),
        make_node(3, &[1]),
    ];
    graph_from_nodes(nodes)
}
#[test]
fn test_level_barrier_advances_on_terminal_level() {
    // Hierarchical graph (0 -> {1, 2}, 1 -> 3) with topology selection on:
    // the scheduler must pick the LevelBarrier strategy and dispatch one
    // depth level at a time.
    let graph = make_hierarchical_graph();
    let config = make_hierarchical_config();
    let defs = vec![make_def("worker")];
    let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
    assert_eq!(
        scheduler.topology().strategy,
        crate::topology::DispatchStrategy::LevelBarrier,
        "must use LevelBarrier strategy for Hierarchical graph"
    );
    assert_eq!(scheduler.current_level, 0);
    // First tick: only the root task (level 0) may be dispatched.
    let actions = scheduler.tick();
    let spawned_ids: Vec<_> = actions
        .iter()
        .filter_map(|a| {
            if let SchedulerAction::Spawn { task_id, .. } = a {
                Some(*task_id)
            } else {
                None
            }
        })
        .collect();
    assert_eq!(
        spawned_ids,
        vec![TaskId(0)],
        "first tick must dispatch only A at level 0"
    );
    // Simulate level-0 completion by mutating scheduler internals directly:
    // A terminal, no tasks in flight, B/C marked Ready for the next level.
    scheduler.graph.tasks[0].status = TaskStatus::Completed;
    scheduler.running.clear();
    scheduler.graph.tasks[1].status = TaskStatus::Ready;
    scheduler.graph.tasks[2].status = TaskStatus::Ready;
    let actions2 = scheduler.tick();
    assert_eq!(
        scheduler.current_level, 1,
        "current_level must advance to 1 after level-0 tasks terminate"
    );
    let spawned2: Vec<_> = actions2
        .iter()
        .filter_map(|a| {
            if let SchedulerAction::Spawn { task_id, .. } = a {
                Some(*task_id)
            } else {
                None
            }
        })
        .collect();
    assert!(
        spawned2.contains(&TaskId(1)),
        "B must be dispatched after level advance"
    );
    assert!(
        spawned2.contains(&TaskId(2)),
        "C must be dispatched after level advance"
    );
}
#[test]
fn test_level_barrier_failure_propagates_transitively() {
    // Root A fails with the Skip strategy; B and C (children of A) and D
    // (child of B) must all be transitively skipped so the level barrier
    // never stalls waiting on descendants of a dead task.
    let graph = make_hierarchical_graph();
    let config = make_hierarchical_config();
    let defs = vec![make_def("worker")];
    let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
    scheduler.graph.tasks[0].failure_strategy = Some(crate::graph::FailureStrategy::Skip);
    // Fake an in-flight execution of A so the failure event has a matching
    // entry in `running` to resolve against.
    scheduler.graph.tasks[0].status = TaskStatus::Running;
    scheduler.running.insert(
        TaskId(0),
        RunningTask {
            agent_handle_id: "h0".to_string(),
            agent_def_name: "worker".to_string(),
            started_at: Instant::now(),
        },
    );
    // Queue the failure event directly; tick() drains buffered_events.
    scheduler.buffered_events.push_back(TaskEvent {
        task_id: TaskId(0),
        agent_handle_id: "h0".to_string(),
        outcome: TaskOutcome::Failed {
            error: "simulated failure".to_string(),
        },
    });
    scheduler.tick();
    assert_eq!(
        scheduler.graph.tasks[0].status,
        TaskStatus::Skipped,
        "A must be Skipped (Skip strategy)"
    );
    assert_eq!(
        scheduler.graph.tasks[1].status,
        TaskStatus::Skipped,
        "B must be transitively Skipped"
    );
    assert_eq!(
        scheduler.graph.tasks[2].status,
        TaskStatus::Skipped,
        "C must be transitively Skipped"
    );
    assert_eq!(
        scheduler.graph.tasks[3].status,
        TaskStatus::Skipped,
        "D must be transitively Skipped"
    );
}
#[test]
fn test_level_barrier_current_level_reset_after_inject() {
    // After an injection, re-analysis must rewind current_level to the
    // shallowest depth that still holds non-terminal tasks.
    let graph = make_hierarchical_graph();
    let config = make_hierarchical_config();
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    for idx in 0..3 {
        scheduler.graph.tasks[idx].status = TaskStatus::Completed;
    }
    scheduler.current_level = 2;
    let late_task = make_node(4, &[0]);
    scheduler
        .inject_tasks(TaskId(3), vec![late_task], 20)
        .unwrap();
    assert!(scheduler.topology_dirty);
    scheduler.tick();
    assert_eq!(
        scheduler.current_level, 1,
        "current_level must reset to min non-terminal depth (1) after inject at depth 1"
    );
}
#[test]
fn resume_from_preserves_topology_classification() {
    // Resuming a paused linear chain must re-run topology classification
    // and apply its parallelism limit, just like a fresh construction.
    let mut graph = graph_from_nodes(vec![
        make_node(0, &[]),
        make_node(1, &[0]),
        make_node(2, &[1]),
    ]);
    graph.status = GraphStatus::Paused;
    graph.tasks[0].status = TaskStatus::Completed;
    graph.tasks[1].status = TaskStatus::Pending;
    graph.tasks[2].status = TaskStatus::Pending;
    let config = zeph_config::OrchestrationConfig {
        topology_selection: true,
        max_parallel: 4,
        ..make_config()
    };
    let defs = vec![make_def("worker")];
    let scheduler =
        DagScheduler::resume_from(graph, &config, Box::new(FirstRouter), defs).unwrap();
    assert_eq!(
        scheduler.topology().topology,
        crate::topology::Topology::LinearChain,
        "resume_from must classify topology"
    );
    assert_eq!(
        scheduler.max_parallel, 1,
        "resume_from must apply topology limit"
    );
}
/// Shared test defaults with completeness verification on and the given
/// verification provider name.
fn make_verify_config(provider: &str) -> zeph_config::OrchestrationConfig {
    let mut cfg = make_config();
    cfg.verify_completeness = true;
    cfg.verify_provider = zeph_config::ProviderName::new(provider);
    cfg
}
#[test]
fn validate_verify_config_unknown_provider_returns_err() {
    // A configured provider missing from the pool must be rejected with a
    // message naming both the bad provider and an available alternative.
    let config = make_verify_config("nonexistent");
    let scheduler = DagScheduler::new(
        graph_from_nodes(vec![make_node(0, &[])]),
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    let result = scheduler.validate_verify_config(&["fast", "quality"]);
    assert!(result.is_err());
    let msg = result.unwrap_err().to_string();
    assert!(msg.contains("nonexistent"));
    assert!(msg.contains("fast"));
}
#[test]
fn validate_verify_config_known_provider_returns_ok() {
    // A provider present in the pool passes validation.
    let config = make_verify_config("fast");
    let scheduler = DagScheduler::new(
        graph_from_nodes(vec![make_node(0, &[])]),
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    let outcome = scheduler.validate_verify_config(&["fast", "quality"]);
    assert!(outcome.is_ok());
}
#[test]
fn validate_verify_config_empty_provider_always_ok() {
    // An empty provider name counts as "not configured" and passes.
    let config = make_verify_config("");
    let scheduler = DagScheduler::new(
        graph_from_nodes(vec![make_node(0, &[])]),
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
}
#[test]
fn validate_verify_config_disabled_skips_validation() {
    // Default scheduler (verification not enabled) must pass regardless of
    // the pool contents.
    let scheduler = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
}
#[test]
fn validate_verify_config_empty_pool_skips_validation() {
    // An empty provider pool means there is nothing to validate against,
    // even with a configured provider that would otherwise be unknown.
    let config = make_verify_config("nonexistent");
    let scheduler = DagScheduler::new(
        graph_from_nodes(vec![make_node(0, &[])]),
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    assert!(scheduler.validate_verify_config(&[]).is_ok());
}
#[test]
fn validate_verify_config_trims_whitespace_in_config() {
    // Surrounding whitespace in the configured provider name must not
    // cause a spurious mismatch against the pool.
    let config = make_verify_config(" fast ");
    let scheduler = DagScheduler::new(
        graph_from_nodes(vec![make_node(0, &[])]),
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    assert!(scheduler.validate_verify_config(&["fast"]).is_ok());
}
#[test]
fn config_max_parallel_initialized_from_config() {
    // config_max_parallel mirrors the raw config value and stays pristine
    // while topology analysis lowers the effective max_parallel.
    let config = zeph_config::OrchestrationConfig {
        topology_selection: true,
        max_parallel: 6,
        ..make_config()
    };
    let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
    let scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    assert_eq!(
        scheduler.config_max_parallel, 6,
        "config_max_parallel must equal config.max_parallel"
    );
    assert_eq!(
        scheduler.max_parallel, 1,
        "max_parallel reduced by topology analysis"
    );
    assert_eq!(
        scheduler.config_max_parallel, 6,
        "config_max_parallel must not be reduced by topology"
    );
}
#[test]
fn max_parallel_does_not_drift_across_inject_tick_cycles() {
    // Diamond graph 0 -> {1, 2} -> 3, classified as Mixed. Re-analysis
    // after each inject must recompute max_parallel from the pristine
    // config value rather than compounding on the already-reduced value
    // (drift regression).
    let graph = graph_from_nodes(vec![
        make_node(0, &[]),
        make_node(1, &[0]),
        make_node(2, &[0]),
        make_node(3, &[1, 2]),
    ]);
    let config = zeph_config::OrchestrationConfig {
        topology_selection: true,
        max_parallel: 4,
        max_tasks: 50,
        ..make_config()
    };
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    assert_eq!(
        scheduler.topology().topology,
        crate::topology::Topology::Mixed,
        "initial topology must be Mixed"
    );
    // Expected reduction for config max_parallel = 4: half + 1, clamped to
    // [1, 4] — presumably the Mixed-topology formula; the assert below pins it.
    let expected_max_parallel = (4usize / 2 + 1).clamp(1, 4);
    assert_eq!(scheduler.max_parallel, expected_max_parallel);
    let extra_task_id = 4u32;
    let extra_task = {
        let mut n = crate::graph::TaskNode::new(
            extra_task_id,
            "extra".to_string(),
            "extra task injected by replan",
        );
        n.depends_on = vec![TaskId(3)];
        n
    };
    // Mark the join task terminal before injecting a follow-up from it.
    scheduler.graph.tasks[3].status = TaskStatus::Completed;
    scheduler
        .inject_tasks(TaskId(3), vec![extra_task], 50)
        .expect("inject must succeed");
    assert!(
        scheduler.topology_dirty,
        "topology_dirty must be true after inject"
    );
    let _ = scheduler.tick();
    let max_after_first_inject = scheduler.max_parallel;
    assert_eq!(
        max_after_first_inject, expected_max_parallel,
        "max_parallel must not drift after first inject+tick"
    );
    // Second inject+tick cycle: still no compounding.
    let extra_task2 = {
        let mut n = crate::graph::TaskNode::new(5u32, "extra2".to_string(), "second replan");
        n.depends_on = vec![TaskId(extra_task_id)];
        n
    };
    // NOTE(review): index == id assumed here — holds for this fixture where
    // tasks are appended in id order.
    scheduler.graph.tasks[extra_task_id as usize].status = TaskStatus::Completed;
    scheduler
        .inject_tasks(TaskId(extra_task_id), vec![extra_task2], 50)
        .expect("second inject must succeed");
    let _ = scheduler.tick();
    let max_after_second_inject = scheduler.max_parallel;
    assert_eq!(
        max_after_second_inject, expected_max_parallel,
        "max_parallel must not drift after second inject+tick (was: {max_after_second_inject}, expected: {expected_max_parallel})"
    );
}
#[test]
fn completeness_threshold_returns_config_value() {
    // The accessor must echo the configured threshold.
    let mut config = make_config();
    config.completeness_threshold = 0.85;
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let scheduler =
        DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![make_def("w")]).unwrap();
    let delta = (scheduler.completeness_threshold() - 0.85).abs();
    assert!(delta < f32::EPSILON);
}
#[test]
fn completeness_threshold_default_is_0_7() {
    // Default scheduler exposes the default threshold of 0.7.
    let scheduler = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    let delta = (scheduler.completeness_threshold() - 0.7).abs();
    assert!(delta < f32::EPSILON);
}
#[test]
fn verify_provider_name_returns_config_value() {
    // The accessor must echo the configured provider name.
    let mut config = make_config();
    config.verify_provider = zeph_config::ProviderName::new("fast");
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let scheduler =
        DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![make_def("w")]).unwrap();
    assert_eq!(scheduler.verify_provider_name(), "fast");
}
#[test]
fn verify_provider_name_empty_when_not_set() {
    // No provider configured => empty string, not a panic or None-ish value.
    let scheduler = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    assert_eq!(scheduler.verify_provider_name(), "");
}
#[test]
fn max_replans_remaining_initial_equals_max_replans() {
    // Before any replan is recorded the full budget is available.
    let mut config = make_config();
    config.max_replans = 3;
    let graph = graph_from_nodes(vec![make_node(0, &[])]);
    let scheduler =
        DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![make_def("w")]).unwrap();
    assert_eq!(scheduler.max_replans_remaining(), 3);
}
#[test]
fn max_replans_remaining_decrements_after_record() {
    // Each recorded whole-plan replan consumes one unit of budget; the
    // remaining count saturates at zero instead of underflowing.
    let mut scheduler = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    assert_eq!(scheduler.max_replans_remaining(), 2);
    for expected in [1, 0, 0] {
        scheduler.record_whole_plan_replan();
        assert_eq!(scheduler.max_replans_remaining(), expected);
    }
}
#[test]
fn record_whole_plan_replan_does_not_modify_graph() {
    // Recording a replan is pure bookkeeping: the task graph is untouched.
    let mut scheduler = make_scheduler(graph_from_nodes(vec![make_node(0, &[])]));
    let count_before = scheduler.graph().tasks.len();
    scheduler.record_whole_plan_replan();
    assert_eq!(
        scheduler.graph().tasks.len(),
        count_before,
        "record_whole_plan_replan must not modify the task graph"
    );
}
/// Config enabling topology selection and cascade routing with a 0.4
/// failure threshold, on top of the shared test defaults.
fn make_cascade_config() -> zeph_config::OrchestrationConfig {
    let mut cfg = make_config();
    cfg.topology_selection = true;
    cfg.cascade_routing = true;
    cfg.cascade_failure_threshold = 0.4;
    cfg.max_parallel = 4;
    cfg
}
#[test]
fn inject_tasks_resets_cascade_detector() {
    // Injection reshapes the graph, so previously recorded region health is
    // stale; inject_tasks must wipe the cascade detector state (C13 fix).
    let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
    graph.tasks[0].status = TaskStatus::Completed;
    graph.tasks[1].status = TaskStatus::Completed;
    let config = make_cascade_config();
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    let Some(det) = scheduler.cascade_detector.as_mut() else {
        panic!(
            "cascade_detector must be Some when cascade_routing=true and topology_selection=true"
        );
    };
    // Seed one failure so region_health has an entry to be cleared.
    det.record_outcome(TaskId(1), false, &scheduler.graph);
    assert_eq!(det.region_health().len(), 1);
    scheduler
        .inject_tasks(TaskId(1), vec![make_node(2, &[1])], 20)
        .unwrap();
    assert!(
        scheduler
            .cascade_detector
            .as_ref()
            .is_some_and(|d| d.region_health().is_empty()),
        "cascade_detector must be cleared after inject_tasks (C13 fix)"
    );
}
#[test]
fn sequential_tasks_not_reordered_by_cascade() {
    // Even with failures recorded against a region, tasks (including the
    // Sequential one) must still be dispatched rather than silently dropped.
    let mut graph = graph_from_nodes(vec![
        make_node(0, &[]),
        make_node(1, &[]),
        make_node(2, &[1]),
    ]);
    graph.tasks[2].execution_mode = ExecutionMode::Sequential;
    let config = make_cascade_config();
    let mut scheduler = DagScheduler::new(
        graph,
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    let Some(det) = scheduler.cascade_detector.as_mut() else {
        panic!("cascade_detector must be Some");
    };
    det.record_outcome(TaskId(1), false, &scheduler.graph);
    det.record_outcome(TaskId(2), false, &scheduler.graph);
    let actions = scheduler.tick();
    let dispatched: Vec<TaskId> = actions
        .iter()
        .filter_map(|a| match a {
            SchedulerAction::Spawn { task_id, .. }
            | SchedulerAction::RunInline { task_id, .. } => Some(*task_id),
            _ => None,
        })
        .collect();
    assert!(
        !dispatched.is_empty(),
        "tick must dispatch at least one ready task; Sequential tasks must not be dropped by cascade logic"
    );
}
#[test]
fn cascade_routing_without_topology_selection_creates_no_detector() {
    // Cascade routing sits on top of topology analysis; without topology
    // selection no detector should be constructed at all.
    let config = zeph_config::OrchestrationConfig {
        cascade_routing: true,
        topology_selection: false,
        ..make_config()
    };
    let scheduler = DagScheduler::new(
        graph_from_nodes(vec![make_node(0, &[])]),
        &config,
        Box::new(FirstRouter),
        vec![make_def("worker")],
    )
    .unwrap();
    assert!(
        scheduler.cascade_detector.is_none(),
        "cascade_detector must be None when topology_selection=false"
    );
}
}