Skip to main content

zeph_core/orchestration/
scheduler.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! DAG execution scheduler: drives task graph execution by emitting `SchedulerAction` commands.
5
6use std::collections::{HashMap, VecDeque};
7use std::fmt::Write as _;
8use std::path::PathBuf;
9use std::time::{Duration, Instant};
10
11use tokio::sync::mpsc;
12
13use super::dag;
14use super::error::OrchestrationError;
15use super::graph::{GraphStatus, TaskGraph, TaskId, TaskNode, TaskResult, TaskStatus};
16use super::router::AgentRouter;
17use crate::config::OrchestrationConfig;
18use crate::sanitizer::{
19    ContentIsolationConfig, ContentSanitizer, ContentSource, ContentSourceKind,
20};
21use crate::subagent::SubAgentDef;
22
23/// Actions the scheduler requests the caller to perform.
24///
25/// The scheduler never holds `&mut SubAgentManager` — it produces these
26/// actions for the caller to execute (ADR-026 command pattern).
27#[derive(Debug)]
28pub enum SchedulerAction {
29    /// Spawn a sub-agent for a task.
30    Spawn {
31        task_id: TaskId,
32        agent_def_name: String,
33        prompt: String,
34    },
35    /// Cancel a running sub-agent (on graph abort/skip).
36    Cancel { agent_handle_id: String },
37    /// Graph reached a terminal or paused state.
38    Done { status: GraphStatus },
39}
40
41/// Event sent by a sub-agent loop when it terminates.
42#[derive(Debug)]
43pub struct TaskEvent {
44    pub task_id: TaskId,
45    pub agent_handle_id: String,
46    pub outcome: TaskOutcome,
47}
48
49/// Outcome of a sub-agent execution.
50#[derive(Debug)]
51pub enum TaskOutcome {
52    /// Agent completed successfully.
53    Completed {
54        output: String,
55        artifacts: Vec<PathBuf>,
56    },
57    /// Agent failed.
58    Failed { error: String },
59}
60
61/// Tracks a running task's spawn time and definition name for timeout detection.
62struct RunningTask {
63    agent_handle_id: String,
64    agent_def_name: String,
65    started_at: Instant,
66}
67
68/// DAG execution engine.
69///
70/// Drives task graph execution by producing `SchedulerAction` values
71/// that the caller executes against `SubAgentManager`.
72///
73/// # Caller Loop
74///
75/// ```text
76/// loop {
77///     let actions = scheduler.tick();
78///     for action in actions {
79///         match action {
80///             Spawn { task_id, agent_def_name, prompt } => {
81///                 match manager.spawn_for_task(...) {
82///                     Ok(handle_id) => scheduler.record_spawn(task_id, handle_id),
83///                     Err(e) => { for a in scheduler.record_spawn_failure(task_id, &e.to_string()) { /* exec */ } }
84///                 }
85///             }
86///             Cancel { agent_handle_id } => { manager.cancel(&agent_handle_id); }
87///             Done { .. } => break,
88///         }
89///     }
90///     scheduler.wait_event().await;
91/// }
92/// ```
93pub struct DagScheduler {
94    graph: TaskGraph,
95    max_parallel: usize,
96    /// Maps `TaskId` -> running sub-agent state.
97    running: HashMap<TaskId, RunningTask>,
98    /// Receives completion/failure events from sub-agent loops.
99    event_rx: mpsc::Receiver<TaskEvent>,
100    /// Sender cloned into each spawned sub-agent via `spawn_for_task`.
101    event_tx: mpsc::Sender<TaskEvent>,
102    /// Per-task wall-clock timeout.
103    task_timeout: Duration,
104    /// Router for agent selection.
105    router: Box<dyn AgentRouter>,
106    /// Available agent definitions (cached from `SubAgentManager`).
107    available_agents: Vec<SubAgentDef>,
108    /// Total character budget for cross-task dependency context injection.
109    dependency_context_budget: usize,
110    /// Events buffered by `wait_event` for processing in the next `tick`.
111    buffered_events: VecDeque<TaskEvent>,
112    /// Sanitizer for dependency output injected into task prompts (SEC-ORCH-01).
113    sanitizer: ContentSanitizer,
114}
115
116impl std::fmt::Debug for DagScheduler {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        f.debug_struct("DagScheduler")
119            .field("graph_id", &self.graph.id)
120            .field("graph_status", &self.graph.status)
121            .field("running_count", &self.running.len())
122            .field("max_parallel", &self.max_parallel)
123            .field("task_timeout_secs", &self.task_timeout.as_secs())
124            .finish_non_exhaustive()
125    }
126}
127
128impl DagScheduler {
129    /// Create a new scheduler for the given graph.
130    ///
131    /// The graph must be in `Created` status. The scheduler transitions
132    /// it to `Running` and marks root tasks as `Ready`.
133    ///
134    /// # Errors
135    ///
136    /// Returns `OrchestrationError::InvalidGraph` if the graph is not in
137    /// `Created` status or has no tasks.
138    pub fn new(
139        mut graph: TaskGraph,
140        config: &OrchestrationConfig,
141        router: Box<dyn AgentRouter>,
142        available_agents: Vec<SubAgentDef>,
143    ) -> Result<Self, OrchestrationError> {
144        if graph.status != GraphStatus::Created {
145            return Err(OrchestrationError::InvalidGraph(format!(
146                "graph must be in Created status, got {}",
147                graph.status
148            )));
149        }
150
151        dag::validate(&graph.tasks, config.max_tasks as usize)?;
152
153        graph.status = GraphStatus::Running;
154
155        for task in &mut graph.tasks {
156            if task.depends_on.is_empty() && task.status == TaskStatus::Pending {
157                task.status = TaskStatus::Ready;
158            }
159        }
160
161        let (event_tx, event_rx) = mpsc::channel(64);
162
163        let task_timeout = if config.task_timeout_secs > 0 {
164            Duration::from_secs(config.task_timeout_secs)
165        } else {
166            Duration::from_secs(600)
167        };
168
169        Ok(Self {
170            graph,
171            max_parallel: config.max_parallel as usize,
172            running: HashMap::new(),
173            event_rx,
174            event_tx,
175            task_timeout,
176            router,
177            available_agents,
178            dependency_context_budget: config.dependency_context_budget,
179            buffered_events: VecDeque::new(),
180            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
181        })
182    }
183
184    /// Create a scheduler from a graph that is in `Paused` or `Failed` status.
185    ///
186    /// Used for resume and retry flows. The caller is responsible for calling
187    /// [`dag::reset_for_retry`] (for retry) before passing the graph here.
188    ///
189    /// This constructor sets `graph.status = Running` (II3) and reconstructs
190    /// the `running` map from tasks that are still in `Running` state (IC1), so
191    /// their completion events are not silently dropped on the next tick.
192    ///
193    /// # Errors
194    ///
195    /// Returns `OrchestrationError::InvalidGraph` if the graph is in `Completed`
196    /// or `Canceled` status (terminal states that cannot be resumed).
197    pub fn resume_from(
198        mut graph: TaskGraph,
199        config: &OrchestrationConfig,
200        router: Box<dyn AgentRouter>,
201        available_agents: Vec<SubAgentDef>,
202    ) -> Result<Self, OrchestrationError> {
203        if graph.status == GraphStatus::Completed || graph.status == GraphStatus::Canceled {
204            return Err(OrchestrationError::InvalidGraph(format!(
205                "cannot resume a {} graph; only Paused, Failed, or Running graphs are resumable",
206                graph.status
207            )));
208        }
209
210        // II3: ensure the graph is in Running state so tick() does not immediately
211        // return Done{Paused}.
212        graph.status = GraphStatus::Running;
213
214        // IC1: reconstruct the `running` map from tasks that were still Running at
215        // pause time. Without this their completion events would arrive but
216        // process_event would ignore them (it checks self.running), leaving the
217        // task stuck until timeout.
218        let running: HashMap<TaskId, RunningTask> = graph
219            .tasks
220            .iter()
221            .filter(|t| t.status == TaskStatus::Running)
222            .filter_map(|t| {
223                let handle_id = t.assigned_agent.clone()?;
224                let def_name = t.agent_hint.clone().unwrap_or_default();
225                Some((
226                    t.id,
227                    RunningTask {
228                        agent_handle_id: handle_id,
229                        agent_def_name: def_name,
230                        // Conservative: treat as just-started so timeout window is reset.
231                        started_at: Instant::now(),
232                    },
233                ))
234            })
235            .collect();
236
237        let (event_tx, event_rx) = mpsc::channel(64);
238
239        let task_timeout = if config.task_timeout_secs > 0 {
240            Duration::from_secs(config.task_timeout_secs)
241        } else {
242            Duration::from_secs(600)
243        };
244
245        Ok(Self {
246            graph,
247            max_parallel: config.max_parallel as usize,
248            running,
249            event_rx,
250            event_tx,
251            task_timeout,
252            router,
253            available_agents,
254            dependency_context_budget: config.dependency_context_budget,
255            buffered_events: VecDeque::new(),
256            sanitizer: ContentSanitizer::new(&ContentIsolationConfig::default()),
257        })
258    }
259
260    /// Get a clone of the event sender for injection into sub-agent loops.
261    #[must_use]
262    pub fn event_sender(&self) -> mpsc::Sender<TaskEvent> {
263        self.event_tx.clone()
264    }
265
266    /// Immutable reference to the current graph state.
267    #[must_use]
268    pub fn graph(&self) -> &TaskGraph {
269        &self.graph
270    }
271
272    /// Return the final graph state.
273    ///
274    /// Clones the graph since `Drop` is implemented on the scheduler.
275    #[must_use]
276    pub fn into_graph(&self) -> TaskGraph {
277        self.graph.clone()
278    }
279}
280
281impl Drop for DagScheduler {
282    fn drop(&mut self) {
283        if !self.running.is_empty() {
284            tracing::warn!(
285                running_tasks = self.running.len(),
286                "DagScheduler dropped with running tasks; agents may continue until their \
287                 CancellationToken fires or they complete naturally"
288            );
289        }
290    }
291}
292
293impl DagScheduler {
294    /// Process pending events and produce actions for the caller.
295    ///
296    /// Call `wait_event` after processing all actions to block until the next event.
297    pub fn tick(&mut self) -> Vec<SchedulerAction> {
298        if self.graph.status != GraphStatus::Running {
299            return vec![SchedulerAction::Done {
300                status: self.graph.status,
301            }];
302        }
303
304        let mut actions = Vec::new();
305
306        // Drain events buffered by wait_event, then any new ones in the channel.
307        while let Some(event) = self.buffered_events.pop_front() {
308            let cancel_actions = self.process_event(event);
309            actions.extend(cancel_actions);
310        }
311        while let Ok(event) = self.event_rx.try_recv() {
312            let cancel_actions = self.process_event(event);
313            actions.extend(cancel_actions);
314        }
315
316        if self.graph.status != GraphStatus::Running {
317            return actions;
318        }
319
320        // Check for timed-out tasks.
321        let timeout_actions = self.check_timeouts();
322        actions.extend(timeout_actions);
323
324        if self.graph.status != GraphStatus::Running {
325            return actions;
326        }
327
328        // Find ready tasks and schedule them up to max_parallel.
329        let ready = dag::ready_tasks(&self.graph);
330        // Count tasks that are Running in the graph (includes optimistically-marked ones
331        // that haven't been added to self.running yet via record_spawn). This prevents
332        // the false-deadlock detection from firing while Spawn actions are in-flight.
333        let running_in_graph = self
334            .graph
335            .tasks
336            .iter()
337            .filter(|t| t.status == TaskStatus::Running)
338            .count();
339        let slots_available = self.max_parallel.saturating_sub(running_in_graph);
340
341        for task_id in ready.into_iter().take(slots_available) {
342            let task = &self.graph.tasks[task_id.index()];
343
344            let Some(agent_def_name) = self.router.route(task, &self.available_agents) else {
345                tracing::warn!(
346                    task_id = %task_id,
347                    title = %task.title,
348                    "no agent available for task, marking failed"
349                );
350                self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
351                let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
352                for cancel_task_id in cancel_ids {
353                    if let Some(running) = self.running.remove(&cancel_task_id) {
354                        actions.push(SchedulerAction::Cancel {
355                            agent_handle_id: running.agent_handle_id,
356                        });
357                    }
358                }
359                if self.graph.status != GraphStatus::Running {
360                    self.graph.finished_at = Some(super::graph::chrono_now());
361                    actions.push(SchedulerAction::Done {
362                        status: self.graph.status,
363                    });
364                    return actions;
365                }
366                continue;
367            };
368
369            let prompt = self.build_task_prompt(task);
370
371            // Mark task as Running optimistically (before record_spawn is called).
372            self.graph.tasks[task_id.index()].status = TaskStatus::Running;
373
374            actions.push(SchedulerAction::Spawn {
375                task_id,
376                agent_def_name,
377                prompt,
378            });
379        }
380
381        // Check for completion or deadlock.
382        // Use graph Running status count to avoid false positives while Spawn actions
383        // are in-flight (record_spawn hasn't been called yet for freshly emitted spawns).
384        let running_in_graph_now = self
385            .graph
386            .tasks
387            .iter()
388            .filter(|t| t.status == TaskStatus::Running)
389            .count();
390        if running_in_graph_now == 0 && self.running.is_empty() {
391            let all_terminal = self.graph.tasks.iter().all(|t| t.status.is_terminal());
392            if all_terminal {
393                self.graph.status = GraphStatus::Completed;
394                self.graph.finished_at = Some(super::graph::chrono_now());
395                actions.push(SchedulerAction::Done {
396                    status: GraphStatus::Completed,
397                });
398            } else if dag::ready_tasks(&self.graph).is_empty() {
399                tracing::error!(
400                    "scheduler deadlock: no running or ready tasks, but graph not complete"
401                );
402                self.graph.status = GraphStatus::Failed;
403                self.graph.finished_at = Some(super::graph::chrono_now());
404                actions.push(SchedulerAction::Done {
405                    status: GraphStatus::Failed,
406                });
407            }
408        }
409
410        actions
411    }
412
413    /// Wait for the next event from a running sub-agent.
414    ///
415    /// Buffers the received event for processing in the next `tick` call.
416    /// Returns immediately if no tasks are running. Uses a timeout so that
417    /// periodic timeout checking can occur.
418    pub async fn wait_event(&mut self) {
419        if self.running.is_empty() {
420            return;
421        }
422
423        // Find the nearest timeout deadline among running tasks.
424        let nearest_timeout = self
425            .running
426            .values()
427            .map(|r| {
428                self.task_timeout
429                    .checked_sub(r.started_at.elapsed())
430                    .unwrap_or(Duration::ZERO)
431            })
432            .min()
433            .unwrap_or(Duration::from_secs(1));
434
435        // Clamp to at least 100 ms to avoid busy-looping.
436        let wait_duration = nearest_timeout.max(Duration::from_millis(100));
437
438        tokio::select! {
439            Some(event) = self.event_rx.recv() => {
440                // SEC-ORCH-02: guard against unbounded buffer growth.
441                if self.buffered_events.len() >= self.max_parallel * 2 {
442                    // PERF-SCHED-02: log at error level — a dropped completion event
443                    // leaves a task stuck in Running until its timeout fires.
444                    if let Some(dropped) = self.buffered_events.pop_front() {
445                        tracing::error!(
446                            task_id = %dropped.task_id,
447                            buffer_len = self.buffered_events.len(),
448                            "event buffer saturated; completion event dropped — task may \
449                             remain Running until timeout"
450                        );
451                    }
452                }
453                self.buffered_events.push_back(event);
454            }
455            () = tokio::time::sleep(wait_duration) => {}
456        }
457    }
458
459    /// Record that a spawn action was successfully executed.
460    ///
461    /// Called by the caller after successfully spawning via `SubAgentManager`.
462    pub fn record_spawn(
463        &mut self,
464        task_id: TaskId,
465        agent_handle_id: String,
466        agent_def_name: String,
467    ) {
468        self.graph.tasks[task_id.index()].assigned_agent = Some(agent_handle_id.clone());
469        self.running.insert(
470            task_id,
471            RunningTask {
472                agent_handle_id,
473                agent_def_name,
474                started_at: Instant::now(),
475            },
476        );
477    }
478
479    /// Handle a failed spawn attempt.
480    ///
481    /// Reverts the task from Running to Failed and propagates failure.
482    /// Returns any cancel actions needed.
483    ///
484    /// # Errors (via returned actions)
485    ///
486    /// Propagates failure per the task's effective `FailureStrategy`.
487    pub fn record_spawn_failure(&mut self, task_id: TaskId, error: &str) -> Vec<SchedulerAction> {
488        // SEC-ORCH-04: truncate error to avoid logging sensitive internal details.
489        let error_excerpt: String = error.chars().take(512).collect();
490        tracing::warn!(
491            task_id = %task_id,
492            error = %error_excerpt,
493            "spawn failed, marking task failed"
494        );
495        self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
496        let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
497        let mut actions = Vec::new();
498        for cancel_task_id in cancel_ids {
499            if let Some(running) = self.running.remove(&cancel_task_id) {
500                actions.push(SchedulerAction::Cancel {
501                    agent_handle_id: running.agent_handle_id,
502                });
503            }
504        }
505        if self.graph.status != GraphStatus::Running {
506            self.graph.finished_at = Some(super::graph::chrono_now());
507            actions.push(SchedulerAction::Done {
508                status: self.graph.status,
509            });
510        }
511        actions
512    }
513
514    /// Cancel all running tasks (for user-initiated plan cancellation).
515    ///
516    /// # Warning: Cooperative Cancellation
517    ///
518    /// Cancellation is cooperative and asynchronous. Tool operations (file writes, shell
519    /// executions) in progress at the time of cancellation complete before the agent loop
520    /// checks the cancellation token. Callers should inspect the task graph state and clean
521    /// up partially-written artifacts manually.
522    pub fn cancel_all(&mut self) -> Vec<SchedulerAction> {
523        self.graph.status = GraphStatus::Canceled;
524        self.graph.finished_at = Some(super::graph::chrono_now());
525
526        // Drain running map first to avoid split borrow issues (M3).
527        let running: Vec<(TaskId, RunningTask)> = self.running.drain().collect();
528        let mut actions: Vec<SchedulerAction> = running
529            .into_iter()
530            .map(|(task_id, r)| {
531                self.graph.tasks[task_id.index()].status = TaskStatus::Canceled;
532                SchedulerAction::Cancel {
533                    agent_handle_id: r.agent_handle_id,
534                }
535            })
536            .collect();
537
538        for task in &mut self.graph.tasks {
539            if !task.status.is_terminal() {
540                task.status = TaskStatus::Canceled;
541            }
542        }
543
544        actions.push(SchedulerAction::Done {
545            status: GraphStatus::Canceled,
546        });
547        actions
548    }
549}
550
551impl DagScheduler {
552    /// Process a single `TaskEvent` and return any cancel actions needed.
553    fn process_event(&mut self, event: TaskEvent) -> Vec<SchedulerAction> {
554        let TaskEvent {
555            task_id,
556            agent_handle_id,
557            outcome,
558        } = event;
559
560        // Guard against stale events from previous incarnations (e.g. after timeout+retry).
561        // A timed-out agent's event_tx outlives the timeout and may send a completion later.
562        match self.running.get(&task_id) {
563            Some(running) if running.agent_handle_id != agent_handle_id => {
564                tracing::warn!(
565                    task_id = %task_id,
566                    expected = %running.agent_handle_id,
567                    got = %agent_handle_id,
568                    "discarding stale event from previous agent incarnation"
569                );
570                return Vec::new();
571            }
572            None => {
573                tracing::debug!(
574                    task_id = %task_id,
575                    agent_handle_id = %agent_handle_id,
576                    "ignoring event for task not in running map"
577                );
578                return Vec::new();
579            }
580            Some(_) => {}
581        }
582
583        // Compute duration BEFORE removing from running map (C1 fix).
584        let duration_ms = self.running.get(&task_id).map_or(0, |r| {
585            u64::try_from(r.started_at.elapsed().as_millis()).unwrap_or(u64::MAX)
586        });
587        let agent_def_name = self.running.get(&task_id).map(|r| r.agent_def_name.clone());
588
589        self.running.remove(&task_id);
590
591        match outcome {
592            TaskOutcome::Completed { output, artifacts } => {
593                self.graph.tasks[task_id.index()].status = TaskStatus::Completed;
594                self.graph.tasks[task_id.index()].result = Some(TaskResult {
595                    output,
596                    artifacts,
597                    duration_ms,
598                    agent_id: Some(agent_handle_id),
599                    agent_def: agent_def_name,
600                });
601
602                // Mark newly unblocked tasks as Ready.
603                let newly_ready = dag::ready_tasks(&self.graph);
604                for ready_id in newly_ready {
605                    if self.graph.tasks[ready_id.index()].status == TaskStatus::Pending {
606                        self.graph.tasks[ready_id.index()].status = TaskStatus::Ready;
607                    }
608                }
609
610                Vec::new()
611            }
612
613            TaskOutcome::Failed { error } => {
614                // SEC-ORCH-04: truncate error to avoid logging sensitive internal details.
615                let error_excerpt: String = error.chars().take(512).collect();
616                tracing::warn!(
617                    task_id = %task_id,
618                    error = %error_excerpt,
619                    "task failed"
620                );
621                self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
622
623                let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
624                let mut actions = Vec::new();
625
626                for cancel_task_id in cancel_ids {
627                    if let Some(running) = self.running.remove(&cancel_task_id) {
628                        actions.push(SchedulerAction::Cancel {
629                            agent_handle_id: running.agent_handle_id,
630                        });
631                    }
632                }
633
634                if self.graph.status != GraphStatus::Running {
635                    self.graph.finished_at = Some(super::graph::chrono_now());
636                    actions.push(SchedulerAction::Done {
637                        status: self.graph.status,
638                    });
639                }
640
641                actions
642            }
643        }
644    }
645
646    /// Check all running tasks for timeout violations.
647    ///
648    /// # Warning: Cooperative Cancellation
649    ///
650    /// Cancel actions emitted here signal agents cooperatively. Tool operations in progress
651    /// at the time of cancellation complete before the agent loop checks the cancellation
652    /// token. Partially-written artifacts may remain on disk after cancellation.
653    fn check_timeouts(&mut self) -> Vec<SchedulerAction> {
654        let timed_out: Vec<(TaskId, String)> = self
655            .running
656            .iter()
657            .filter(|(_, r)| r.started_at.elapsed() > self.task_timeout)
658            .map(|(id, r)| (*id, r.agent_handle_id.clone()))
659            .collect();
660
661        let mut actions = Vec::new();
662        for (task_id, agent_handle_id) in timed_out {
663            tracing::warn!(
664                task_id = %task_id,
665                timeout_secs = self.task_timeout.as_secs(),
666                "task timed out"
667            );
668            self.running.remove(&task_id);
669            self.graph.tasks[task_id.index()].status = TaskStatus::Failed;
670
671            actions.push(SchedulerAction::Cancel { agent_handle_id });
672
673            let cancel_ids = dag::propagate_failure(&mut self.graph, task_id);
674            for cancel_task_id in cancel_ids {
675                if let Some(running) = self.running.remove(&cancel_task_id) {
676                    actions.push(SchedulerAction::Cancel {
677                        agent_handle_id: running.agent_handle_id,
678                    });
679                }
680            }
681
682            if self.graph.status != GraphStatus::Running {
683                self.graph.finished_at = Some(super::graph::chrono_now());
684                actions.push(SchedulerAction::Done {
685                    status: self.graph.status,
686                });
687                break;
688            }
689        }
690
691        actions
692    }
693
694    /// Build the task prompt with dependency context injection (Section 14).
695    ///
696    /// Uses char-boundary-safe truncation (S1 fix) to avoid panics on multi-byte UTF-8.
697    /// Dependency output is sanitized (SEC-ORCH-01) and titles are XML-escaped to prevent
698    /// prompt injection via crafted task outputs.
699    fn build_task_prompt(&self, task: &TaskNode) -> String {
700        if task.depends_on.is_empty() {
701            return task.description.clone();
702        }
703
704        let completed_deps: Vec<&TaskNode> = task
705            .depends_on
706            .iter()
707            .filter_map(|dep_id| {
708                let dep = &self.graph.tasks[dep_id.index()];
709                if dep.status == TaskStatus::Completed {
710                    Some(dep)
711                } else {
712                    None
713                }
714            })
715            .collect();
716
717        if completed_deps.is_empty() {
718            return task.description.clone();
719        }
720
721        let budget_per_dep = self
722            .dependency_context_budget
723            .checked_div(completed_deps.len())
724            .unwrap_or(self.dependency_context_budget);
725
726        let mut context_block = String::from("<completed-dependencies>\n");
727
728        for dep in &completed_deps {
729            // SEC-ORCH-01: XML-escape dep.id and dep.title to prevent breaking out of the
730            // <completed-dependencies> wrapper via crafted titles.
731            let escaped_id = xml_escape(&dep.id.to_string());
732            let escaped_title = xml_escape(&dep.title);
733            let _ = writeln!(
734                context_block,
735                "## Task \"{escaped_id}\": \"{escaped_title}\" (completed)",
736            );
737
738            if let Some(ref result) = dep.result {
739                // SEC-ORCH-01: sanitize dep output to prevent prompt injection from upstream tasks.
740                let source = ContentSource::new(ContentSourceKind::A2aMessage);
741                let sanitized = self.sanitizer.sanitize(&result.output, source);
742                let safe_output = sanitized.body;
743
744                // Char-boundary-safe truncation (S1): use chars().take() instead of byte slicing.
745                let char_count = safe_output.chars().count();
746                if char_count > budget_per_dep {
747                    let truncated: String = safe_output.chars().take(budget_per_dep).collect();
748                    let _ = write!(
749                        context_block,
750                        "{truncated}...\n[truncated: {char_count} chars total]"
751                    );
752                } else {
753                    context_block.push_str(&safe_output);
754                }
755            } else {
756                context_block.push_str("[no output recorded]\n");
757            }
758            context_block.push('\n');
759        }
760
761        // Add notes for skipped deps.
762        for dep_id in &task.depends_on {
763            let dep = &self.graph.tasks[dep_id.index()];
764            if dep.status == TaskStatus::Skipped {
765                let escaped_id = xml_escape(&dep.id.to_string());
766                let escaped_title = xml_escape(&dep.title);
767                let _ = writeln!(
768                    context_block,
769                    "## Task \"{escaped_id}\": \"{escaped_title}\" (skipped -- no output available)\n",
770                );
771            }
772        }
773
774        context_block.push_str("</completed-dependencies>\n\n");
775        format!("{context_block}Your task: {}", task.description)
776    }
777}
778
779/// Escape XML special characters in a string to prevent tag injection.
780fn xml_escape(s: &str) -> String {
781    let mut out = String::with_capacity(s.len());
782    for c in s.chars() {
783        match c {
784            '<' => out.push_str("&lt;"),
785            '>' => out.push_str("&gt;"),
786            '&' => out.push_str("&amp;"),
787            '"' => out.push_str("&quot;"),
788            '\'' => out.push_str("&#39;"),
789            other => out.push(other),
790        }
791    }
792    out
793}
794
795#[cfg(test)]
796mod tests {
797    use super::*;
798    use crate::orchestration::graph::{
799        FailureStrategy, GraphStatus, TaskGraph, TaskNode, TaskStatus,
800    };
801
802    fn make_node(id: u32, deps: &[u32]) -> TaskNode {
803        let mut n = TaskNode::new(
804            id,
805            format!("task-{id}"),
806            format!("description for task {id}"),
807        );
808        n.depends_on = deps.iter().map(|&d| TaskId(d)).collect();
809        n
810    }
811
812    fn graph_from_nodes(nodes: Vec<TaskNode>) -> TaskGraph {
813        let mut g = TaskGraph::new("test goal");
814        g.tasks = nodes;
815        g
816    }
817
818    fn make_def(name: &str) -> SubAgentDef {
819        use crate::subagent::def::{SkillFilter, SubAgentPermissions, ToolPolicy};
820        SubAgentDef {
821            name: name.to_string(),
822            description: format!("{name} agent"),
823            model: None,
824            tools: ToolPolicy::InheritAll,
825            disallowed_tools: vec![],
826            permissions: SubAgentPermissions::default(),
827            skills: SkillFilter::default(),
828            system_prompt: String::new(),
829            hooks: Default::default(),
830            memory: None,
831            source: None,
832            file_path: None,
833        }
834    }
835
836    fn make_config() -> crate::config::OrchestrationConfig {
837        crate::config::OrchestrationConfig {
838            enabled: true,
839            max_tasks: 20,
840            max_parallel: 4,
841            default_failure_strategy: "abort".to_string(),
842            default_max_retries: 3,
843            task_timeout_secs: 300,
844            planner_model: None,
845            planner_max_tokens: 4096,
846            dependency_context_budget: 16384,
847            confirm_before_execute: true,
848            aggregator_max_tokens: 4096,
849        }
850    }
851
852    struct FirstRouter;
853    impl AgentRouter for FirstRouter {
854        fn route(&self, _task: &TaskNode, available: &[SubAgentDef]) -> Option<String> {
855            available.first().map(|d| d.name.clone())
856        }
857    }
858
859    struct NoneRouter;
860    impl AgentRouter for NoneRouter {
861        fn route(&self, _task: &TaskNode, _available: &[SubAgentDef]) -> Option<String> {
862            None
863        }
864    }
865
866    fn make_scheduler_with_router(graph: TaskGraph, router: Box<dyn AgentRouter>) -> DagScheduler {
867        let config = make_config();
868        let defs = vec![make_def("worker")];
869        DagScheduler::new(graph, &config, router, defs).unwrap()
870    }
871
872    fn make_scheduler(graph: TaskGraph) -> DagScheduler {
873        let config = make_config();
874        let defs = vec![make_def("worker")];
875        DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap()
876    }
877
878    // --- constructor tests ---
879
880    #[test]
881    fn test_new_validates_graph_status() {
882        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
883        graph.status = GraphStatus::Running; // wrong status
884        let config = make_config();
885        let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
886        assert!(result.is_err());
887        let err = result.unwrap_err();
888        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
889    }
890
891    #[test]
892    fn test_new_marks_roots_ready() {
893        let graph = graph_from_nodes(vec![
894            make_node(0, &[]),
895            make_node(1, &[]),
896            make_node(2, &[0, 1]),
897        ]);
898        let scheduler = make_scheduler(graph);
899        assert_eq!(scheduler.graph().tasks[0].status, TaskStatus::Ready);
900        assert_eq!(scheduler.graph().tasks[1].status, TaskStatus::Ready);
901        assert_eq!(scheduler.graph().tasks[2].status, TaskStatus::Pending);
902        assert_eq!(scheduler.graph().status, GraphStatus::Running);
903    }
904
905    #[test]
906    fn test_new_validates_empty_graph() {
907        let graph = graph_from_nodes(vec![]);
908        let config = make_config();
909        let result = DagScheduler::new(graph, &config, Box::new(FirstRouter), vec![]);
910        assert!(result.is_err());
911    }
912
913    // --- tick tests ---
914
915    #[test]
916    fn test_tick_produces_spawn_for_ready() {
917        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
918        let mut scheduler = make_scheduler(graph);
919        let actions = scheduler.tick();
920        let spawns: Vec<_> = actions
921            .iter()
922            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
923            .collect();
924        assert_eq!(spawns.len(), 2);
925    }
926
927    #[test]
928    fn test_tick_respects_max_parallel() {
929        let graph = graph_from_nodes(vec![
930            make_node(0, &[]),
931            make_node(1, &[]),
932            make_node(2, &[]),
933            make_node(3, &[]),
934            make_node(4, &[]),
935        ]);
936        let mut config = make_config();
937        config.max_parallel = 2;
938        let defs = vec![make_def("worker")];
939        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
940        let actions = scheduler.tick();
941        let spawn_count = actions
942            .iter()
943            .filter(|a| matches!(a, SchedulerAction::Spawn { .. }))
944            .count();
945        assert_eq!(spawn_count, 2);
946    }
947
948    #[test]
949    fn test_tick_detects_completion() {
950        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
951        graph.tasks[0].status = TaskStatus::Completed;
952        let config = make_config();
953        let defs = vec![make_def("worker")];
954        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
955        // Manually set graph to Running since new() validated Created status
956        // — but all tasks are terminal. tick() should detect completion.
957        let actions = scheduler.tick();
958        let has_done = actions.iter().any(|a| {
959            matches!(
960                a,
961                SchedulerAction::Done {
962                    status: GraphStatus::Completed
963                }
964            )
965        });
966        assert!(
967            has_done,
968            "should emit Done(Completed) when all tasks are terminal"
969        );
970    }
971
972    // --- completion event tests ---
973
974    #[test]
975    fn test_completion_event_marks_deps_ready() {
976        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
977        let mut scheduler = make_scheduler(graph);
978
979        // Simulate task 0 running.
980        scheduler.graph.tasks[0].status = TaskStatus::Running;
981        scheduler.running.insert(
982            TaskId(0),
983            RunningTask {
984                agent_handle_id: "handle-0".to_string(),
985                agent_def_name: "worker".to_string(),
986                started_at: Instant::now(),
987            },
988        );
989
990        let event = TaskEvent {
991            task_id: TaskId(0),
992            agent_handle_id: "handle-0".to_string(),
993            outcome: TaskOutcome::Completed {
994                output: "done".to_string(),
995                artifacts: vec![],
996            },
997        };
998        scheduler.buffered_events.push_back(event);
999
1000        let actions = scheduler.tick();
1001        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Completed);
1002        // Task 1 should now be Ready or Spawn action emitted.
1003        let has_spawn_1 = actions
1004            .iter()
1005            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(1)));
1006        assert!(
1007            has_spawn_1 || scheduler.graph.tasks[1].status == TaskStatus::Ready,
1008            "task 1 should be spawned or marked Ready"
1009        );
1010    }
1011
1012    #[test]
1013    fn test_failure_abort_cancels_running() {
1014        let graph = graph_from_nodes(vec![
1015            make_node(0, &[]),
1016            make_node(1, &[]),
1017            make_node(2, &[0, 1]),
1018        ]);
1019        let mut scheduler = make_scheduler(graph);
1020
1021        // Simulate tasks 0 and 1 running.
1022        scheduler.graph.tasks[0].status = TaskStatus::Running;
1023        scheduler.running.insert(
1024            TaskId(0),
1025            RunningTask {
1026                agent_handle_id: "h0".to_string(),
1027                agent_def_name: "worker".to_string(),
1028                started_at: Instant::now(),
1029            },
1030        );
1031        scheduler.graph.tasks[1].status = TaskStatus::Running;
1032        scheduler.running.insert(
1033            TaskId(1),
1034            RunningTask {
1035                agent_handle_id: "h1".to_string(),
1036                agent_def_name: "worker".to_string(),
1037                started_at: Instant::now(),
1038            },
1039        );
1040
1041        // Task 0 fails with default Abort strategy.
1042        let event = TaskEvent {
1043            task_id: TaskId(0),
1044            agent_handle_id: "h0".to_string(),
1045            outcome: TaskOutcome::Failed {
1046                error: "boom".to_string(),
1047            },
1048        };
1049        scheduler.buffered_events.push_back(event);
1050
1051        let actions = scheduler.tick();
1052        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
1053        let cancel_ids: Vec<_> = actions
1054            .iter()
1055            .filter_map(|a| {
1056                if let SchedulerAction::Cancel { agent_handle_id } = a {
1057                    Some(agent_handle_id.as_str())
1058                } else {
1059                    None
1060                }
1061            })
1062            .collect();
1063        assert!(cancel_ids.contains(&"h1"), "task 1 should be canceled");
1064        assert!(
1065            actions
1066                .iter()
1067                .any(|a| matches!(a, SchedulerAction::Done { .. }))
1068        );
1069    }
1070
1071    #[test]
1072    fn test_failure_skip_propagates() {
1073        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1074        let mut scheduler = make_scheduler(graph);
1075
1076        // Set failure strategy to Skip on task 0.
1077        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Skip);
1078        scheduler.graph.tasks[0].status = TaskStatus::Running;
1079        scheduler.running.insert(
1080            TaskId(0),
1081            RunningTask {
1082                agent_handle_id: "h0".to_string(),
1083                agent_def_name: "worker".to_string(),
1084                started_at: Instant::now(),
1085            },
1086        );
1087
1088        let event = TaskEvent {
1089            task_id: TaskId(0),
1090            agent_handle_id: "h0".to_string(),
1091            outcome: TaskOutcome::Failed {
1092                error: "skip me".to_string(),
1093            },
1094        };
1095        scheduler.buffered_events.push_back(event);
1096        scheduler.tick();
1097
1098        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Skipped);
1099        assert_eq!(scheduler.graph.tasks[1].status, TaskStatus::Skipped);
1100    }
1101
1102    #[test]
1103    fn test_failure_retry_reschedules() {
1104        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1105        let mut scheduler = make_scheduler(graph);
1106
1107        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
1108        scheduler.graph.tasks[0].max_retries = Some(3);
1109        scheduler.graph.tasks[0].retry_count = 0;
1110        scheduler.graph.tasks[0].status = TaskStatus::Running;
1111        scheduler.running.insert(
1112            TaskId(0),
1113            RunningTask {
1114                agent_handle_id: "h0".to_string(),
1115                agent_def_name: "worker".to_string(),
1116                started_at: Instant::now(),
1117            },
1118        );
1119
1120        let event = TaskEvent {
1121            task_id: TaskId(0),
1122            agent_handle_id: "h0".to_string(),
1123            outcome: TaskOutcome::Failed {
1124                error: "transient".to_string(),
1125            },
1126        };
1127        scheduler.buffered_events.push_back(event);
1128        let actions = scheduler.tick();
1129
1130        // Task should be rescheduled (Ready) and a Spawn action emitted.
1131        let has_spawn = actions
1132            .iter()
1133            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
1134        assert!(
1135            has_spawn || scheduler.graph.tasks[0].status == TaskStatus::Ready,
1136            "retry should produce spawn or Ready status"
1137        );
1138        // retry_count incremented
1139        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
1140    }
1141
1142    #[test]
1143    fn test_process_event_failed_retry() {
1144        // End-to-end: send Failed event, verify retry path produces Ready -> Spawn.
1145        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1146        let mut scheduler = make_scheduler(graph);
1147
1148        scheduler.graph.tasks[0].failure_strategy = Some(FailureStrategy::Retry);
1149        scheduler.graph.tasks[0].max_retries = Some(2);
1150        scheduler.graph.tasks[0].retry_count = 0;
1151        scheduler.graph.tasks[0].status = TaskStatus::Running;
1152        scheduler.running.insert(
1153            TaskId(0),
1154            RunningTask {
1155                agent_handle_id: "h0".to_string(),
1156                agent_def_name: "worker".to_string(),
1157                started_at: Instant::now(),
1158            },
1159        );
1160
1161        let event = TaskEvent {
1162            task_id: TaskId(0),
1163            agent_handle_id: "h0".to_string(),
1164            outcome: TaskOutcome::Failed {
1165                error: "first failure".to_string(),
1166            },
1167        };
1168        scheduler.buffered_events.push_back(event);
1169        let actions = scheduler.tick();
1170
1171        // After retry: retry_count = 1, status = Ready or Spawn emitted.
1172        assert_eq!(scheduler.graph.tasks[0].retry_count, 1);
1173        let spawned = actions
1174            .iter()
1175            .any(|a| matches!(a, SchedulerAction::Spawn { task_id, .. } if *task_id == TaskId(0)));
1176        assert!(
1177            spawned || scheduler.graph.tasks[0].status == TaskStatus::Ready,
1178            "retry should emit Spawn or set Ready"
1179        );
1180        // Graph must still be Running.
1181        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1182    }
1183
1184    #[test]
1185    fn test_timeout_cancels_stalled() {
1186        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1187        let mut config = make_config();
1188        config.task_timeout_secs = 1; // 1 second timeout
1189        let defs = vec![make_def("worker")];
1190        let mut scheduler = DagScheduler::new(graph, &config, Box::new(FirstRouter), defs).unwrap();
1191
1192        // Simulate a running task that started just over 1 second ago.
1193        scheduler.graph.tasks[0].status = TaskStatus::Running;
1194        scheduler.running.insert(
1195            TaskId(0),
1196            RunningTask {
1197                agent_handle_id: "h0".to_string(),
1198                agent_def_name: "worker".to_string(),
1199                started_at: Instant::now() - Duration::from_secs(2), // already timed out
1200            },
1201        );
1202
1203        let actions = scheduler.tick();
1204        let has_cancel = actions.iter().any(
1205            |a| matches!(a, SchedulerAction::Cancel { agent_handle_id } if agent_handle_id == "h0"),
1206        );
1207        assert!(has_cancel, "timed-out task should emit Cancel action");
1208        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
1209    }
1210
1211    #[test]
1212    fn test_cancel_all() {
1213        let graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[])]);
1214        let mut scheduler = make_scheduler(graph);
1215
1216        scheduler.graph.tasks[0].status = TaskStatus::Running;
1217        scheduler.running.insert(
1218            TaskId(0),
1219            RunningTask {
1220                agent_handle_id: "h0".to_string(),
1221                agent_def_name: "worker".to_string(),
1222                started_at: Instant::now(),
1223            },
1224        );
1225        scheduler.graph.tasks[1].status = TaskStatus::Running;
1226        scheduler.running.insert(
1227            TaskId(1),
1228            RunningTask {
1229                agent_handle_id: "h1".to_string(),
1230                agent_def_name: "worker".to_string(),
1231                started_at: Instant::now(),
1232            },
1233        );
1234
1235        let actions = scheduler.cancel_all();
1236
1237        assert_eq!(scheduler.graph.status, GraphStatus::Canceled);
1238        assert!(scheduler.running.is_empty());
1239        let cancel_count = actions
1240            .iter()
1241            .filter(|a| matches!(a, SchedulerAction::Cancel { .. }))
1242            .count();
1243        assert_eq!(cancel_count, 2);
1244        assert!(actions.iter().any(|a| matches!(
1245            a,
1246            SchedulerAction::Done {
1247                status: GraphStatus::Canceled
1248            }
1249        )));
1250    }
1251
1252    #[test]
1253    fn test_record_spawn_failure() {
1254        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1255        let mut scheduler = make_scheduler(graph);
1256
1257        // Simulate task marked Running (by tick) but spawn failed.
1258        scheduler.graph.tasks[0].status = TaskStatus::Running;
1259
1260        let actions = scheduler.record_spawn_failure(TaskId(0), "spawn error");
1261        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
1262        // With Abort strategy and no other running tasks, graph should be Failed.
1263        assert_eq!(scheduler.graph.status, GraphStatus::Failed);
1264        assert!(
1265            actions
1266                .iter()
1267                .any(|a| matches!(a, SchedulerAction::Done { .. }))
1268        );
1269    }
1270
1271    #[test]
1272    fn test_build_prompt_no_deps() {
1273        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1274        let scheduler = make_scheduler(graph);
1275        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[0]);
1276        assert_eq!(prompt, "description for task 0");
1277    }
1278
1279    #[test]
1280    fn test_build_prompt_with_deps_and_truncation() {
1281        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1282        graph.tasks[0].status = TaskStatus::Completed;
1283        // Create output longer than budget
1284        graph.tasks[0].result = Some(TaskResult {
1285            output: "x".repeat(200),
1286            artifacts: vec![],
1287            duration_ms: 10,
1288            agent_id: None,
1289            agent_def: None,
1290        });
1291
1292        let config = crate::config::OrchestrationConfig {
1293            dependency_context_budget: 50,
1294            ..make_config()
1295        };
1296        let scheduler = DagScheduler::new(
1297            graph,
1298            &config,
1299            Box::new(FirstRouter),
1300            vec![make_def("worker")],
1301        )
1302        .unwrap();
1303
1304        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
1305        assert!(prompt.contains("<completed-dependencies>"));
1306        assert!(prompt.contains("[truncated:"));
1307        assert!(prompt.contains("Your task:"));
1308    }
1309
1310    #[test]
1311    fn test_duration_ms_computed_correctly() {
1312        // Regression test for C1: duration_ms must be non-zero after completion.
1313        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1314        let mut scheduler = make_scheduler(graph);
1315
1316        scheduler.graph.tasks[0].status = TaskStatus::Running;
1317        scheduler.running.insert(
1318            TaskId(0),
1319            RunningTask {
1320                agent_handle_id: "h0".to_string(),
1321                agent_def_name: "worker".to_string(),
1322                started_at: Instant::now() - Duration::from_millis(50),
1323            },
1324        );
1325
1326        let event = TaskEvent {
1327            task_id: TaskId(0),
1328            agent_handle_id: "h0".to_string(),
1329            outcome: TaskOutcome::Completed {
1330                output: "result".to_string(),
1331                artifacts: vec![],
1332            },
1333        };
1334        scheduler.buffered_events.push_back(event);
1335        scheduler.tick();
1336
1337        let result = scheduler.graph.tasks[0].result.as_ref().unwrap();
1338        assert!(
1339            result.duration_ms > 0,
1340            "duration_ms should be > 0, got {}",
1341            result.duration_ms
1342        );
1343    }
1344
1345    #[test]
1346    fn test_utf8_safe_truncation() {
1347        // S1 regression: truncation must not panic on multi-byte UTF-8.
1348        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1349        graph.tasks[0].status = TaskStatus::Completed;
1350        // Unicode: each char is 3 bytes in UTF-8.
1351        let unicode_output = "日本語テスト".repeat(100);
1352        graph.tasks[0].result = Some(TaskResult {
1353            output: unicode_output,
1354            artifacts: vec![],
1355            duration_ms: 10,
1356            agent_id: None,
1357            agent_def: None,
1358        });
1359
1360        // Budget large enough to hold the spotlighting wrapper + some Japanese chars.
1361        // The sanitizer adds ~200 chars of spotlight header, so 500 chars is sufficient.
1362        let config = crate::config::OrchestrationConfig {
1363            dependency_context_budget: 500,
1364            ..make_config()
1365        };
1366        let scheduler = DagScheduler::new(
1367            graph,
1368            &config,
1369            Box::new(FirstRouter),
1370            vec![make_def("worker")],
1371        )
1372        .unwrap();
1373
1374        // Must not panic, and Japanese chars must be preserved in the output.
1375        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
1376        assert!(
1377            prompt.contains("日"),
1378            "Japanese characters should be in the prompt after safe truncation"
1379        );
1380    }
1381
1382    #[test]
1383    fn test_no_agent_marks_task_failed() {
1384        // NoneRouter: when no agent is available, task is marked failed.
1385        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1386        let mut scheduler = make_scheduler_with_router(graph, Box::new(NoneRouter));
1387        let actions = scheduler.tick();
1388        assert_eq!(scheduler.graph.tasks[0].status, TaskStatus::Failed);
1389        assert!(
1390            actions
1391                .iter()
1392                .any(|a| matches!(a, SchedulerAction::Done { .. }))
1393        );
1394    }
1395
1396    #[test]
1397    fn test_stale_event_rejected() {
1398        // Regression: events from a previous agent incarnation must be discarded.
1399        let graph = graph_from_nodes(vec![make_node(0, &[])]);
1400        let mut scheduler = make_scheduler(graph);
1401
1402        // Simulate task running with handle "current-handle".
1403        scheduler.graph.tasks[0].status = TaskStatus::Running;
1404        scheduler.running.insert(
1405            TaskId(0),
1406            RunningTask {
1407                agent_handle_id: "current-handle".to_string(),
1408                agent_def_name: "worker".to_string(),
1409                started_at: Instant::now(),
1410            },
1411        );
1412
1413        // Send a completion event from the OLD agent (stale handle).
1414        let stale_event = TaskEvent {
1415            task_id: TaskId(0),
1416            agent_handle_id: "old-handle".to_string(),
1417            outcome: TaskOutcome::Completed {
1418                output: "stale output".to_string(),
1419                artifacts: vec![],
1420            },
1421        };
1422        scheduler.buffered_events.push_back(stale_event);
1423        let actions = scheduler.tick();
1424
1425        // Stale event must be discarded — task must NOT be completed.
1426        assert_ne!(
1427            scheduler.graph.tasks[0].status,
1428            TaskStatus::Completed,
1429            "stale event must not complete the task"
1430        );
1431        // No Spawn or Done actions should result from a discarded stale event.
1432        let has_done = actions
1433            .iter()
1434            .any(|a| matches!(a, SchedulerAction::Done { .. }));
1435        assert!(
1436            !has_done,
1437            "no Done action should be emitted for a stale event"
1438        );
1439        // Task must still be in the running map.
1440        assert!(
1441            scheduler.running.contains_key(&TaskId(0)),
1442            "running task must remain after stale event"
1443        );
1444    }
1445
1446    #[test]
1447    fn test_build_prompt_chars_count_in_truncation_message() {
1448        // Fix #3: truncation message must report char count, not byte count.
1449        // Use pure ASCII so sanitization doesn't significantly change char count.
1450        // Budget < output length => truncation triggered; verify the count label is "chars total".
1451        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1452        graph.tasks[0].status = TaskStatus::Completed;
1453        // ASCII output: byte count == char count, so both old and new code produce the same number,
1454        // but the label "chars total" (not "bytes total") is what matters here.
1455        let output = "x".repeat(200);
1456        graph.tasks[0].result = Some(TaskResult {
1457            output,
1458            artifacts: vec![],
1459            duration_ms: 10,
1460            agent_id: None,
1461            agent_def: None,
1462        });
1463
1464        let config = crate::config::OrchestrationConfig {
1465            dependency_context_budget: 10, // truncate: sanitized output >> 10 chars
1466            ..make_config()
1467        };
1468        let scheduler = DagScheduler::new(
1469            graph,
1470            &config,
1471            Box::new(FirstRouter),
1472            vec![make_def("worker")],
1473        )
1474        .unwrap();
1475
1476        let prompt = scheduler.build_task_prompt(&scheduler.graph.tasks[1]);
1477        // Truncation must have been triggered and the message must use "chars total" label.
1478        assert!(
1479            prompt.contains("chars total"),
1480            "truncation message must use 'chars total' label. Prompt: {prompt}"
1481        );
1482        assert!(
1483            prompt.contains("[truncated:"),
1484            "prompt must contain truncation notice. Prompt: {prompt}"
1485        );
1486    }
1487
1488    // --- resume_from tests (MT-1) ---
1489
1490    #[test]
1491    fn test_resume_from_accepts_paused_graph() {
1492        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1493        graph.status = GraphStatus::Paused;
1494        graph.tasks[0].status = TaskStatus::Pending;
1495
1496        let scheduler =
1497            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1498                .expect("resume_from should accept Paused graph");
1499        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1500    }
1501
1502    #[test]
1503    fn test_resume_from_accepts_failed_graph() {
1504        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1505        graph.status = GraphStatus::Failed;
1506        graph.tasks[0].status = TaskStatus::Failed;
1507
1508        let scheduler =
1509            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1510                .expect("resume_from should accept Failed graph");
1511        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1512    }
1513
1514    #[test]
1515    fn test_resume_from_rejects_completed_graph() {
1516        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1517        graph.status = GraphStatus::Completed;
1518
1519        let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1520            .unwrap_err();
1521        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
1522    }
1523
1524    #[test]
1525    fn test_resume_from_rejects_canceled_graph() {
1526        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1527        graph.status = GraphStatus::Canceled;
1528
1529        let err = DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1530            .unwrap_err();
1531        assert!(matches!(err, OrchestrationError::InvalidGraph(_)));
1532    }
1533
1534    #[test]
1535    fn test_resume_from_reconstructs_running_tasks() {
1536        // IC1: tasks that were Running at pause time must appear in the running map.
1537        let mut graph = graph_from_nodes(vec![make_node(0, &[]), make_node(1, &[0])]);
1538        graph.status = GraphStatus::Paused;
1539        graph.tasks[0].status = TaskStatus::Running;
1540        graph.tasks[0].assigned_agent = Some("handle-abc".to_string());
1541        graph.tasks[0].agent_hint = Some("worker".to_string());
1542        graph.tasks[1].status = TaskStatus::Pending;
1543
1544        let scheduler =
1545            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1546                .expect("should succeed");
1547
1548        assert!(
1549            scheduler.running.contains_key(&TaskId(0)),
1550            "Running task must be reconstructed in the running map (IC1)"
1551        );
1552        assert_eq!(scheduler.running[&TaskId(0)].agent_handle_id, "handle-abc");
1553        assert!(
1554            !scheduler.running.contains_key(&TaskId(1)),
1555            "Pending task must not appear in running map"
1556        );
1557    }
1558
1559    #[test]
1560    fn test_resume_from_sets_status_running() {
1561        // II3: resume_from must set graph.status = Running regardless of input status.
1562        let mut graph = graph_from_nodes(vec![make_node(0, &[])]);
1563        graph.status = GraphStatus::Paused;
1564
1565        let scheduler =
1566            DagScheduler::resume_from(graph, &make_config(), Box::new(FirstRouter), vec![])
1567                .unwrap();
1568        assert_eq!(scheduler.graph.status, GraphStatus::Running);
1569    }
1570}