use std::collections::{HashMap, VecDeque};
use std::fmt::Write as _;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use super::dag;
use super::error::OrchestrationError;
use super::graph::{GraphStatus, TaskGraph, TaskId, TaskNode, TaskResult, TaskStatus};
use super::router::AgentRouter;
use crate::config::OrchestrationConfig;
use crate::sanitizer::{
ContentIsolationConfig, ContentSanitizer, ContentSource, ContentSourceKind,
};
use crate::subagent::SubAgentDef;
use crate::subagent::error::SubAgentError;
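/// An instruction emitted by the scheduler for the surrounding executor to
/// carry out. The scheduler never spawns or cancels agents itself; it only
/// describes what should happen next.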
#[derive(Debug)]
pub enum SchedulerAction {
Spawn {
task_id: TaskId,
agent_def_name: String,
prompt: String,
},
Cancel { agent_handle_id: String },
RunInline { task_id: TaskId, prompt: String },
Done { status: GraphStatus },
}
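/// A completion or failure report for a dispatched task, delivered back to
/// the scheduler through the channel obtained from
/// [`DagScheduler::event_sender`].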
#[derive(Debug)]
pub struct TaskEvent {
pub task_id: TaskId,
pub agent_handle_id: String,
pub outcome: TaskOutcome,
}
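/// The terminal outcome carried by a [`TaskEvent`].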
#[derive(Debug)]
pub enum TaskOutcome {
Completed {
output: String,
artifacts: Vec<PathBuf>,
},
Failed { error: String },
}
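/// Bookkeeping for a task that has been dispatched to an agent.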
struct RunningTask {
agent_handle_id: String,
agent_def_name: String,
started_at: Instant,
}
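/// Event-driven scheduler that walks a [`TaskGraph`] to completion.
///
/// Each [`tick`](Self::tick) drains events, fails timed-out tasks, and
/// returns the next batch of [`SchedulerAction`]s;
/// [`wait_event`](Self::wait_event) parks between ticks. A minimal driver
/// loop looks roughly like the sketch below (`executor` is a placeholder for
/// whatever spawns agents and feeds [`TaskEvent`]s back; it is not part of
/// this module):
///
/// ```ignore
/// let mut scheduler = DagScheduler::new(graph, &config, router, agents)?;
/// loop {
///     for action in scheduler.tick() {
///         match action {
///             SchedulerAction::Done { status } => return Ok(status),
///             other => executor.execute(other, scheduler.event_sender()),
///         }
///     }
///     scheduler.wait_event().await;
/// }
/// ```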
pub struct DagScheduler {
graph: TaskGraph,
max_parallel: usize,
running: HashMap<TaskId, RunningTask>,
event_rx: mpsc::Receiver<TaskEvent>,
event_tx: mpsc::Sender<TaskEvent>,
task_timeout: Duration,
router: Box<dyn AgentRouter>,
available_agents: Vec<SubAgentDef>,
dependency_context_budget: usize,
buffered_events: VecDeque<TaskEvent>,
sanitizer: ContentSanitizer,
deferral_backoff: Duration,
consecutive_spawn_failures: u32,
}
impl std::fmt::Debug for DagScheduler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DagScheduler")
.field("graph_id", &self.graph.id)
.field("graph_status", &self.graph.status)
.field("running_count", &self.running.len())
.field("max_parallel", &self.max_parallel)
.field("task_timeout_secs", &self.task_timeout.as_secs())
.finish_non_exhaustive()
}
}
impl DagScheduler {
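/// Builds a scheduler for a freshly created graph: validates the DAG,
/// transitions the graph to `Running`, and marks every dependency-free
/// `Pending` task as `Ready`. A `task_timeout_secs` of zero falls back to
/// 600 seconds.
///
/// # Errors
///
/// Returns [`OrchestrationError::InvalidGraph`] if the graph is not in
/// `Created` status, and propagates any DAG validation error.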
pub fn new(
mut graph: TaskGraph,
config: &OrchestrationConfig,
router: Box<dyn AgentRouter>,
available_agents: Vec<SubAgentDef>,
) -> Result<Self, OrchestrationError> {
if graph.status != GraphStatus::Created {
return Err(OrchestrationError::InvalidGraph(format!(
"graph must be in Created status, got {}",
graph.status
)));
}
dag::validate(&graph.tasks, config.max_tasks as usize)?;
graph.status = GraphStatus::Running;
for task in &mut graph.tasks {
if task.depends_on.is_empty() && task.status == TaskStatus::Pending {
task.status = TaskStatus::Ready;
}
}
let (event_tx, event_rx) = mpsc::channel(64);
let task_timeout = if config.task_timeout_secs > 0 {
Duration::from_secs(config.task_timeout_secs)
} else {
Duration::from_secs(600)
};
Ok(Self {
graph,
max_parallel: config.max_parallel as usize,
running: HashMap::new(),
event_rx,
event_tx,
task_timeout,
router,
available_agents,
dependency_context_budget: config.dependency_context_budget,
buffered_events: VecDeque::new(),
sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
consecutive_spawn_failures: 0,
})
}
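/// Rebuilds a scheduler around a previously persisted graph. Tasks still
/// marked `Running` are re-entered into the running map from their
/// `assigned_agent` handle; their timeout clock restarts at resume time,
/// since the original `Instant` was not persisted.
///
/// # Errors
///
/// Returns [`OrchestrationError::InvalidGraph`] for `Completed` or
/// `Canceled` graphs.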
pub fn resume_from(
mut graph: TaskGraph,
config: &OrchestrationConfig,
router: Box<dyn AgentRouter>,
available_agents: Vec<SubAgentDef>,
) -> Result<Self, OrchestrationError> {
if graph.status == GraphStatus::Completed || graph.status == GraphStatus::Canceled {
return Err(OrchestrationError::InvalidGraph(format!(
"cannot resume a {} graph; only Paused, Failed, or Running graphs are resumable",
graph.status
)));
}
graph.status = GraphStatus::Running;
let running: HashMap<TaskId, RunningTask> = graph
.tasks
.iter()
.filter(|t| t.status == TaskStatus::Running)
.filter_map(|t| {
let handle_id = t.assigned_agent.clone()?;
let def_name = t.agent_hint.clone().unwrap_or_default();
Some((
t.id,
RunningTask {
agent_handle_id: handle_id,
agent_def_name: def_name,
started_at: Instant::now(),
},
))
})
.collect();
let (event_tx, event_rx) = mpsc::channel(64);
let task_timeout = if config.task_timeout_secs > 0 {
Duration::from_secs(config.task_timeout_secs)
} else {
Duration::from_secs(600)
};
Ok(Self {
graph,
max_parallel: config.max_parallel as usize,
running,
event_rx,
event_tx,
task_timeout,
router,
available_agents,
dependency_context_budget: config.dependency_context_budget,
buffered_events: VecDeque::new(),
sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
deferral_backoff: Duration::from_millis(config.deferral_backoff_ms),
consecutive_spawn_failures: 0,
})
}
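/// Returns a clone of the sender that agents use to report [`TaskEvent`]s
/// back to this scheduler.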
#[must_use]
pub fn event_sender(&self) -> mpsc::Sender<TaskEvent> {
self.event_tx.clone()
}
#[must_use]
pub fn graph(&self) -> &TaskGraph {
&self.graph
}
#[must_use]
pub fn into_graph(self) -> TaskGraph {
self.graph
}
}
impl Drop for DagScheduler {
fn drop(&mut self) {
if !self.running.is_empty() {
tracing::warn!(
running_tasks = self.running.len(),
"DagScheduler dropped with running tasks; agents may continue until their \
CancellationToken fires or they complete naturally"
);
}
}
}
impl DagScheduler {
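/// Advances the graph by one scheduling step.
///
/// Processing order: buffered events, then events drained from the channel,
/// then timeout checks, then dispatch of every `Ready` task. Dispatch is
/// deliberately not capped by `max_parallel`; concurrency pressure is
/// reported back via [`record_spawn_failure`](Self::record_spawn_failure)
/// and retried on the next tick. Emits [`SchedulerAction::Done`] once the
/// graph reaches a terminal status or a deadlock (nothing running, ready,
/// or schedulable) is detected.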
pub fn tick(&mut self) -> Vec<SchedulerAction> {
if self.graph.status != GraphStatus::Running {
return vec![SchedulerAction::Done {
status: self.graph.status,
}];
}
let mut actions = Vec::new();
while let Some(event) = self.buffered_events.pop_front() {
let cancel_actions = self.process_event(event);
actions.extend(cancel_actions);
}
while let Ok(event) = self.event_rx.try_recv() {
let cancel_actions = self.process_event(event);
actions.extend(cancel_actions);
}
if self.graph.status != GraphStatus::Running {
return actions;
}
let timeout_actions = self.check_timeouts();
actions.extend(timeout_actions);
if self.graph.status != GraphStatus::Running {
return actions;
}
let ready = dag::ready_tasks(&self.graph);
for task_id in ready {
let task = &self.graph.tasks[task_id.index()];
let Some(agent_def_name) = self.router.route(task, &self.available_agents) else {
tracing::debug!(
task_id = %task_id,
title = %task.title,
"no agent available, routing task to main agent inline"
);
let prompt = self.build_task_prompt(task);
self.graph.tasks[task_id.index()].status = TaskStatus::Running;
actions.push(SchedulerAction::RunInline { task_id, prompt });
continue;
};
let prompt = self.build_task_prompt(task);
self.graph.tasks[task_id.index()].status = TaskStatus::Running;
actions.push(SchedulerAction::Spawn {
task_id,
agent_def_name,
prompt,
});
}
let running_in_graph_now = self
.graph
.tasks
.iter()
.filter(|t| t.status == TaskStatus::Running)
.count();
if running_in_graph_now == 0 && self.running.is_empty() {
let all_terminal = self.graph.tasks.iter().all(|t| t.status.is_terminal());
if all_terminal {
self.graph.status = GraphStatus::Completed;
self.graph.finished_at = Some(super::graph::chrono_now());
actions.push(SchedulerAction::Done {
status: GraphStatus::Completed,
});
} else if dag::ready_tasks(&self.graph).is_empty() {
tracing::error!(
"scheduler deadlock: no running or ready tasks, but graph not complete"
);
self.graph.status = GraphStatus::Failed;
self.graph.finished_at = Some(super::graph::chrono_now());
actions.push(SchedulerAction::Done {
status: GraphStatus::Failed,
});
}
}
actions
}
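/// Exponential backoff for spawn deferrals: the base doubles per
/// consecutive failure (shift capped at 10) and saturates at 5 seconds.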
fn current_deferral_backoff(&self) -> Duration {
const MAX_BACKOFF: Duration = Duration::from_secs(5);
let multiplier = 1u32
.checked_shl(self.consecutive_spawn_failures.min(10))
.unwrap_or(u32::MAX);
self.deferral_backoff
.saturating_mul(multiplier)
.min(MAX_BACKOFF)
}
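/// Parks until the next event arrives or the nearest task timeout is due
/// (with a 100 ms floor). With nothing running, sleeps for the current
/// deferral backoff instead. A received event is buffered for the next
/// tick; the buffer is bounded at twice the task count, dropping the
/// oldest event when saturated.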
pub async fn wait_event(&mut self) {
if self.running.is_empty() {
tokio::time::sleep(self.current_deferral_backoff()).await;
return;
}
let nearest_timeout = self
.running
.values()
.map(|r| {
self.task_timeout
.checked_sub(r.started_at.elapsed())
.unwrap_or(Duration::ZERO)
})
.min()
.unwrap_or(Duration::from_secs(1));
let wait_duration = nearest_timeout.max(Duration::from_millis(100));
tokio::select! {
Some(event) = self.event_rx.recv() => {
if self.buffered_events.len() >= self.graph.tasks.len() * 2 {
if let Some(dropped) = self.buffered_events.pop_front() {
tracing::error!(
task_id = %dropped.task_id,
buffer_len = self.buffered_events.len(),
"event buffer saturated; completion event dropped — task may \
remain Running until timeout"
);
}
}
self.buffered_events.push_back(event);
}
() = tokio::time::sleep(wait_duration) => {}
}
}
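/// Records a successful spawn: resets the deferral counter, stores the
/// agent handle on the task, and starts its timeout clock.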
pub fn record_spawn(
&mut self,
task_id: TaskId,
agent_handle_id: String,
agent_def_name: String,
) {
self.consecutive_spawn_failures = 0;
self.graph.tasks[task_id.index()].assigned_agent = Some(agent_handle_id.clone());
self.running.insert(
task_id,
RunningTask {
agent_handle_id,
agent_def_name,
started_at: Instant::now(),
},
);
}
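/// Handles a failed spawn attempt.
///
/// [`SubAgentError::ConcurrencyLimit`] is transient: the task reverts to
/// `Ready` for the next tick and no actions are emitted. Any other error
/// marks the task `Failed` and propagates the failure, returning `Cancel`
/// actions for affected running tasks plus a `Done` action if the graph
/// leaves the `Running` status.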
pub fn record_spawn_failure(
&mut self,
task_id: TaskId,
error: &SubAgentError,
) -> Vec<SchedulerAction> {
if let SubAgentError::ConcurrencyLimit { active, max } = error {
tracing::warn!(
task_id = %task_id,
active,
max,
next_backoff_ms = self.current_deferral_backoff().as_millis(),
"concurrency limit reached, deferring task to next tick"
);
self.graph.tasks[task_id.index()].status = TaskStatus::Ready;
return Vec::new();
}
let error_excerpt: String = error.to_string().chars().take(512).collect();
tracing::warn!(
task_id = %task_id,
error = %error_excerpt,
"spawn failed, marking task failed"
);
self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
let mut actions = Vec::new();
for cancel_task_id in cancel_ids {
if let Some(running) = self.running.remove(&cancel_task_id) {
actions.push(SchedulerAction::Cancel {
agent_handle_id: running.agent_handle_id,
});
}
}
if self.graph.status != GraphStatus::Running {
self.graph.finished_at = Some(super::graph::chrono_now());
actions.push(SchedulerAction::Done {
status: self.graph.status,
});
}
actions
}
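/// Updates the deferral counter after a batch of spawn attempts: any
/// success resets it, an all-deferred batch increments it, and a batch
/// with no concurrency failures leaves it unchanged.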
pub fn record_batch_backoff(&mut self, any_success: bool, any_concurrency_failure: bool) {
if any_success {
self.consecutive_spawn_failures = 0;
} else if any_concurrency_failure {
self.consecutive_spawn_failures = self.consecutive_spawn_failures.saturating_add(1);
}
}
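/// Cancels the whole graph: emits a `Cancel` for every running agent,
/// marks every non-terminal task `Canceled`, and appends a final
/// `Done { status: Canceled }`.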
pub fn cancel_all(&mut self) -> Vec<SchedulerAction> {
self.graph.status = GraphStatus::Canceled;
self.graph.finished_at = Some(super::graph::chrono_now());
let running: Vec<(TaskId, RunningTask)> = self.running.drain().collect();
let mut actions: Vec<SchedulerAction> = running
.into_iter()
.map(|(task_id, r)| {
self.graph.tasks[task_id.index()].status = TaskStatus::Canceled;
SchedulerAction::Cancel {
agent_handle_id: r.agent_handle_id,
}
})
.collect();
for task in &mut self.graph.tasks {
if !task.status.is_terminal() {
task.status = TaskStatus::Canceled;
}
}
actions.push(SchedulerAction::Done {
status: GraphStatus::Canceled,
});
actions
}
}
impl DagScheduler {
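/// Applies a single task event to the graph.
///
/// Events whose agent handle does not match the current incarnation, or
/// whose task is no longer in the running map, are discarded so a late
/// report from a replaced or timed-out agent cannot corrupt state.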
fn process_event(&mut self, event: TaskEvent) -> Vec<SchedulerAction> {
let TaskEvent {
task_id,
agent_handle_id,
outcome,
} = event;
match self.running.get(&task_id) {
Some(running) if running.agent_handle_id != agent_handle_id => {
tracing::warn!(
task_id = %task_id,
expected = %running.agent_handle_id,
got = %agent_handle_id,
"discarding stale event from previous agent incarnation"
);
return Vec::new();
}
None => {
tracing::debug!(
task_id = %task_id,
agent_handle_id = %agent_handle_id,
"ignoring event for task not in running map"
);
return Vec::new();
}
Some(_) => {}
}
let (duration_ms, agent_def_name) = match self.running.remove(&task_id) {
Some(r) => (
u64::try_from(r.started_at.elapsed().as_millis()).unwrap_or(u64::MAX),
Some(r.agent_def_name),
),
None => (0, None),
};
match outcome {
TaskOutcome::Completed { output, artifacts } => {
self.graph.tasks[task_id.index()].status = TaskStatus::Completed;
self.graph.tasks[task_id.index()].result = Some(TaskResult {
output,
artifacts,
duration_ms,
agent_id: Some(agent_handle_id),
agent_def: agent_def_name,
});
let newly_ready = dag::ready_tasks(&self.graph);
for ready_id in newly_ready {
if self.graph.tasks[ready_id.index()].status == TaskStatus::Pending {
self.graph.tasks[ready_id.index()].status = TaskStatus::Ready;
}
}
Vec::new()
}
TaskOutcome::Failed { error } => {
let error_excerpt: String = error.chars().take(512).collect();
tracing::warn!(
task_id = %task_id,
error = %error_excerpt,
"task failed"
);
self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
let mut actions = Vec::new();
for cancel_task_id in cancel_ids {
if let Some(running) = self.running.remove(&cancel_task_id) {
actions.push(SchedulerAction::Cancel {
agent_handle_id: running.agent_handle_id,
});
}
}
if self.graph.status != GraphStatus::Running {
self.graph.finished_at = Some(super::graph::chrono_now());
actions.push(SchedulerAction::Done {
status: self.graph.status,
});
}
actions
}
}
}
fn check_timeouts(&mut self) -> Vec<SchedulerAction> {
let timed_out: Vec<(TaskId, String)> = self
.running
.iter()
.filter(|(_, r)| r.started_at.elapsed() > self.task_timeout)
.map(|(id, r)| (*id, r.agent_handle_id.clone()))
.collect();
let mut actions = Vec::new();
for (task_id, agent_handle_id) in timed_out {
tracing::warn!(
task_id = %task_id,
timeout_secs = self.task_timeout.as_secs(),
"task timed out"
);
self.running.remove(&task_id);
self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
actions.push(SchedulerAction::Cancel { agent_handle_id });
let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
for cancel_task_id in cancel_ids {
if let Some(running) = self.running.remove(&cancel_task_id) {
actions.push(SchedulerAction::Cancel {
agent_handle_id: running.agent_handle_id,
});
}
}
if self.graph.status != GraphStatus::Running {
self.graph.finished_at = Some(super::graph::chrono_now());
actions.push(SchedulerAction::Done {
status: self.graph.status,
});
break;
}
}
actions
}
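/// Assembles the prompt for a task, prepending sanitized output from its
/// completed dependencies inside a `<completed-dependencies>` block.
/// The per-dependency budget is `dependency_context_budget` divided by the
/// number of completed dependencies; longer outputs are truncated on a
/// char boundary with a `[truncated: N chars total]` notice, and skipped
/// dependencies are listed without output.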
fn build_task_prompt(&self, task: &TaskNode) -> String {
if task.depends_on.is_empty() {
return task.description.clone();
}
let completed_deps: Vec<&TaskNode> = task
.depends_on
.iter()
.filter_map(|dep_id| {
let dep = &self.graph.tasks[dep_id.index()];
if dep.status == TaskStatus::Completed {
Some(dep)
} else {
None
}
})
.collect();
if completed_deps.is_empty() {
return task.description.clone();
}
let budget_per_dep = self
.dependency_context_budget
.checked_div(completed_deps.len())
.unwrap_or(self.dependency_context_budget);
let mut context_block = String::from("<completed-dependencies>\n");
for dep in &completed_deps {
let escaped_id = xml_escape(&dep.id.to_string());
let escaped_title = xml_escape(&dep.title);
let _ = writeln!(
context_block,
"## Task \"{escaped_id}\": \"{escaped_title}\" (completed)",
);
if let Some(ref result) = dep.result {
let source = ContentSource::new(ContentSourceKind::A2aMessage);
let sanitized = self.sanitizer.sanitize(&result.output, source);
let safe_output = sanitized.body;
let char_count = safe_output.chars().count();
if char_count > budget_per_dep {
let truncated: String = safe_output.chars().take(budget_per_dep).collect();
let _ = write!(
context_block,
"{truncated}...\n[truncated: {char_count} chars total]"
);
} else {
context_block.push_str(&safe_output);
}
} else {
context_block.push_str("[no output recorded]\n");
}
context_block.push('\n');
}
for dep_id in &task.depends_on {
let dep = &self.graph.tasks[dep_id.index()];
if dep.status == TaskStatus::Skipped {
let escaped_id = xml_escape(&dep.id.to_string());
let escaped_title = xml_escape(&dep.title);
let _ = writeln!(
context_block,
"## Task \"{escaped_id}\": \"{escaped_title}\" (skipped -- no output available)\n",
);
}
}
context_block.push_str("</completed-dependencies>\n\n");
format!("{context_block}Your task: {}", task.description)
}
}
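/// Escapes XML-special characters so dependency ids and titles cannot
/// break out of the `<completed-dependencies>` markup.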
fn xml_escape(s: &str) -> String {
let mut out = String::with_capacity(s.len());
for c in s.chars() {
match c {
'<' => out.push_str("&lt;"),
'>' => out.push_str("&gt;"),
'&' => out.push_str("&amp;"),
'"' => out.push_str("&quot;"),
'\'' => out.push_str("&#39;"),
other => out.push(other),
}
}
out
}
#[cfg(test)]
mod tests {
#![allow(clippy::default_trait_access)]
use super::*;
use crate::orchestration::graph::{
FailureStrategy, GraphStatus, TaskGraph, TaskNode, TaskStatus,
};
fn make_node(id: u32, deps: &[u32]) -> TaskNode {
let mut n = TaskNode::new(
id,
format!("task-{id}"),
format!("description for task {id}"),
);
n.depends_on = deps.iter().map(|&d| TaskId(d)).collect();
n
}
fn graph_from_nodes(nodes: Vec<TaskNode>) -> TaskGraph {
let mut g = TaskGraph::new("test goal");
g.tasks = nodes;
g
}
fn make_def(name: &str) -> SubAgentDef {
use crate::subagent::def::{SkillFilter, SubAgentPermissions, ToolPolicy};
SubAgentDef {
name: name.to_string(),
description: format!("{name} agent"),
model: None,
tools: ToolPolicy::InheritAll,
disallowed_tools: vec![],
permissions: SubAgentPermissions::default(),
skills: SkillFilter::default(),
system_prompt: String::new(),
hooks: crate::subagent::SubagentHooks::default(),
memory: None,
source: None,
file_path: None,
}
}
fn make_config() -> crate::config::OrchestrationConfig {
crate::config::OrchestrationConfig {
enabled: true,
max_tasks: 20,
max_parallel: 4,
default_failure_strategy: "abort".to_string(),
default_max_retries: 3,
task_timeout_secs: 300,
planner_model: None,
planner_max_tokens: 4096,
dependency_context_budget: 16384,
confirm_before_execute: true,
aggregator_max_tokens: 4096,
deferral_backoff_ms: 250,
}
}
struct FirstRouter;
impl AgentRouter for FirstRouter {
fn route(&self, _task: &TaskNode, available: &[SubAgentDef]) -> Option<String> {
available.first().map(|d| d.name.clone())
}
}
struct NoneRouter;
impl AgentRouter for NoneRouter {
fn route(&self, _task: &TaskNode, _available: &[SubAgentDef]) -> Option<String> {
None
}
}
fn make_scheduler_with_router(graph: TaskGraph, router: Box<dyn AgentRouter>) -> DagScheduler {
let config = make_config();
let defs = vec![make_def("worker")];
DagScheduler::new(graph, &config, router, defs).unwrap()
}
fn make_scheduler(graph: TaskGraph) -> DagScheduler {
let config = make_config();
let defs = vec![make_def("worker")];
DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap()
}
#[test]
fn test_new_validates_graph_status() {
let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
graph.status = GraphStatus::Running;
let config = make_config();
let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
assert!(result.is_err());
let err = result.unwrap_err();
assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
}
#[test]
fn test_new_marks_roots_ready() {
let graph = graph_from_nodes(vec![
make_node(0, &[]),
make_node(1, &[]),
make_node(2, &[0, 1]),
]);
let scheduler = make_scheduler(graph);
assert_eq!(scheduler.graph().tasks[0].status, TaskStatus::Ready);
assert_eq!(scheduler.graph().tasks[1].status, TaskStatus::Ready);
assert_eq!(scheduler.graph().tasks[2].status, TaskStatus::Pending);
assert_eq!(scheduler.graph().status, GraphStatus::Running);
}
#[test]
fn test_new_validates_empty_graph() {
let graph = graph_from_nodes(vec![]);
let config = make_config();
let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
assert!(result.is_err());
}
#[test]
fn test_tick_produces_spawn_for_ready() {
let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
let mut scheduler = make_scheduler(graph);
let actions = scheduler.tick();
let spawns: Vec<_> = actions
.iter()
.filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
.collect();
assert_eq!(spawns.len(), 2);
}
#[test]
fn test_tick_dispatches_all_regardless_of_max_parallel() {
let graph = graph_from_nodes(vec![
make_node(0, &[]),
make_node(1, &[]),
make_node(2, &[]),
make_node(3, &[]),
make_node(4, &[]),
]);
let mut config = make_config();
config.max_parallel = 2;
let defs = vec![make_def("worker")];
let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
let actions = scheduler.tick();
let spawn_count = actions
.iter()
.filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
.count();
assert_eq!(spawn_count, 5, "all 5 ready tasks must be dispatched");
}
#[test]
fn test_tick_detects_completion() {
let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
graph.tasks[0].status = TaskStatus::Completed;
let config = make_config();
let defs = vec![make_def("worker")];
let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
let actions = scheduler.tick();
let has_done = actions.iter().any(|a| {
matches!(
a,
SchedulerAction::Done {
status: GraphStatus::Completed
}
)
});
assert!(
has_done,
"should emit Done(Completed) when all tasks are terminal"
);
}
#[test]
fn test_completion_event_marks_deps_ready() {
let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
let mut scheduler = make_scheduler(graph);
scheduler.graph.tasks[0].status = TaskStatus::Running;
scheduler.running.insert(
TaskId(0),
RunningTask {
agent_handle_id: "handle-0".to_string(),
agent_def_name: "worker".to_string(),
started_at: Instant::now(),
},
);
let event = TaskEvent {
task_id: TaskId(0),
agent_handle_id: "handle-0".to_string(),
outcome: TaskOutcome::Completed {
output: "done".to_string(),
artifacts: vec![],
},
};
scheduler.buffered_events.push_back(event);
let actions = scheduler.tick();
assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Completed);
let has_spawn_1 = actions
.iter()
.any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(1)));
assert!(
has_spawn_1 || scheduler.graph.tasks[1].status == TaskStatus::Ready,
"task 1 should be spawned or marked Ready"
);
}
#[test]
fn test_failure_abort_cancels_running() {
let graph = graph_from_nodes(vec![
make_node(0, &[]),
make_node(1, &[]),
make_node(2, &[0, 1]),
]);
let mut scheduler = make_scheduler(graph);
scheduler.graph.tasks[0].status = TaskStatus::Running;
scheduler.running.insert(
TaskId(0),
RunningTask {
agent_handle_id: "h0".to_string(),
agent_def_name: "worker".to_string(),
started_at: Instant::now(),
},
);
scheduler.graph.tasks[1].status = TaskStatus::Running;
scheduler.running.insert(
TaskId(1),
RunningTask {
agent_handle_id: "h1".to_string(),
agent_def_name: "worker".to_string(),
started_at: Instant::now(),
},
);
let event = TaskEvent {
task_id: TaskId(0),
agent_handle_id: "h0".to_string(),
outcome: TaskOutcome::Failed {
error: "boom".to_string(),
},
};
scheduler.buffered_events.push_back(event);
let actions = scheduler.tick();
assert_eq!(scheduler.graph.status, GraphStatus::Failed);
let cancel_ids: Vec<_> = actions
.iter()
.filter_map(|a| {
if let SchedulerAction::Cancel { agent_handle_id } = a {
Some(agent_handle_id.as_str())
} else {
None
}
})
.collect();
assert!(cancel_ids.contains(&"h1"), "task 1 should be canceled");
assert!(
actions
.iter()
.any(|a| matches!(a, SchedulerAction::Done { .. }))
);
}
#[test]
fn test_failure_skip_propagates() {
let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
let mut scheduler = make_scheduler(graph);
scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Skip);
scheduler.graph.tasks[0].status = TaskStatus::Running;
scheduler.running.insert(
TaskId(0),
RunningTask {
agent_handle_id: "h0".to_string(),
agent_def_name: "worker".to_string(),
started_at: Instant::now(),
},
);
let event = TaskEvent {
task_id: TaskId(0),
agent_handle_id: "h0".to_string(),
outcome: TaskOutcome::Failed {
error: "skip me".to_string(),
},
};
scheduler.buffered_events.push_back(event);
scheduler.tick();
assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Skipped);
assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Skipped);
}
#[test]
fn test_failure_retry_reschedules() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let mut scheduler = make_scheduler(graph);
scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
scheduler.graph.tasks[0].max_retries = Some(3);
scheduler.graph.tasks[0].retry_count = 0;
scheduler.graph.tasks[0].status = TaskStatus::Running;
scheduler.running.insert(
TaskId(0),
RunningTask {
agent_handle_id: "h0".to_string(),
agent_def_name: "worker".to_string(),
started_at: Instant::now(),
},
);
let event = TaskEvent {
task_id: TaskId(0),
agent_handle_id: "h0".to_string(),
outcome: TaskOutcome::Failed {
error: "transient".to_string(),
},
};
scheduler.buffered_events.push_back(event);
let actions = scheduler.tick();
let has_spawn = actions
.iter()
.any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
assert!(
has_spawn || scheduler.graph.tasks[0].status == TaskStatus::Ready,
"retry should produce spawn or Ready status"
);
assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
}
#[test]
fn test_process_event_failed_retry() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let mut scheduler = make_scheduler(graph);
scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
scheduler.graph.tasks[0].max_retries = Some(2);
scheduler.graph.tasks[0].retry_count = 0;
scheduler.graph.tasks[0].status = TaskStatus::Running;
scheduler.running.insert(
TaskId(0),
RunningTask {
agent_handle_id: "h0".to_string(),
agent_def_name: "worker".to_string(),
started_at: Instant::now(),
},
);
let event = TaskEvent {
task_id: TaskId(0),
agent_handle_id: "h0".to_string(),
outcome: TaskOutcome::Failed {
error: "first failure".to_string(),
},
};
scheduler.buffered_events.push_back(event);
let actions = scheduler.tick();
assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
let spawned = actions
.iter()
.any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
assert!(
spawned || scheduler.graph.tasks[0].status == TaskStatus::Ready,
"retry should emit Spawn or set Ready"
);
assert_eq!(scheduler.graph.status, GraphStatus::Running);
}
#[test]
fn test_timeout_cancels_stalled() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let mut config = make_config();
config.task_timeout_secs = 1;
let defs = vec![make_def("worker")];
let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
scheduler.graph.tasks[0].status = TaskStatus::Running;
scheduler.running.insert(
TaskId(0),
RunningTask {
agent_handle_id: "h0".to_string(),
agent_def_name: "worker".to_string(),
started_at: Instant::now().checked_sub(Duration::from_secs(2)).unwrap(),
},
);
let actions = scheduler.tick();
let has_cancel = actions.iter().any(
|a| matches!(a, SchedulerAction::Cancel { agent_handle_id } if agent_handle_id == "h0"),
);
assert!(has_cancel, "timed-out task should emit Cancel action");
assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
}
#[test]
fn test_cancel_all() {
let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
let mut scheduler = make_scheduler(graph);
scheduler.graph.tasks[0].status = TaskStatus::Running;
scheduler.running.insert(
TaskId(0),
RunningTask {
agent_handle_id: "h0".to_string(),
agent_def_name: "worker".to_string(),
started_at: Instant::now(),
},
);
scheduler.graph.tasks[1].status = TaskStatus::Running;
scheduler.running.insert(
TaskId(1),
RunningTask {
agent_handle_id: "h1".to_string(),
agent_def_name: "worker".to_string(),
started_at: Instant::now(),
},
);
let actions = scheduler.cancel_all();
assert_eq!(scheduler.graph.status, GraphStatus::Canceled);
assert!(scheduler.running.is_empty());
let cancel_count = actions
.iter()
.filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
.count();
assert_eq!(cancel_count, 2);
assert!(actions.iter().any(|a| matches!(
a,
SchedulerAction::Done {
status: GraphStatus::Canceled
}
)));
}
#[test]
fn test_record_spawn_failure() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let mut scheduler = make_scheduler(graph);
scheduler.graph.tasks[0].status = TaskStatus::Running;
let error = SubAgentError::Spawn("spawn error".to_string());
let actions = scheduler.record_spawn_failure(TaskId(0), &error);
assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
assert_eq!(scheduler.graph.status, GraphStatus::Failed);
assert!(
actions
.iter()
.any(|a| matches!(a, SchedulerAction::Done { .. }))
);
}
#[test]
fn test_record_spawn_failure_concurrency_limit_reverts_to_ready() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let mut scheduler = make_scheduler(graph);
scheduler.graph.tasks[0].status = TaskStatus::Running;
let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
let actions = scheduler.record_spawn_failure(TaskId(0), &error);
assert_eq!(
scheduler.graph.tasks[0].status,
TaskStatus::Ready,
"task must revert to Ready so the next tick can retry"
);
assert_eq!(
scheduler.graph.status,
GraphStatus::Running,
"graph must stay Running, not transition to Failed"
);
assert!(
actions.is_empty(),
"no cancel or done actions expected for a transient deferral"
);
}
#[test]
fn test_record_spawn_failure_concurrency_limit_variant_spawn_for_task() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let mut scheduler = make_scheduler(graph);
scheduler.graph.tasks[0].status = TaskStatus::Running;
let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
let actions = scheduler.record_spawn_failure(TaskId(0), &error);
assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
assert!(actions.is_empty());
}
#[test]
fn test_concurrency_deferral_does_not_affect_running_task() {
let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
let mut scheduler = make_scheduler(graph);
scheduler.graph.tasks[0].status = TaskStatus::Running;
scheduler.running.insert(
TaskId(0),
RunningTask {
agent_handle_id: "h0".to_string(),
agent_def_name: "worker".to_string(),
started_at: Instant::now(),
},
);
scheduler.graph.tasks[1].status = TaskStatus::Running;
let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
let actions = scheduler.record_spawn_failure(TaskId(1), &error);
assert_eq!(
scheduler.graph.tasks[0].status,
TaskStatus::Running,
"task 0 must remain Running"
);
assert_eq!(
scheduler.graph.tasks[1].status,
TaskStatus::Ready,
"task 1 must revert to Ready"
);
assert_eq!(
scheduler.graph.status,
GraphStatus::Running,
"graph must stay Running"
);
assert!(actions.is_empty(), "no cancel or done actions expected");
}
#[test]
fn test_max_concurrent_zero_no_infinite_loop() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let config = crate::config::OrchestrationConfig {
max_parallel: 0,
..make_config()
};
let mut scheduler = DagScheduler::new(
graph,
&config,
Box::new(FirstRouter),
vec![make_def("worker")],
)
.unwrap();
let actions1 = scheduler.tick();
assert!(
actions1
.iter()
.any(|a| matches!(a, SchedulerAction::Spawn { .. })),
"Spawn expected — parallel dispatch ignores max_parallel cap in tick()"
);
assert!(
actions1
.iter()
.all(|a| !matches!(a, SchedulerAction::Done { .. })),
"no Done(Failed) expected — ready tasks exist, so no deadlock"
);
assert_eq!(scheduler.graph.status, GraphStatus::Running);
scheduler.graph.tasks[0].status = TaskStatus::Running;
let error = SubAgentError::ConcurrencyLimit { active: 0, max: 0 };
let extra = scheduler.record_spawn_failure(TaskId(0), &error);
assert!(
extra.is_empty(),
"ConcurrencyLimit must not produce cancel/done actions"
);
assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
let actions2 = scheduler.tick();
assert!(
actions2
.iter()
.all(|a| !matches!(a, SchedulerAction::Done { .. })),
"second tick must not emit Done(Failed)"
);
assert_eq!(
scheduler.graph.status,
GraphStatus::Running,
"graph must remain Running after two ticks"
);
}
#[test]
fn test_all_tasks_deferred_graph_stays_running() {
let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
let mut scheduler = make_scheduler(graph);
let actions = scheduler.tick();
assert_eq!(
actions
.iter()
.filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
.count(),
2,
"expected 2 Spawn actions on first tick"
);
assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Running);
let error = SubAgentError::ConcurrencyLimit { active: 2, max: 2 };
let r0 = scheduler.record_spawn_failure(TaskId(0), &error);
let r1 = scheduler.record_spawn_failure(TaskId(1), &error);
assert!(r0.is_empty() && r1.is_empty(), "no cancel/done on deferral");
assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Ready);
assert_eq!(scheduler.graph.status, GraphStatus::Running);
let retry_actions = scheduler.tick();
let spawn_count = retry_actions
.iter()
.filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
.count();
assert!(
spawn_count > 0,
"second tick must re-emit Spawn for deferred tasks"
);
assert!(
retry_actions.iter().all(|a| !matches!(
a,
SchedulerAction::Done {
status: GraphStatus::Failed,
..
}
)),
"no Done(Failed) expected"
);
}
#[test]
fn test_build_prompt_no_deps() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let scheduler = make_scheduler(graph);
let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[0]);
assert_eq!(prompt, "description for task 0");
}
#[test]
fn test_build_prompt_with_deps_and_truncation() {
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.tasks[0].status = TaskStatus::Completed;
graph.tasks[0].result = Some(TaskResult {
output: "x".repeat(200),
artifacts: vec![],
duration_ms: 10,
agent_id: None,
agent_def: None,
});
let config = crate::config::OrchestrationConfig {
dependency_context_budget: 50,
..make_config()
};
let scheduler = DagScheduler::new(
graph,
&config,
Box::new(FirstRouter),
vec![make_def("worker")],
)
.unwrap();
let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
assert!(prompt.contains("<completed-dependencies>"));
assert!(prompt.contains("[truncated:"));
assert!(prompt.contains("Your task:"));
}
#[test]
fn test_duration_ms_computed_correctly() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let mut scheduler = make_scheduler(graph);
scheduler.graph.tasks[0].status = TaskStatus::Running;
scheduler.running.insert(
TaskId(0),
RunningTask {
agent_handle_id: "h0".to_string(),
agent_def_name: "worker".to_string(),
started_at: Instant::now()
.checked_sub(Duration::from_millis(50))
.unwrap(),
},
);
let event = TaskEvent {
task_id: TaskId(0),
agent_handle_id: "h0".to_string(),
outcome: TaskOutcome::Completed {
output: "result".to_string(),
artifacts: vec![],
},
};
scheduler.buffered_events.push_back(event);
scheduler.tick();
let result = scheduler.graph.tasks[0].result.as_ref().unwrap();
assert!(
result.duration_ms > 0,
"duration_ms should be > 0, got {}",
result.duration_ms
);
}
#[test]
fn test_utf8_safe_truncation() {
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.tasks[0].status = TaskStatus::Completed;
let unicode_output = "日本語テスト".repeat(100);
graph.tasks[0].result = Some(TaskResult {
output: unicode_output,
artifacts: vec![],
duration_ms: 10,
agent_id: None,
agent_def: None,
});
let config = crate::config::OrchestrationConfig {
dependency_context_budget: 500,
..make_config()
};
let scheduler = DagScheduler::new(
graph,
&config,
Box::new(FirstRouter),
vec![make_def("worker")],
)
.unwrap();
let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
assert!(
prompt.contains("日"),
"Japanese characters should be in the prompt after safe truncation"
);
}
#[test]
fn test_no_agent_routes_inline() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let mut scheduler = make_scheduler_with_router(graph, Box::new(NoneRouter));
let actions = scheduler.tick();
assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Running);
assert!(
actions
.iter()
.any(|a| matches!(a, SchedulerAction::RunInline { .. }))
);
}
#[test]
fn test_stale_event_rejected() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let mut scheduler = make_scheduler(graph);
scheduler.graph.tasks[0].status = TaskStatus::Running;
scheduler.running.insert(
TaskId(0),
RunningTask {
agent_handle_id: "current-handle".to_string(),
agent_def_name: "worker".to_string(),
started_at: Instant::now(),
},
);
let stale_event = TaskEvent {
task_id: TaskId(0),
agent_handle_id: "old-handle".to_string(),
outcome: TaskOutcome::Completed {
output: "stale output".to_string(),
artifacts: vec![],
},
};
scheduler.buffered_events.push_back(stale_event);
let actions = scheduler.tick();
assert_ne!(
scheduler.graph.tasks[0].status,
TaskStatus::Completed,
"stale event must not complete the task"
);
let has_done = actions
.iter()
.any(|a| matches!(a, SchedulerAction::Done { .. }));
assert!(
!has_done,
"no Done action should be emitted for a stale event"
);
assert!(
scheduler.running.contains_key(&TaskId(0)),
"running task must remain after stale event"
);
}
#[test]
fn test_build_prompt_chars_count_in_truncation_message() {
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.tasks[0].status = TaskStatus::Completed;
let output = "x".repeat(200);
graph.tasks[0].result = Some(TaskResult {
output,
artifacts: vec![],
duration_ms: 10,
agent_id: None,
agent_def: None,
});
let config = crate::config::OrchestrationConfig {
dependency_context_budget: 10,
..make_config()
};
let scheduler = DagScheduler::new(
graph,
&config,
Box::new(FirstRouter),
vec![make_def("worker")],
)
.unwrap();
let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
assert!(
prompt.contains("chars total"),
"truncation message must use 'chars total' label. Prompt: {prompt}"
);
assert!(
prompt.contains("[truncated:"),
"prompt must contain truncation notice. Prompt: {prompt}"
);
}
#[test]
fn test_resume_from_accepts_paused_graph() {
let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
graph.status = GraphStatus::Paused;
graph.tasks[0].status = TaskStatus::Pending;
let scheduler =
DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
.expect("resume_from should accept Paused graph");
assert_eq!(scheduler.graph.status, GraphStatus::Running);
}
#[test]
fn test_resume_from_accepts_failed_graph() {
let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
graph.status = GraphStatus::Failed;
graph.tasks[0].status = TaskStatus::Failed;
let scheduler =
DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
.expect("resume_from should accept Failed graph");
assert_eq!(scheduler.graph.status, GraphStatus::Running);
}
#[test]
fn test_resume_from_rejects_completed_graph() {
let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
graph.status = GraphStatus::Completed;
let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
.unwrap_err();
assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
}
#[test]
fn test_resume_from_rejects_canceled_graph() {
let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
graph.status = GraphStatus::Canceled;
let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
.unwrap_err();
assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
}
#[test]
fn test_resume_from_reconstructs_running_tasks() {
let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
graph.status = GraphStatus::Paused;
graph.tasks[0].status = TaskStatus::Running;
graph.tasks[0].assigned_agent = Some("handle-abc".to_string());
graph.tasks[0].agent_hint = Some("worker".to_string());
graph.tasks[1].status = TaskStatus::Pending;
let scheduler =
DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
.expect("should succeed");
assert!(
scheduler.running.contains_key(&TaskId(0)),
"Running task must be reconstructed in the running map (IC1)"
);
assert_eq!(scheduler.running[&TaskId(0)].agent_handle_id, "handle-abc");
assert!(
!scheduler.running.contains_key(&TaskId(1)),
"Pending task must not appear in running map"
);
}
#[test]
fn test_resume_from_sets_status_running() {
let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
graph.status = GraphStatus::Paused;
let scheduler =
DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
.unwrap();
assert_eq!(scheduler.graph.status, GraphStatus::Running);
}
#[test]
fn test_consecutive_spawn_failures_increments_on_concurrency_limit() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let mut scheduler = make_scheduler(graph);
scheduler.graph.tasks[0].status = TaskStatus::Running;
assert_eq!(scheduler.consecutive_spawn_failures, 0, "starts at zero");
let error = SubAgentError::ConcurrencyLimit { active: 4, max: 4 };
scheduler.record_spawn_failure(TaskId(0), &error);
scheduler.record_batch_backoff(false, true);
assert_eq!(
scheduler.consecutive_spawn_failures, 1,
"first deferral tick: consecutive_spawn_failures must be 1"
);
scheduler.graph.tasks[0].status = TaskStatus::Running;
scheduler.record_spawn_failure(TaskId(0), &error);
scheduler.record_batch_backoff(false, true);
assert_eq!(
scheduler.consecutive_spawn_failures, 2,
"second deferral tick: consecutive_spawn_failures must be 2"
);
scheduler.graph.tasks[0].status = TaskStatus::Running;
scheduler.record_spawn_failure(TaskId(0), &error);
scheduler.record_batch_backoff(false, true);
assert_eq!(
scheduler.consecutive_spawn_failures, 3,
"third deferral tick: consecutive_spawn_failures must be 3"
);
}
#[test]
fn test_consecutive_spawn_failures_resets_on_success() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let mut scheduler = make_scheduler(graph);
scheduler.graph.tasks[0].status = TaskStatus::Running;
let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
scheduler.record_spawn_failure(TaskId(0), &error);
scheduler.record_batch_backoff(false, true);
scheduler.graph.tasks[0].status = TaskStatus::Running;
scheduler.record_spawn_failure(TaskId(0), &error);
scheduler.record_batch_backoff(false, true);
assert_eq!(scheduler.consecutive_spawn_failures, 2);
scheduler.record_spawn(TaskId(0), "handle-0".to_string(), "worker".to_string());
assert_eq!(
scheduler.consecutive_spawn_failures, 0,
"record_spawn must reset consecutive_spawn_failures to 0"
);
}
#[tokio::test]
async fn test_exponential_backoff_duration() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let config = crate::config::OrchestrationConfig {
deferral_backoff_ms: 50,
..make_config()
};
let mut scheduler = DagScheduler::new(
graph,
&config,
Box::new(FirstRouter),
vec![make_def("worker")],
)
.unwrap();
assert_eq!(scheduler.consecutive_spawn_failures, 0);
let start = tokio::time::Instant::now();
scheduler.wait_event().await;
let elapsed0 = start.elapsed();
assert!(
elapsed0.as_millis() >= 50,
"backoff with 0 deferrals must be >= base (50ms), got {}ms",
elapsed0.as_millis()
);
scheduler.consecutive_spawn_failures = 3;
let start = tokio::time::Instant::now();
scheduler.wait_event().await;
let elapsed3 = start.elapsed();
assert!(
elapsed3.as_millis() >= 400,
"backoff with 3 deferrals must be >= 400ms (50 * 8), got {}ms",
elapsed3.as_millis()
);
scheduler.consecutive_spawn_failures = 20;
let start = tokio::time::Instant::now();
scheduler.wait_event().await;
let elapsed20 = start.elapsed();
assert!(
elapsed20.as_millis() >= 5000,
"backoff must be capped at 5000ms with high deferrals, got {}ms",
elapsed20.as_millis()
);
}
#[tokio::test]
async fn test_wait_event_sleeps_deferral_backoff_when_running_empty() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let config = crate::config::OrchestrationConfig {
deferral_backoff_ms: 50,
..make_config()
};
let mut scheduler = DagScheduler::new(
graph,
&config,
Box::new(FirstRouter),
vec![make_def("worker")],
)
.unwrap();
assert!(scheduler.running.is_empty());
let start = tokio::time::Instant::now();
scheduler.wait_event().await;
let elapsed = start.elapsed();
assert!(
elapsed.as_millis() >= 50,
"wait_event must sleep at least deferral_backoff (50ms) when running is empty, but only slept {}ms",
elapsed.as_millis()
);
}
#[test]
fn test_current_deferral_backoff_exponential_growth() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let config = crate::config::OrchestrationConfig {
deferral_backoff_ms: 250,
..make_config()
};
let mut scheduler = DagScheduler::new(
graph,
&config,
Box::new(FirstRouter),
vec![make_def("worker")],
)
.unwrap();
assert_eq!(
scheduler.current_deferral_backoff(),
Duration::from_millis(250)
);
scheduler.consecutive_spawn_failures = 1;
assert_eq!(
scheduler.current_deferral_backoff(),
Duration::from_millis(500)
);
scheduler.consecutive_spawn_failures = 2;
assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(1));
scheduler.consecutive_spawn_failures = 3;
assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(2));
scheduler.consecutive_spawn_failures = 4;
assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(4));
scheduler.consecutive_spawn_failures = 5;
assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
scheduler.consecutive_spawn_failures = 100;
assert_eq!(scheduler.current_deferral_backoff(), Duration::from_secs(5));
}
#[test]
fn test_record_spawn_resets_consecutive_failures() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let mut scheduler = DagScheduler::new(
graph,
&make_config(),
Box::new(FirstRouter),
vec![make_def("worker")],
)
.unwrap();
scheduler.consecutive_spawn_failures = 3;
let task_id = TaskId(0);
scheduler.graph.tasks[0].status = TaskStatus::Running;
scheduler.record_spawn(task_id, "handle-1".into(), "worker".into());
assert_eq!(scheduler.consecutive_spawn_failures, 0);
}
#[test]
fn test_record_spawn_failure_reverts_to_ready_no_counter_change() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let mut scheduler = DagScheduler::new(
graph,
&make_config(),
Box::new(FirstRouter),
vec![make_def("worker")],
)
.unwrap();
assert_eq!(scheduler.consecutive_spawn_failures, 0);
let task_id = TaskId(0);
scheduler.graph.tasks[0].status = TaskStatus::Running;
let error = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
scheduler.record_spawn_failure(task_id, &error);
assert_eq!(scheduler.consecutive_spawn_failures, 0);
assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Ready);
}
#[test]
fn test_parallel_dispatch_all_ready() {
let nodes: Vec<_> = (0..6).map(|i| make_node(i, &[])).collect();
let graph = graph_from_nodes(nodes);
let config = crate::config::OrchestrationConfig {
max_parallel: 2,
..make_config()
};
let mut scheduler = DagScheduler::new(
graph,
&config,
Box::new(FirstRouter),
vec![make_def("worker")],
)
.unwrap();
let actions = scheduler.tick();
let spawn_count = actions
.iter()
.filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
.count();
assert_eq!(spawn_count, 6, "all 6 ready tasks must be dispatched");
let running_count = scheduler
.graph
.tasks
.iter()
.filter(|t| t.status == TaskStatus::Running)
.count();
assert_eq!(running_count, 6, "all 6 tasks must be marked Running");
}
#[test]
fn test_batch_backoff_partial_success() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let mut scheduler = make_scheduler(graph);
scheduler.consecutive_spawn_failures = 3;
scheduler.record_batch_backoff(true, true);
assert_eq!(
scheduler.consecutive_spawn_failures, 0,
"any success in batch must reset counter"
);
}
#[test]
fn test_batch_backoff_all_failed() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let mut scheduler = make_scheduler(graph);
scheduler.consecutive_spawn_failures = 2;
scheduler.record_batch_backoff(false, true);
assert_eq!(
scheduler.consecutive_spawn_failures, 3,
"all-failure tick must increment counter"
);
}
#[test]
fn test_batch_backoff_no_spawns() {
let graph = graph_from_nodes(vec![make_node(0, &[])]);
let mut scheduler = make_scheduler(graph);
scheduler.consecutive_spawn_failures = 5;
scheduler.record_batch_backoff(false, false);
assert_eq!(
scheduler.consecutive_spawn_failures, 5,
"no spawns must not change counter"
);
}
#[test]
fn test_buffer_guard_uses_task_count() {
let nodes: Vec<_> = (0..10).map(|i| make_node(i, &[])).collect();
let graph = graph_from_nodes(nodes);
let config = crate::config::OrchestrationConfig {
max_parallel: 2,
..make_config()
};
let scheduler = DagScheduler::new(
graph,
&config,
Box::new(FirstRouter),
vec![make_def("worker")],
)
.unwrap();
assert_eq!(scheduler.graph.tasks.len() * 2, 20);
assert_eq!(scheduler.max_parallel * 2, 4);
}
#[test]
fn test_batch_mixed_concurrency_and_fatal_failure() {
let mut nodes = vec![make_node(0, &[]), make_node(1, &[])];
nodes[1].failure_strategy = Some(FailureStrategy::Skip);
let graph = graph_from_nodes(nodes);
let mut scheduler = make_scheduler(graph);
scheduler.graph.tasks[0].status = TaskStatus::Running;
scheduler.graph.tasks[1].status = TaskStatus::Running;
let concurrency_err = SubAgentError::ConcurrencyLimit { active: 1, max: 1 };
let actions0 = scheduler.record_spawn_failure(TaskId(0), &concurrency_err);
assert!(
actions0.is_empty(),
"ConcurrencyLimit must produce no extra actions"
);
assert_eq!(
scheduler.graph.tasks[0].status,
TaskStatus::Ready,
"task 0 must revert to Ready"
);
let fatal_err = SubAgentError::Spawn("provider unavailable".to_string());
let actions1 = scheduler.record_spawn_failure(TaskId(1), &fatal_err);
assert_eq!(
scheduler.graph.tasks[1].status,
TaskStatus::Skipped,
"task 1: Skip strategy turns Failed into Skipped via propagate_failure"
);
assert!(
actions1
.iter()
.all(|a| !matches!(a, SchedulerAction::Done { .. })),
"no Done action expected: task 0 is still Ready"
);
scheduler.consecutive_spawn_failures = 0;
scheduler.record_batch_backoff(false, true);
assert_eq!(
scheduler.consecutive_spawn_failures, 1,
"batch with only ConcurrencyLimit must increment counter"
);
}
}