Skip to main content

deepstrike_core/orchestration/workflow/
run.rs

1//! W0: a kernel-resident workflow run — the DAG state for one in-flight [`WorkflowSpec`].
2//!
3//! Pure data + pure advance logic, no I/O and no syscall: the [`crate::scheduler::state_machine::
4//! LoopStateMachine`] drives this, gating each ready node's spawn through
5//! `evaluate_syscall(Syscall::Spawn)` and reusing the existing batch-await barrier
6//! (`SuspendState::SubAgentAwait`). This module only tracks *which* nodes are ready, spawned,
7//! done, or denied, and builds each node's [`IsolationManifest`].
8//!
9//! Lifecycle: `ready_batch()` → (gate each) `mark_spawned` / `mark_denied` → on completion
10//! `record_completion` → repeat until `is_complete()`.
11
12use std::collections::HashMap;
13
14use serde::{Deserialize, Serialize};
15
16use crate::orchestration::task_graph::{TaskGraph, TaskStatus};
17use crate::orchestration::tournament::{EntrantId, Match, Tournament, TournamentAction};
18use super::{NodeKind, NodeTrust, WorkflowNode, WorkflowSpec};
19use crate::types::agent::{AgentIsolation, AgentRole, ContextInheritance, IsolationManifest};
20use crate::types::error::Result;
21use crate::types::result::{LoopResult, TerminationReason};
22
/// Returns the deterministic kernel agent id of workflow node `node`: the literal prefix
/// `wf-node` followed by the node index in decimal (`node_agent_id(3)` is `"wf-node3"`).
///
/// The id depends only on the index, so it stays the same across resume / audit.
pub fn node_agent_id(node: usize) -> String {
    let mut id = String::from("wf-node");
    id.push_str(&node.to_string());
    id
}
27
28/// Enough to run one spawned workflow node, carried to the SDK in the `WorkflowBatchSpawned`
29/// observation. Role/isolation/inheritance are canonical snake_case strings (serde names) so the
30/// host SDK can rebuild an agent run spec — the kernel generates these specs internally, so this
31/// is how the goal reaches the SDK that actually executes the node.
32#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
33pub struct WorkflowSpawnInfo {
34    pub agent_id: String,
35    pub goal: String,
36    pub role: String,
37    pub isolation: String,
38    pub context_inheritance: String,
39    #[serde(default, skip_serializing_if = "Option::is_none")]
40    pub model_hint: Option<String>,
41    /// W3 trust level (`"trusted"` | `"quarantined"`) — the SDK runs quarantined nodes without
42    /// privileges and crosses their output back only as a structured summary.
43    #[serde(default = "default_trust")]
44    pub trust: String,
45    /// G3 structured output: the JSON Schema the node's output must conform to, carried verbatim
46    /// from [`WorkflowNode::output_schema`]. The SDK instructs the agent with it and validates +
47    /// retries on its result. `None` when the node declared no schema. Additive ABI.
48    #[serde(default, skip_serializing_if = "Option::is_none")]
49    pub output_schema: Option<serde_json::Value>,
50    /// G2 deterministic compute: present only for a [`NodeKind::Reduce`] node — the name of the
51    /// SDK-registered pure function the SDK runs (over `input_agent_ids`' outputs) instead of an LLM
52    /// agent. `None` for every ordinary node. Additive ABI.
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    pub reducer: Option<String>,
55    /// G2: the dependency agent ids whose outputs a [`NodeKind::Reduce`] node consumes (its
56    /// `depends_on`, resolved to stable agent ids). Empty for non-reduce nodes. Additive ABI.
57    #[serde(default, skip_serializing_if = "Vec::is_empty")]
58    pub input_agent_ids: Vec<String>,
59    /// Present only for a tournament *judge* spawn (A#2): the two entrant agent ids whose outputs
60    /// this judge must compare. The SDK looks up those entrants' produced candidates, runs the
61    /// judge, and reports the winner in the result's `tournament_winner`. `None` for every ordinary
62    /// (entrant / spawn / loop / classify) node. Additive ABI: omitted on the wire when `None`.
63    #[serde(default, skip_serializing_if = "Option::is_none")]
64    pub judge_match: Option<JudgeMatch>,
65    /// Present only for a [`NodeKind::Loop`] iteration spawn (A#2 v2): the loop's `max_iters`. It
66    /// both *marks* the spawn as a loop iteration — so the SDK knows to solicit and report a
67    /// `loop_continue` stop signal from the agent — and gives the cap for the agent's prompt. `None`
68    /// for every non-loop node. Mirrors how `reducer` / `judge_match` distinguish reduce / judge
69    /// spawns. Additive ABI: omitted on the wire when `None`.
70    #[serde(default, skip_serializing_if = "Option::is_none")]
71    pub loop_max_iters: Option<usize>,
72    /// Present only for a [`NodeKind::Classify`] spawn (A#2): the branch labels the classifier must
73    /// choose among. Non-empty *marks* the spawn as a classifier — the SDK instructs the agent to
74    /// pick exactly one label and reports it in the result's `classify_branch`. Empty for every
75    /// non-classify node. Additive ABI: omitted on the wire when empty.
76    #[serde(default, skip_serializing_if = "Vec::is_empty")]
77    pub classify_labels: Vec<String>,
78    /// M4/G5: the node's per-node cumulative token cap, if set. The SDK sets the child run's
79    /// `max_total_tokens` to this so the node self-terminates at the cap. Additive ABI: omitted when
80    /// `None`.
81    #[serde(default, skip_serializing_if = "Option::is_none")]
82    pub token_budget: Option<u64>,
83}
84
/// Serde fallback for [`WorkflowSpawnInfo::trust`]: a payload that omits the field deserializes
/// as `"trusted"`.
fn default_trust() -> String {
    String::from("trusted")
}
88
89/// A pairwise judge assignment carried to the SDK on a tournament judge's `WorkflowSpawnInfo`:
90/// the two entrant agent ids whose produced outputs are to be compared. The SDK maps each id back
91/// to that entrant's candidate and asks the judge which is better.
92#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
93pub struct JudgeMatch {
94    pub left: String,
95    pub right: String,
96}
97
98/// G4 budget-as-signal: a snapshot of the workflow's remaining headroom under the active resource
99/// quota, carried to the SDK on every `WorkflowBatchSpawned`. A coordinator/submitter node reads it
100/// to *scale its next submission to what is actually available* — the analogue of the host-side
101/// `budget.remaining()` in the code-orchestration model — instead of blindly hitting the cap and
102/// eating a `Deny`. `None` remaining fields mean that dimension is unbounded (no quota set).
103#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
104pub struct WorkflowBudget {
105    /// Nodes currently in the DAG (spec + every runtime submission so far).
106    pub nodes_used: usize,
107    /// `ResourceQuota::max_workflow_nodes`, if set.
108    #[serde(default, skip_serializing_if = "Option::is_none")]
109    pub nodes_max: Option<usize>,
110    /// `nodes_max - nodes_used` (saturating), if a node cap is set — how many more nodes may be
111    /// submitted before the `max_workflow_nodes` backstop denies further growth.
112    #[serde(default, skip_serializing_if = "Option::is_none")]
113    pub nodes_remaining: Option<usize>,
114    /// Sub-agents currently in the `running` state.
115    pub running_subagents: usize,
116    /// `ResourceQuota::max_concurrent_subagents`, if set.
117    #[serde(default, skip_serializing_if = "Option::is_none")]
118    pub max_concurrent_subagents: Option<usize>,
119    /// `max_concurrent_subagents - running_subagents` (saturating), if a concurrency cap is set —
120    /// how many of a submission's nodes can spawn *immediately* rather than deferring for a slot.
121    #[serde(default, skip_serializing_if = "Option::is_none")]
122    pub concurrency_remaining: Option<usize>,
123    /// M4/G5: cumulative tokens spent across the run so far (the scheduler's `total_tokens`).
124    /// `#[serde(default)]` keeps older JSON (without this field) deserializing to 0 — additive ABI.
125    #[serde(default)]
126    pub tokens_used: u64,
127    /// M4/G5: `SchedulerBudget::max_total_tokens` — the run's cumulative token cap.
128    #[serde(default, skip_serializing_if = "Option::is_none")]
129    pub tokens_max: Option<u64>,
130    /// M4/G5: `tokens_max - tokens_used` (saturating) — how many tokens remain before the run-level
131    /// token budget terminates the workflow. Lets a coordinator scale its next submission to token
132    /// headroom (the analogue of "use 10k tokens").
133    #[serde(default, skip_serializing_if = "Option::is_none")]
134    pub tokens_remaining: Option<u64>,
135}
136
137fn role_label(role: AgentRole) -> &'static str {
138    match role {
139        AgentRole::Explore => "explore",
140        AgentRole::Plan => "plan",
141        AgentRole::Implement => "implement",
142        AgentRole::Verify => "verify",
143        AgentRole::Custom => "custom",
144    }
145}
146
147fn isolation_label(isolation: AgentIsolation) -> &'static str {
148    match isolation {
149        AgentIsolation::Shared => "shared",
150        AgentIsolation::ReadOnly => "read_only",
151        AgentIsolation::Worktree => "worktree",
152        AgentIsolation::Remote => "remote",
153    }
154}
155
156fn inheritance_label(inheritance: ContextInheritance) -> &'static str {
157    match inheritance {
158        ContextInheritance::None => "none",
159        ContextInheritance::SystemOnly => "system_only",
160        ContextInheritance::Full => "full",
161    }
162}
163
164fn trust_label(trust: NodeTrust) -> &'static str {
165    match trust {
166        NodeTrust::Trusted => "trusted",
167        NodeTrust::Quarantined => "quarantined",
168    }
169}
170
171/// Synthetic terminal result for a node recovered as already-completed during resume.
172fn resumed_result() -> LoopResult {
173    LoopResult {
174        termination: crate::types::result::TerminationReason::Completed,
175        final_message: None,
176        turns_used: 0,
177        total_tokens_used: 0,
178        loop_continue: None,
179        classify_branch: None,
180        tournament_winner: None,
181    }
182}
183
184/// In-flight bracket state for one `NodeKind::Tournament` controller node. Entrant and judge
185/// children are appended as ordinary graph nodes (so they flow through the unchanged spawn loop);
186/// this just tracks the phase and the current round's judges so completions advance the bracket.
187struct TournamentState {
188    /// Entrant child node indices (the generators), in entrant order.
189    entrant_nodes: Vec<usize>,
190    /// Entrants still generating; the bracket starts when this reaches 0.
191    entrants_remaining: usize,
192    /// Single-elimination bracket — `None` during the entrant phase, `Some` once judging begins.
193    bracket: Option<Tournament>,
194    /// Current round's judge child node indices, aligned to the bracket's pending matches.
195    judge_nodes: Vec<usize>,
196    /// Winner reported per current-round match (aligned to `judge_nodes`); `None` until judged.
197    judge_winners: Vec<Option<EntrantId>>,
198    /// Judges still deliberating this round; the round resolves when this reaches 0.
199    judges_remaining: usize,
200}
201
202/// The state of one in-flight workflow execution.
203pub struct WorkflowRun {
204    graph: TaskGraph,
205    nodes: Vec<WorkflowNode>,
206    /// Parent session id stamped onto each node's spawned-agent manifest.
207    parent_session_id: String,
208    /// Completed-event lookup: kernel agent id → DAG node index.
209    node_of_agent: HashMap<String, usize>,
210    /// Nodes spawned in the current batch, awaiting completion.
211    batch: Vec<usize>,
212    /// Completed-iteration count per `Loop` node (absent / 0 = no iterations finished yet). The
213    /// in-flight iteration's agent id is `wf-node{N}-i{iter_counts[N]}`.
214    iter_counts: HashMap<usize, usize>,
215    /// In-flight bracket state per `NodeKind::Tournament` controller node index.
216    tournaments: HashMap<usize, TournamentState>,
217    /// Reverse map: an appended entrant/judge child node index → its controller node index.
218    child_controller: HashMap<usize, usize>,
219    /// Judge-match descriptor per judge child node index (read by `spawn_info`).
220    judge_matches: HashMap<usize, JudgeMatch>,
221}
222
223impl WorkflowRun {
224    /// Build from a spec. Validates dependency indices + acyclicity (reuses `WorkflowSpec`).
225    pub fn new(spec: &WorkflowSpec, parent_session_id: &str) -> Result<Self> {
226        spec.validate()?;
227        Ok(Self {
228            graph: spec.to_task_graph()?,
229            nodes: spec.nodes.clone(),
230            parent_session_id: parent_session_id.to_string(),
231            node_of_agent: HashMap::new(),
232            batch: Vec::new(),
233            iter_counts: HashMap::new(),
234            tournaments: HashMap::new(),
235            child_controller: HashMap::new(),
236            judge_matches: HashMap::new(),
237        })
238    }
239
240    /// W0-ABI resume: rebuild an in-flight run by replaying which node agent-ids already completed
241    /// (e.g. recovered from the session log after an interruption). Those nodes are pre-marked
242    /// done so [`ready_batch`](Self::ready_batch) returns only the remaining work — the kernel then
243    /// continues the DAG from where it left off. Unknown ids are ignored.
244    ///
245    /// R3-1: `submissions` are the runtime [`Self::submit_nodes`] batches recorded (in order) before
246    /// the interruption. They are re-applied **first**, reconstructing dynamically-appended nodes at
247    /// the same indices/ids they had originally — `submit_nodes` appends by order alone (independent
248    /// of completion state), so re-applying every submission up front and then marking completions
249    /// reproduces the exact pre-interruption graph. Without this, appended nodes (not in the spec)
250    /// would vanish on resume and their completed ids would match nothing.
251    pub fn resume(
252        spec: &WorkflowSpec,
253        parent_session_id: &str,
254        submissions: &[Vec<WorkflowNode>],
255        completed: &[String],
256    ) -> Result<Self> {
257        let mut run = Self::new(spec, parent_session_id)?;
258        for batch in submissions {
259            run.submit_nodes(batch.clone());
260        }
261        let n = run.graph.len();
262        for id in completed {
263            if let Some(node) = (0..n).find(|&i| node_agent_id(i) == *id) {
264                run.graph.start(node);
265                run.graph.complete(node, resumed_result());
266            }
267        }
268        Ok(run)
269    }
270
271    /// Node indices whose dependencies are satisfied and that have not yet started.
272    pub fn ready_batch(&self) -> Vec<usize> {
273        self.graph.ready_tasks()
274    }
275
276    /// The agent id for a node's *current* spawn. For a `Spawn` node this is the stable
277    /// `wf-node{N}`; for a `Loop` node it is `wf-node{N}-i{k}` where `k` is the count of iterations
278    /// already finished — so each iteration gets a distinct id without any new ABI (the SDK simply
279    /// spawns the id it is given and feeds it back as a `sub_agent_completed`).
280    pub fn current_agent_id(&self, node: usize) -> String {
281        match self.nodes[node].kind {
282            NodeKind::Loop { .. } => {
283                let k = self.iter_counts.get(&node).copied().unwrap_or(0);
284                format!("{}-i{k}", node_agent_id(node))
285            }
286            // Spawn / Classify run once, a Tournament controller never spawns its own agent (its
287            // entrant/judge children are separate Spawn nodes), and a Reduce node runs once as host
288            // compute → stable plain id.
289            NodeKind::Spawn
290            | NodeKind::Classify { .. }
291            | NodeKind::Tournament { .. }
292            | NodeKind::Reduce { .. } => node_agent_id(node),
293        }
294    }
295
296    /// Build the isolation manifest for a node's current spawn, preserving its explicit isolation +
297    /// context-inheritance (the `AgentRunSpec`→`from_spec` path would overwrite these with
298    /// role defaults). Capability inheritance for workflow nodes is left to a later round.
299    pub fn manifest_for(&self, node: usize) -> IsolationManifest {
300        let n = &self.nodes[node];
301        IsolationManifest {
302            agent_id: self.current_agent_id(node).into(),
303            parent_session_id: self.parent_session_id.as_str().into(),
304            role: n.role,
305            isolation: n.isolation,
306            context_inheritance: n.context_inheritance,
307            permitted_capability_ids: Vec::new(),
308        }
309    }
310
311    /// The goal text for a node (for the spawn's run spec / context injection).
312    pub fn goal_of(&self, node: usize) -> &str {
313        &self.nodes[node].task.goal
314    }
315
316    /// W3 quarantine invariant: a quarantined node reads untrusted content and must run read-only.
317    /// Returns `true` if the node is `Quarantined` yet declares a write-capable isolation
318    /// (`Shared`/`Worktree`/`Remote`) — a privilege contradiction the kernel refuses to spawn,
319    /// turning the SDK's "self-discipline" quarantine into an in-kernel, auditable enforcement.
320    pub fn quarantine_violation(&self, node: usize) -> bool {
321        let n = &self.nodes[node];
322        matches!(n.trust, NodeTrust::Quarantined)
323            && !matches!(n.isolation, AgentIsolation::ReadOnly)
324    }
325
326    /// The SDK-facing spawn descriptor for a node (agent id + goal + canonical role/isolation/
327    /// inheritance strings + model hint). The kernel owns the spec; this is how the goal reaches
328    /// the host that runs the node.
329    pub fn spawn_info(&self, node: usize) -> WorkflowSpawnInfo {
330        let n = &self.nodes[node];
331        // G2: a Reduce node carries its reducer name + the stable agent ids of its dependencies, so
332        // the SDK can gather those outputs and run the pure function. Non-reduce nodes carry neither.
333        let (reducer, input_agent_ids) = match &n.kind {
334            NodeKind::Reduce { reducer } => (
335                Some(reducer.clone()),
336                n.depends_on.iter().map(|&d| node_agent_id(d)).collect(),
337            ),
338            _ => (None, Vec::new()),
339        };
340        // A#2 v2 / classify: surface the control-flow kind so the SDK can solicit + report the
341        // matching result signal (`loop_continue` / `classify_branch`), mirroring how `reducer` /
342        // `judge_match` distinguish reduce / judge spawns.
343        let loop_max_iters = match &n.kind {
344            NodeKind::Loop { max_iters } => Some(*max_iters),
345            _ => None,
346        };
347        let classify_labels = match &n.kind {
348            NodeKind::Classify { branches } => branches.iter().map(|b| b.label.clone()).collect(),
349            _ => Vec::new(),
350        };
351        WorkflowSpawnInfo {
352            agent_id: self.current_agent_id(node),
353            goal: n.task.goal.clone(),
354            role: role_label(n.role).to_string(),
355            isolation: isolation_label(n.isolation).to_string(),
356            context_inheritance: inheritance_label(n.context_inheritance).to_string(),
357            model_hint: n.model_hint.clone(),
358            trust: trust_label(n.trust).to_string(),
359            output_schema: n.output_schema.clone(),
360            reducer,
361            input_agent_ids,
362            judge_match: self.judge_matches.get(&node).cloned(),
363            loop_max_iters,
364            classify_labels,
365            token_budget: n.token_budget,
366        }
367    }
368
369    /// Mark a node as spawned: start it in the graph, record it in the live batch, and map its
370    /// kernel agent id back to the node for completion routing.
371    pub fn mark_spawned(&mut self, node: usize, agent_id: &str) {
372        self.graph.start(node);
373        self.batch.push(node);
374        self.node_of_agent.insert(agent_id.to_string(), node);
375    }
376
377    /// Mark a node as denied by the syscall gate: fail it in the graph (dependents stay pending
378    /// and will never become ready). Does not enter the live batch.
379    pub fn mark_denied(&mut self, node: usize) {
380        self.graph.fail(node);
381    }
382
383    /// Record a completed sub-agent against its node. Returns the node index if `agent_id`
384    /// belonged to this workflow (and removes it from the live batch), else `None`.
385    ///
386    /// For a `Loop` node this counts the finished iteration: while more iterations remain
387    /// (`< max_iters`) the node is re-armed (`set_ready`) — so the next `ready_batch`/spawn round
388    /// runs `wf-node{N}-i{k+1}` — and the node stays non-terminal, keeping its dependents pending.
389    /// Only when the loop is exhausted is the node `complete`d, promoting its dependents.
390    pub fn record_completion(&mut self, agent_id: &str, result: LoopResult) -> Option<usize> {
391        let node = *self.node_of_agent.get(agent_id)?;
392        self.batch.retain(|&n| n != node);
393
394        // A tournament entrant/judge child: route the completion into its controller's bracket
395        // rather than treating it as an ordinary node (it has no dependents of its own).
396        if let Some(&controller) = self.child_controller.get(&node) {
397            return self.advance_tournament(controller, node, result);
398        }
399
400        match &self.nodes[node].kind {
401            NodeKind::Loop { max_iters } => {
402                // v2 semantic stop: the iteration may signal "done" (`loop_continue == Some(false)`),
403                // ending the loop before `max_iters`. `None`/`Some(true)` run to the cap (v1 behavior).
404                let max_iters = *max_iters;
405                let stop_requested = result.loop_continue == Some(false);
406                let done = self.iter_counts.entry(node).or_insert(0);
407                *done += 1;
408                if *done < max_iters && !stop_requested {
409                    // More iterations: re-arm the node, keep it (and its dependents) in flight.
410                    self.graph.set_ready(node);
411                    return Some(node);
412                }
413            }
414            NodeKind::Classify { branches } => {
415                // Route to the branch matching the classifier's reported label; prune every other
416                // branch's nodes (fail them) *before* completing this node, so that `complete`'s
417                // dependent-promotion only arms the chosen branch (failed nodes are never re-armed).
418                let chosen = result.classify_branch.clone();
419                let prune: Vec<usize> = branches
420                    .iter()
421                    .filter(|b| Some(&b.label) != chosen.as_ref())
422                    .flat_map(|b| b.nodes.iter().copied())
423                    .collect();
424                for bn in prune {
425                    self.graph.fail(bn);
426                }
427            }
428            // A Tournament controller never reaches here (it spawns no agent of its own; its
429            // children route through `child_controller` above). A Reduce node completes like a Spawn
430            // (its host-compute result feeds back as an ordinary completion). Defensive no-op.
431            NodeKind::Spawn | NodeKind::Tournament { .. } | NodeKind::Reduce { .. } => {}
432        }
433
434        // Spawn node, loop's final iteration, or a completed classifier. A node whose agent
435        // terminated in `Error` is *failed* (its dependents starve) rather than completed — an
436        // errored agent must not promote dependents that would run on missing/garbage input. This
437        // is also the SDK's only lever to fail a node from a result: G3 schema enforcement returns
438        // an `Error`-terminated result when output never conforms, failing the node here. Other
439        // terminations (max-turns / budget / timeout) still complete — they may carry partial output.
440        if matches!(result.termination, crate::types::result::TerminationReason::Error) {
441            self.graph.fail(node);
442        } else {
443            self.graph.complete(node, result);
444        }
445        Some(node)
446    }
447
448    // ── Tournament controller (A#2) ─────────────────────────────────────────────────────────────
449
450    /// Append an entrant/judge *child* node (no dependencies → immediately Ready) and return its
451    /// index. Keeps `self.nodes` and `self.graph` index-aligned (both grow in lockstep), so the
452    /// child flows through the unchanged spawn loop as an ordinary `wf-node{idx}` spawn.
453    fn append_child(&mut self, node: WorkflowNode) -> usize {
454        let idx = self.graph.add(node.task.clone(), Vec::new());
455        debug_assert_eq!(idx, self.nodes.len(), "graph/nodes index drift");
456        self.nodes.push(node);
457        idx
458    }
459
460    /// Expand every tournament controller node whose dependencies are now satisfied (status
461    /// `Ready`) into its entrant children. The controller is moved to `Running` (it spawns no agent
462    /// of its own) and stays non-terminal until its bracket resolves. Called by the executor before
463    /// each spawn round, so a controller behind upstream deps expands the moment those complete.
464    pub fn expand_ready_controllers(&mut self) {
465        let pending: Vec<usize> = (0..self.nodes.len())
466            .filter(|i| !self.tournaments.contains_key(i))
467            .filter(|&i| matches!(self.nodes[i].kind, NodeKind::Tournament { .. }))
468            .filter(|&i| self.graph.get(i).map(|n| n.status) == Some(TaskStatus::Ready))
469            .collect();
470        for c in pending {
471            self.expand_tournament(c);
472        }
473    }
474
475    /// Fan a controller out into its entrant generators. Entrants run independent + read-only (a
476    /// clean context per candidate, quarantine-safe), inheriting the controller's trust.
477    fn expand_tournament(&mut self, c: usize) {
478        let entrants = match &self.nodes[c].kind {
479            NodeKind::Tournament { entrants } => entrants.clone(),
480            _ => return,
481        };
482        let trust = self.nodes[c].trust;
483        // Controller spawns no agent of its own → take it out of the ready set until we complete it.
484        self.graph.start(c);
485        let mut entrant_nodes = Vec::with_capacity(entrants.len());
486        for task in entrants {
487            let child = WorkflowNode::new(task, AgentRole::Custom)
488                .with_isolation(AgentIsolation::ReadOnly)
489                .with_trust(trust);
490            let idx = self.append_child(child);
491            self.child_controller.insert(idx, c);
492            entrant_nodes.push(idx);
493        }
494        let entrants_remaining = entrant_nodes.len();
495        self.tournaments.insert(
496            c,
497            TournamentState {
498                entrant_nodes,
499                entrants_remaining,
500                bracket: None,
501                judge_nodes: Vec::new(),
502                judge_winners: Vec::new(),
503                judges_remaining: 0,
504            },
505        );
506    }
507
508    /// A tournament child (entrant or judge) completed: advance the controller's bracket. Returns
509    /// the controller node index (the node that conceptually progressed).
510    fn advance_tournament(
511        &mut self,
512        controller: usize,
513        child: usize,
514        result: LoopResult,
515    ) -> Option<usize> {
516        // The child has no dependents; mark it terminal so the graph's done/outcome accounting works.
517        self.graph.complete(child, result.clone());
518
519        let in_entrant_phase = self.tournaments.get(&controller)?.bracket.is_none();
520        if in_entrant_phase {
521            let all_in = {
522                let st = self.tournaments.get_mut(&controller)?;
523                st.entrants_remaining = st.entrants_remaining.saturating_sub(1);
524                st.entrants_remaining == 0
525            };
526            if all_in {
527                self.begin_bracket(controller);
528            }
529        } else {
530            let round_done = {
531                let st = self.tournaments.get_mut(&controller)?;
532                if let Some(pos) = st.judge_nodes.iter().position(|&n| n == child) {
533                    st.judge_winners[pos] = result.tournament_winner.clone();
534                }
535                st.judges_remaining = st.judges_remaining.saturating_sub(1);
536                st.judges_remaining == 0
537            };
538            if round_done {
539                self.finish_round(controller);
540            }
541        }
542        Some(controller)
543    }
544
545    /// All entrants are in: embed the bracket over their agent ids and emit round 1's judges.
546    fn begin_bracket(&mut self, controller: usize) {
547        let entrant_ids: Vec<EntrantId> = self
548            .tournaments
549            .get(&controller)
550            .map(|st| st.entrant_nodes.iter().map(|&n| node_agent_id(n)).collect())
551            .unwrap_or_default();
552        // ≥2 entrants is guaranteed by `validate`; `Tournament::new` only rejects an empty field.
553        let mut bracket = match Tournament::new(entrant_ids) {
554            Ok(b) => b,
555            Err(_) => return self.complete_tournament(controller, None),
556        };
557        let action = bracket.start();
558        if let Some(st) = self.tournaments.get_mut(&controller) {
559            st.bracket = Some(bracket);
560        }
561        self.apply_action(controller, action);
562    }
563
564    /// This round's judges all reported: feed the winners to the bracket and act on what comes next.
565    fn finish_round(&mut self, controller: usize) {
566        let winners: Vec<EntrantId> = self
567            .tournaments
568            .get(&controller)
569            .map(|st| st.judge_winners.iter().filter_map(|w| w.clone()).collect())
570            .unwrap_or_default();
571        let action = {
572            let st = match self.tournaments.get_mut(&controller) {
573                Some(st) => st,
574                None => return,
575            };
576            match st.bracket.as_mut() {
577                // A judge that reported no winner shrinks `winners` below the match count, so
578                // `feed_round` errors — we surface that as a tournament with no champion.
579                Some(b) => b.feed_round(winners),
580                None => return,
581            }
582        };
583        match action {
584            Ok(act) => self.apply_action(controller, act),
585            Err(_) => self.complete_tournament(controller, None),
586        }
587    }
588
589    /// Act on a bracket step: spawn the round's judges, or finish with the champion.
590    fn apply_action(&mut self, controller: usize, action: TournamentAction) {
591        match action {
592            TournamentAction::JudgeRound { matches, .. } => self.emit_judges(controller, matches),
593            TournamentAction::Done { winner, .. } => {
594                self.complete_tournament(controller, Some(winner))
595            }
596        }
597    }
598
599    /// Append one judge child per match (bias-resistant `Verify`: read-only, no inherited context),
600    /// each carrying its `JudgeMatch`. The controller's own goal is the judging criterion.
601    fn emit_judges(&mut self, controller: usize, matches: Vec<Match>) {
602        let criterion = self.nodes[controller].task.clone();
603        let trust = self.nodes[controller].trust;
604        let mut judge_nodes = Vec::with_capacity(matches.len());
605        for m in &matches {
606            let judge = WorkflowNode::new(criterion.clone(), AgentRole::Verify).with_trust(trust);
607            let idx = self.append_child(judge);
608            self.child_controller.insert(idx, controller);
609            self.judge_matches.insert(
610                idx,
611                JudgeMatch {
612                    left: m.left.clone(),
613                    right: m.right.clone(),
614                },
615            );
616            judge_nodes.push(idx);
617        }
618        if let Some(st) = self.tournaments.get_mut(&controller) {
619            st.judge_winners = vec![None; judge_nodes.len()];
620            st.judges_remaining = judge_nodes.len();
621            st.judge_nodes = judge_nodes;
622        }
623    }
624
625    /// Resolve the controller: drop its bracket state and `complete` it with the champion's id in
626    /// `tournament_winner`, promoting its dependents.
627    fn complete_tournament(&mut self, controller: usize, winner: Option<EntrantId>) {
628        self.tournaments.remove(&controller);
629        let result = LoopResult {
630            termination: TerminationReason::Completed,
631            final_message: None,
632            turns_used: 0,
633            total_tokens_used: 0,
634            loop_continue: None,
635            classify_branch: None,
636            tournament_winner: winner,
637        };
638        self.graph.complete(controller, result);
639    }
640
641    // ── R3-1: runtime node submission (true loop-until-done / dynamic fan-out) ────────────────────
642
643    /// Append a batch of nodes to the in-flight DAG at runtime — the kernel side of the dynamic
644    /// "submit nodes" capability, generalizing the tournament's [`Self::append_child`]. A running
645    /// node, on completion, can ask for more work to be spawned: unknown-size discovery
646    /// (loop-until-done) and per-item fan-out (e.g. a claim-extractor spawning one verifier per
647    /// claim) both reduce to "append these nodes now".
648    ///
649    /// Each submitted node's `depends_on` is interpreted **batch-relative and backward-only**: index
650    /// `d` refers to the `d`-th node of *this* submission, and only `d < this node's position` is
651    /// honored — so a submission can carry its own internal forward chain (extractor → dependents)
652    /// while forward/self/out-of-range references are dropped rather than stranding the node behind
653    /// an unsatisfiable dependency. Nodes with no (remaining) deps are immediately `Ready`, exactly
654    /// like tournament entrants, and flow through the unchanged gated spawn loop — so quota / depth /
655    /// quarantine apply per node with **no new gate**. Returns the appended node indices (their
656    /// agent ids are the deterministic `wf-node{idx}`).
657    ///
658    /// Pure graph mutation: the caller (state machine) is responsible for routing the trigger
659    /// through `evaluate_syscall` before calling this, keeping the kernel's zero-I/O contract.
660    ///
661    /// G1 no-privilege-escalation: when `submitter` names a [`NodeTrust::Quarantined`] node, every
662    /// node in this submission is coerced to `Quarantined` before append. A quarantined agent read
663    /// untrusted content (which may be adversarial), so the topology it asks for is itself untrusted:
664    /// it must not be able to launch a *trusted* (or write-capable) child and thereby escape its
665    /// sandbox. This is transitive taint — a quarantined origin's descendants inherit quarantine —
666    /// the topological analogue of a process spawned by an untrusted process inheriting its label.
667    /// Trusted (or absent) submitters pass through unchanged. The coercion is enforced here in the
668    /// kernel rather than trusting the SDK, and composes with the spawn-time
669    /// [`Self::quarantine_violation`] gate (a coerced node that also asked for write isolation is
670    /// then denied at spawn).
671    pub fn submit_nodes_from(
672        &mut self,
673        submitter: Option<&str>,
674        mut nodes: Vec<WorkflowNode>,
675    ) -> Vec<usize> {
676        let submitter_quarantined = submitter.is_some_and(|s| self.is_agent_quarantined(s));
677        if submitter_quarantined {
678            for node in &mut nodes {
679                node.trust = NodeTrust::Quarantined;
680            }
681        }
682        self.submit_nodes(nodes)
683    }
684
685    pub fn submit_nodes(&mut self, nodes: Vec<WorkflowNode>) -> Vec<usize> {
686        let base = self.nodes.len();
687        let batch_len = nodes.len();
688        let mut ids = Vec::with_capacity(nodes.len());
689        for (offset, mut node) in nodes.into_iter().enumerate() {
690            let deps: Vec<usize> = node
691                .depends_on
692                .iter()
693                .filter(|&&d| d < offset)
694                .map(|&d| base + d)
695                .collect();
696            node.depends_on = deps.clone();
697            // A#2/G2: a submitted `Classify` node's branch indices are *batch-relative* — they point
698            // at other nodes in this same submission, whose absolute graph index the submitter cannot
699            // know. Remap each branch node index `d` (0-based within the batch) to its absolute index
700            // `base + d`, dropping out-of-range references. Mirrors the `depends_on` batch-relative
701            // convention; without it a runtime-submitted classifier would prune the wrong nodes.
702            if let NodeKind::Classify { branches } = &mut node.kind {
703                for branch in branches.iter_mut() {
704                    branch.nodes = branch
705                        .nodes
706                        .iter()
707                        .filter(|&&d| d < batch_len)
708                        .map(|&d| base + d)
709                        .collect();
710                }
711            }
712            let idx = self.graph.add(node.task.clone(), deps);
713            debug_assert_eq!(idx, self.nodes.len(), "graph/nodes index drift");
714            self.nodes.push(node);
715            ids.push(idx);
716        }
717        ids
718    }
719
720    /// Whether `agent_id` belongs to this workflow.
721    pub fn owns_agent(&self, agent_id: &str) -> bool {
722        self.node_of_agent.contains_key(agent_id)
723    }
724
725    /// R3-3: whether the node behind `agent_id` is `Quarantined` (it read untrusted content). The
726    /// kernel uses this to label that node's output as untrusted-origin when it crosses into the
727    /// trusted parent context — the provenance half of the cross-boundary contract (shaping the
728    /// output into a structured summary stays the SDK's job; the kernel cannot inspect content).
729    pub fn is_agent_quarantined(&self, agent_id: &str) -> bool {
730        self.node_of_agent
731            .get(agent_id)
732            .is_some_and(|&node| matches!(self.nodes[node].trust, NodeTrust::Quarantined))
733    }
734
735    /// The parent session id for this workflow (stamped on each node's manifest).
736    pub fn parent_session_id(&self) -> &str {
737        &self.parent_session_id
738    }
739
740    /// True once the current batch has drained (every spawned node reported back).
741    pub fn batch_drained(&self) -> bool {
742        self.batch.is_empty()
743    }
744
745    /// True once every node is terminal (completed or failed) and nothing is in flight.
746    pub fn is_complete(&self) -> bool {
747        self.graph.all_done() && self.batch.is_empty()
748    }
749
750    /// Outcome at finish: `(completed_agent_ids, failed_agent_ids)` by node. Nodes left
751    /// `Pending`/`Ready` (stalled behind a gated dependency) appear in neither.
752    pub fn outcome(&self) -> (Vec<String>, Vec<String>) {
753        let mut completed = Vec::new();
754        let mut failed = Vec::new();
755        for i in 0..self.graph.len() {
756            match self.graph.get(i).map(|n| n.status) {
757                Some(TaskStatus::Completed) => completed.push(node_agent_id(i)),
758                Some(TaskStatus::Failed) => failed.push(node_agent_id(i)),
759                _ => {}
760            }
761        }
762        (completed, failed)
763    }
764
765    /// #2-B abort: outcome when the workflow is preempted — every node that has not already
766    /// `Completed` counts as `failed` (running / ready / pending all abort). Used to emit a terminal
767    /// `WorkflowCompleted` when an `InterruptNow` tears the whole `WorkflowRun` down.
768    pub fn abort_outcome(&self) -> (Vec<String>, Vec<String>) {
769        let mut completed = Vec::new();
770        let mut failed = Vec::new();
771        for i in 0..self.graph.len() {
772            match self.graph.get(i).map(|n| n.status) {
773                Some(TaskStatus::Completed) => completed.push(node_agent_id(i)),
774                _ => failed.push(node_agent_id(i)),
775            }
776        }
777        (completed, failed)
778    }
779
780    /// Total node count.
781    pub fn len(&self) -> usize {
782        self.graph.len()
783    }
784
785    pub fn is_empty(&self) -> bool {
786        self.graph.is_empty()
787    }
788}
789
790#[cfg(test)]
791mod tests {
792    use super::*;
793    use crate::orchestration::workflow::fanout_synthesize;
794    use crate::types::result::{LoopResult, TerminationReason};
795    use crate::types::task::RuntimeTask;
796
797    fn done() -> LoopResult {
798        LoopResult {
799            termination: TerminationReason::Completed,
800            final_message: None,
801            turns_used: 1,
802            total_tokens_used: 0,
803            loop_continue: None,
804            classify_branch: None,
805            tournament_winner: None,
806        }
807    }
808
809    fn fanout2() -> WorkflowRun {
810        // 2 workers (nodes 0,1) → synthesize (node 2, depends on both)
811        let spec = fanout_synthesize(
812            vec![RuntimeTask::new("w0"), RuntimeTask::new("w1")],
813            RuntimeTask::new("synth"),
814        );
815        WorkflowRun::new(&spec, "parent-sess").unwrap()
816    }
817
818    /// A judge completion reporting its winning entrant id.
819    fn judge_done(winner: &str) -> LoopResult {
820        LoopResult {
821            tournament_winner: Some(winner.to_string()),
822            ..done()
823        }
824    }
825
826    /// Mimic one executor spawn round on a `WorkflowRun`: expand any ready controllers, then mark
827    /// every ready node spawned (mapping its current agent id). Returns the spawned `(node, id)`s.
828    fn spawn_round(run: &mut WorkflowRun) -> Vec<(usize, String)> {
829        run.expand_ready_controllers();
830        let ready = run.ready_batch();
831        let mut out = Vec::new();
832        for node in ready {
833            let id = run.current_agent_id(node);
834            run.mark_spawned(node, &id);
835            out.push((node, id));
836        }
837        out
838    }
839
#[test]
fn first_batch_is_the_workers() {
    let run = fanout2();
    // Three nodes exist, but only the two dependency-free workers are runnable up front.
    assert_eq!(run.len(), 3);
    assert_eq!(run.ready_batch(), vec![0, 1]);
    assert!(!run.is_complete());
}
847
848    // ── R3-1: runtime node submission ────────────────────────────────────────────────────────
849
#[test]
fn submit_nodes_appends_independent_nodes_ready_immediately() {
    use crate::orchestration::workflow::WorkflowNode;
    use crate::types::agent::AgentRole;

    // Start from the fan-out fixture: workers 0 and 1 feed synth node 2.
    let mut run = fanout2();
    assert_eq!(run.len(), 3);

    // Two dependency-free nodes submitted at runtime.
    let extras: Vec<WorkflowNode> = ["extra-a", "extra-b"]
        .into_iter()
        .map(|goal| WorkflowNode::new(RuntimeTask::new(goal), AgentRole::Implement))
        .collect();
    let ids = run.submit_nodes(extras);

    assert_eq!(ids, vec![3, 4], "appended after the existing 3 nodes");
    assert_eq!(run.len(), 5);
    let ready = run.ready_batch();
    assert!(
        ids.iter().all(|id| ready.contains(id)),
        "submitted independent nodes are immediately ready: {ready:?}"
    );
}
869
#[test]
fn submitted_nodes_must_complete_before_workflow_is_done() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    // Loop-until-done shape: a lone root node finishes, then more work is submitted.
    let root = WorkflowNode::new(RuntimeTask::new("root"), AgentRole::Implement);
    let spec = WorkflowSpec::new(vec![root]);
    let mut run = WorkflowRun::new(&spec, "sess").unwrap();
    let root_id = run.current_agent_id(0);
    run.mark_spawned(0, &root_id);
    run.record_completion(&root_id, done());

    let follow_up = WorkflowNode::new(RuntimeTask::new("more"), AgentRole::Implement);
    let ids = run.submit_nodes(vec![follow_up]);
    assert_eq!(ids, vec![1]);
    assert!(!run.is_complete(), "not complete while the submitted node is pending");

    // Run the submitted node to completion; only then is the workflow done.
    let spawned = spawn_round(&mut run);
    assert_eq!(spawned, vec![(1usize, "wf-node1".to_string())]);
    run.record_completion("wf-node1", done());
    assert!(run.is_complete(), "complete once the submitted node finishes");
}
895
#[test]
fn reduce_node_carries_reducer_and_inputs_then_completes_like_a_spawn() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    // G2: a pair of fan-out workers feeds a deterministic reduce node (dedupe). No agent runs
    // for the reduce node; its descriptor names the reducer and its inputs, and it completes
    // the same way a spawned node does.
    let worker_a = WorkflowNode::new(RuntimeTask::new("worker-a"), AgentRole::Explore);
    let worker_b = WorkflowNode::new(RuntimeTask::new("worker-b"), AgentRole::Explore);
    let merge = WorkflowNode::new(RuntimeTask::new("merge"), AgentRole::Implement)
        .with_reduce("dedupe_lines")
        .with_depends_on(vec![0, 1]);
    let spec = WorkflowSpec::new(vec![worker_a, worker_b, merge]);
    let mut run = WorkflowRun::new(&spec, "sess").unwrap();

    // The reduce node waits on both workers, so they form the first batch on their own.
    assert_eq!(run.ready_batch(), vec![0, 1]);
    for worker in 0..2usize {
        let id = run.current_agent_id(worker);
        run.mark_spawned(worker, &id);
        run.record_completion(&id, done());
    }

    // With both inputs done the reduce node is ready and describes its reducer + input ids.
    assert_eq!(run.ready_batch(), vec![2]);
    let info = run.spawn_info(2);
    assert_eq!(info.reducer.as_deref(), Some("dedupe_lines"));
    let expected_inputs = vec!["wf-node0".to_string(), "wf-node1".to_string()];
    assert_eq!(info.input_agent_ids, expected_inputs);

    // The SDK-computed reduce result comes back as an ordinary completion and ends the DAG.
    run.mark_spawned(2, "wf-node2");
    run.record_completion("wf-node2", done());
    assert!(run.is_complete());
    let (completed, failed) = run.outcome();
    assert_eq!(completed, vec!["wf-node0", "wf-node1", "wf-node2"]);
    assert!(failed.is_empty());
}
934
#[test]
fn output_schema_reaches_the_spawn_descriptor() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    // G3: an output schema declared on a node must reach the SDK spawn descriptor verbatim.
    let schema = serde_json::json!({
        "type": "object",
        "required": ["verdict"],
        "properties": { "verdict": { "type": "string" } }
    });
    let judge = WorkflowNode::new(RuntimeTask::new("judge"), AgentRole::Verify)
        .with_output_schema(schema.clone());
    let with_schema = WorkflowSpec::new(vec![judge]);
    let info = WorkflowRun::new(&with_schema, "sess").unwrap().spawn_info(0);
    assert_eq!(info.output_schema.as_ref(), Some(&schema));

    // The schema survives a full serde round-trip (additive ABI).
    let wire = serde_json::to_string(&info).unwrap();
    let decoded: WorkflowSpawnInfo = serde_json::from_str(&wire).unwrap();
    assert_eq!(decoded.output_schema, Some(schema));

    // Without a schema the field is absent from the serialized form altogether.
    let bare = WorkflowNode::new(RuntimeTask::new("x"), AgentRole::Implement);
    let without_schema = WorkflowSpec::new(vec![bare]);
    let plain_info = WorkflowRun::new(&without_schema, "sess").unwrap().spawn_info(0);
    assert!(plain_info.output_schema.is_none());
    let plain_wire = serde_json::to_string(&plain_info).unwrap();
    assert!(!plain_wire.contains("output_schema"));
}
969
#[test]
fn quarantined_submitter_taints_submitted_nodes() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    // G1: the root is quarantined (it reads untrusted content) and then submits a node that
    // claims to be trusted and write-capable. The kernel has to force that node into
    // quarantine, otherwise a quarantined origin could lift its descendants out of the sandbox.
    let reader =
        WorkflowNode::new(RuntimeTask::new("read-untrusted"), AgentRole::Explore).quarantined();
    let spec = WorkflowSpec::new(vec![reader]);
    let mut run = WorkflowRun::new(&spec, "sess").unwrap();
    let reader_id = run.current_agent_id(0);
    run.mark_spawned(0, &reader_id);
    run.record_completion(&reader_id, done());

    // The submitted node asks for the default (trusted) level, which this submitter cannot grant.
    let escalation = WorkflowNode::new(RuntimeTask::new("act"), AgentRole::Implement);
    let ids = run.submit_nodes_from(Some(&reader_id), vec![escalation]);
    assert_eq!(ids, vec![1]);
    let tainted_id = run.current_agent_id(1);
    run.mark_spawned(1, &tainted_id);
    assert!(
        run.is_agent_quarantined(&tainted_id),
        "submitted node inherits the submitter's quarantine (no escalation)"
    );

    // With no (or a trusted) submitter nothing is coerced: only quarantined origins taint.
    let benign = WorkflowNode::new(RuntimeTask::new("trusted-work"), AgentRole::Implement);
    let benign_ids = run.submit_nodes_from(None, vec![benign]);
    let benign_node = benign_ids[0];
    let benign_id = run.current_agent_id(benign_node);
    run.mark_spawned(benign_node, &benign_id);
    assert!(
        !run.is_agent_quarantined(&benign_id),
        "no quarantined submitter ⇒ no coercion"
    );
}
1013
#[test]
fn submit_nodes_honors_batch_relative_backward_deps() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    // A single root that has already run to completion.
    let root = WorkflowNode::new(RuntimeTask::new("root"), AgentRole::Implement);
    let spec = WorkflowSpec::new(vec![root]);
    let mut run = WorkflowRun::new(&spec, "sess").unwrap();
    let root_id = run.current_agent_id(0);
    run.mark_spawned(0, &root_id);
    run.record_completion(&root_id, done());

    // Batch layout: extractor at offset 0, dependent at offset 1 pointing back at offset 0.
    let extractor = WorkflowNode::new(RuntimeTask::new("extractor"), AgentRole::Implement);
    let dependent = WorkflowNode::new(RuntimeTask::new("dependent"), AgentRole::Implement)
        .with_depends_on(vec![0]);
    let ids = run.submit_nodes(vec![extractor, dependent]);
    assert_eq!(ids, vec![1, 2]);

    assert_eq!(run.ready_batch(), vec![1], "backward dep keeps the dependent pending");
    run.mark_spawned(1, "wf-node1");
    run.record_completion("wf-node1", done());
    assert_eq!(run.ready_batch(), vec![2], "dependent unblocks after the extractor");
}
1039
#[test]
fn submit_nodes_drops_forward_and_out_of_range_deps() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let spec = WorkflowSpec::new(vec![WorkflowNode::new(
        RuntimeTask::new("root"),
        AgentRole::Implement,
    )]);
    let mut run = WorkflowRun::new(&spec, "sess").unwrap();

    // Out-of-range: the only dep points past the end of the batch → dropped, so the node must
    // not be stranded.
    let ids = run.submit_nodes(vec![
        WorkflowNode::new(RuntimeTask::new("a"), AgentRole::Implement).with_depends_on(vec![5]),
    ]);
    assert_eq!(ids, vec![1]);
    assert!(
        run.ready_batch().contains(&1),
        "a node whose only dep was dropped is ready, not stranded"
    );

    // Forward and self references are in range but not backward, so they are dropped as well:
    // `b` (offset 0) points forward at offset 1, `c` (offset 1) points at itself.
    let ids = run.submit_nodes(vec![
        WorkflowNode::new(RuntimeTask::new("b"), AgentRole::Implement).with_depends_on(vec![1]),
        WorkflowNode::new(RuntimeTask::new("c"), AgentRole::Implement).with_depends_on(vec![1]),
    ]);
    assert_eq!(ids, vec![2, 3]);
    let ready = run.ready_batch();
    assert!(
        ready.contains(&2),
        "a forward reference is dropped, leaving the node ready: {ready:?}"
    );
    assert!(
        ready.contains(&3),
        "a self reference is dropped, leaving the node ready: {ready:?}"
    );
}
1060
#[test]
fn submitted_node_can_itself_be_a_loop_control_flow() {
    // R3-2: dynamic submission *composes* with control flow. A submitted node may itself be a
    // Loop (or Tournament) and runs its full control flow, which yields nested control flow
    // without touching `NodeKind::Tournament`'s entrant type — the submitter simply hands over
    // a node whose `kind` the existing completion machinery already understands.
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let root = WorkflowNode::new(RuntimeTask::new("root"), AgentRole::Implement);
    let spec = WorkflowSpec::new(vec![root]);
    let mut run = WorkflowRun::new(&spec, "sess").unwrap();
    let root_id = run.current_agent_id(0);
    run.mark_spawned(0, &root_id);
    run.record_completion(&root_id, done());

    // Mid-run, hand over a node that loops twice.
    let refine = WorkflowNode::new(RuntimeTask::new("refine"), AgentRole::Implement).with_loop(2);
    let ids = run.submit_nodes(vec![refine]);
    assert_eq!(ids, vec![1]);

    // Each iteration is spawned under its own id; after the second one the loop is finished.
    let mut k = 0;
    while k < 2 {
        assert_eq!(run.ready_batch(), vec![1], "submitted loop ready for iteration {k}");
        let id = run.current_agent_id(1);
        assert_eq!(id, format!("wf-node1-i{k}"), "submitted loop gets per-iteration ids");
        run.mark_spawned(1, &id);
        run.record_completion(&id, done());
        k += 1;
    }
    assert!(run.is_complete(), "submitted loop ran its 2 iterations then finished");
}
1095
#[test]
fn submitted_tournament_runs_bracket_then_promotes_submitted_dependent() {
    // M2: a Tournament *controller* (together with a dependent) can be submitted at runtime.
    // The controller expands into entrant children plus a judge through the usual bracket
    // machinery, and the dependent's batch-relative `depends_on` ties it to that controller.
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let root = WorkflowNode::new(RuntimeTask::new("root"), AgentRole::Implement);
    let spec = WorkflowSpec::new(vec![root]);
    let mut run = WorkflowRun::new(&spec, "sess").unwrap();
    let root_id = run.current_agent_id(0);
    run.mark_spawned(0, &root_id);
    run.record_completion(&root_id, done());

    // Batch: tournament controller at offset 0, dependent at offset 1 with depends_on [0].
    let controller = WorkflowNode::new(RuntimeTask::new("pick best"), AgentRole::Plan)
        .with_tournament(vec![RuntimeTask::new("x"), RuntimeTask::new("y")]);
    let dependent = WorkflowNode::new(RuntimeTask::new("use winner"), AgentRole::Implement)
        .with_depends_on(vec![0]);
    let ids = run.submit_nodes(vec![controller, dependent]);
    assert_eq!(ids, vec![1, 2], "appended controller=1, dependent=2");

    // The controller (node 1) spawns no agent itself; it expands into entrants 3 and 4.
    let entrants = spawn_round(&mut run);
    let entrant_nodes: Vec<usize> = entrants.iter().map(|&(node, _)| node).collect();
    assert_eq!(entrant_nodes, vec![3, 4], "two entrant children appended after the dependent");
    for (_, entrant_id) in entrants.iter() {
        run.record_completion(entrant_id, done());
    }

    // A single judge covers the two entrants; node 2 stays gated until the bracket resolves.
    let judges = spawn_round(&mut run);
    assert_eq!(judges.len(), 1, "one judge for two entrants");
    let (judge_node, judge_id) = &judges[0];
    let pairing = run
        .spawn_info(*judge_node)
        .judge_match
        .expect("judge carries a match");
    let expected_pairing = JudgeMatch {
        left: node_agent_id(3),
        right: node_agent_id(4),
    };
    assert_eq!(pairing, expected_pairing);

    // Entrant 3 wins, the controller completes with that champion, and the dependent unblocks.
    run.record_completion(judge_id, judge_done(&node_agent_id(3)));
    assert_eq!(run.ready_batch(), vec![2], "submitted dependent unblocks after the bracket");
    let last = spawn_round(&mut run);
    assert_eq!(last, vec![(2, node_agent_id(2))]);
    run.record_completion(&last[0].1, done());
    assert!(run.is_complete());
}
1144
#[test]
fn submitted_classify_remaps_branch_indices_and_prunes() {
    // M2: branch `nodes` on a submitted Classify node are batch-relative. `submit_nodes` must
    // translate them to absolute indices so that the chosen branch runs and the others are
    // pruned; without that translation a runtime-submitted classifier prunes the wrong nodes.
    use crate::orchestration::workflow::{ClassifyBranch, NodeKind, WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let root = WorkflowNode::new(RuntimeTask::new("root"), AgentRole::Implement);
    let spec = WorkflowSpec::new(vec![root]);
    let mut run = WorkflowRun::new(&spec, "sess").unwrap();
    let root_id = run.current_agent_id(0);
    run.mark_spawned(0, &root_id);
    run.record_completion(&root_id, done());

    // Batch: classifier at offset 0 (a→[1], b→[2]); both branch nodes depend on offset 0.
    let classifier = WorkflowNode::new(RuntimeTask::new("route"), AgentRole::Plan).with_classify(vec![
        ClassifyBranch { label: "a".into(), nodes: vec![1] },
        ClassifyBranch { label: "b".into(), nodes: vec![2] },
    ]);
    let branch_a = WorkflowNode::new(RuntimeTask::new("branch-a"), AgentRole::Implement)
        .with_depends_on(vec![0]);
    let branch_b = WorkflowNode::new(RuntimeTask::new("branch-b"), AgentRole::Implement)
        .with_depends_on(vec![0]);
    let ids = run.submit_nodes(vec![classifier, branch_a, branch_b]);
    assert_eq!(ids, vec![1, 2, 3], "classify=1, branchA=2, branchB=3");

    // The stored branches now carry absolute indices: a→[2], b→[3].
    let NodeKind::Classify { branches } = &run.nodes[1].kind else {
        panic!("node 1 should be a classify node");
    };
    assert_eq!(branches[0].nodes, vec![2], "branch a remapped to absolute node 2");
    assert_eq!(branches[1].nodes, vec![3], "branch b remapped to absolute node 3");

    // The classifier runs first and picks "a": node 2 is enabled, node 3 is pruned/failed.
    let first = spawn_round(&mut run);
    assert_eq!(first, vec![(1, node_agent_id(1))], "classifier runs first");
    let mut verdict = done();
    verdict.classify_branch = Some("a".into());
    run.record_completion(&first[0].1, verdict);

    assert_eq!(run.ready_batch(), vec![2], "only branch a is enabled");
    let (_, failed) = run.outcome();
    assert!(failed.contains(&node_agent_id(3)), "branch b pruned/failed");

    // Running the surviving branch finishes the workflow.
    let last = spawn_round(&mut run);
    assert_eq!(last, vec![(2, node_agent_id(2))]);
    run.record_completion(&last[0].1, done());
    assert!(run.is_complete());
    let (completed, _) = run.outcome();
    assert!(completed.contains(&node_agent_id(1)) && completed.contains(&node_agent_id(2)));
}
1199
#[test]
fn loop_node_iterates_with_distinct_ids_then_promotes_dependent() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    // Node 0 loops three times; node 1 depends on it and must wait for the whole loop.
    let refine = WorkflowNode::new(RuntimeTask::new("refine"), AgentRole::Implement).with_loop(3);
    let finalize = WorkflowNode::new(RuntimeTask::new("finalize"), AgentRole::Implement)
        .with_depends_on(vec![0]);
    let spec = WorkflowSpec::new(vec![refine, finalize]);
    let mut run = WorkflowRun::new(&spec, "sess").unwrap();

    // Every iteration gets its own agent id, and the dependent never becomes ready meanwhile.
    for k in 0..3 {
        assert_eq!(run.ready_batch(), vec![0], "loop node ready for iteration {k}");
        let iteration_id = run.current_agent_id(0);
        assert_eq!(iteration_id, format!("wf-node0-i{k}"), "distinct per-iteration id");
        run.mark_spawned(0, &iteration_id);
        assert!(!run.is_complete());
        let node = run.record_completion(&iteration_id, done()).unwrap();
        assert_eq!(node, 0);
        let more_iterations_left = k < 2;
        if more_iterations_left {
            // Still looping: node 0 is re-armed while the dependent stays blocked.
            assert_eq!(run.ready_batch(), vec![0]);
        }
    }

    // Once the loop is exhausted node 0 completes and the dependent (node 1) is released.
    assert_eq!(run.ready_batch(), vec![1], "dependent unblocks only after the loop ends");
    let finalize_id = run.current_agent_id(1);
    assert_eq!(finalize_id, "wf-node1", "spawn node keeps the plain id");
    run.mark_spawned(1, &finalize_id);
    run.record_completion(&finalize_id, done());
    assert!(run.is_complete());
}
1236
#[test]
fn synth_becomes_ready_only_after_both_workers() {
    let mut run = fanout2();
    let (worker0, worker1, synth) = (node_agent_id(0), node_agent_id(1), node_agent_id(2));
    run.mark_spawned(0, &worker0);
    run.mark_spawned(1, &worker1);
    assert!(!run.batch_drained());

    // After the first worker: the batch is still in flight and synth is not runnable yet.
    assert_eq!(run.record_completion(&worker0, done()), Some(0));
    assert!(!run.batch_drained());
    assert!(run.ready_batch().is_empty());

    // After the second worker: the batch has drained and synth becomes ready.
    assert_eq!(run.record_completion(&worker1, done()), Some(1));
    assert!(run.batch_drained());
    assert_eq!(run.ready_batch(), vec![2]);
    assert!(!run.is_complete());

    // Spawning and completing synth finishes the workflow.
    run.mark_spawned(2, &synth);
    run.record_completion(&synth, done());
    assert!(run.is_complete());
}
1259
#[test]
fn denied_node_blocks_dependents_and_stalls_progress() {
    let mut run = fanout2();
    let worker0 = node_agent_id(0);

    // Worker 0 is spawned and completes, whereas worker 1 is rejected by the gate.
    run.mark_spawned(0, &worker0);
    run.mark_denied(1);
    run.record_completion(&worker0, done());

    // Synth depends on the denied node 1, so it can never become ready. The batch has drained
    // and nothing else is runnable — the state machine ends a workflow on exactly this
    // "drained && ready_batch empty" condition — yet `is_complete()` remains false because
    // node 2 is left Pending forever.
    assert!(run.batch_drained());
    assert!(run.ready_batch().is_empty());
    assert!(!run.is_complete());
}
1274
1275    #[test]
1276    fn manifest_preserves_node_isolation_and_inheritance() {
1277        let run = fanout2();
1278        let m = run.manifest_for(0);
1279        assert_eq!(m.agent_id.as_str(), "wf-node0");
1280        assert_eq!(m.parent_session_id.as_str(), "parent-sess");
1281        // fanout workers are Explore → ReadOnly + SystemOnly (workflow role_defaults)
1282        assert_eq!(m.isolation, crate::types::agent::AgentIsolation::ReadOnly);
1283        assert_eq!(
1284            m.context_inheritance,
1285            crate::types::agent::ContextInheritance::SystemOnly
1286        );
1287    }
1288
1289    #[test]
1290    fn unknown_agent_completion_is_none() {
1291        let mut run = fanout2();
1292        assert_eq!(run.record_completion("not-a-node", done()), None);
1293    }
1294
1295    #[test]
1296    fn resume_skips_already_completed_nodes() {
1297        // fanout2: workers 0,1 → synth 2. Resume with worker 0 already done.
1298        let spec = fanout_synthesize(
1299            vec![RuntimeTask::new("w0"), RuntimeTask::new("w1")],
1300            RuntimeTask::new("synth"),
1301        );
1302        let run = WorkflowRun::resume(&spec, "sess", &[], &[node_agent_id(0)]).unwrap();
1303        // only the remaining worker (node 1) is ready; node 0 is already complete, synth still gated.
1304        assert_eq!(run.ready_batch(), vec![1]);
1305        assert!(!run.is_complete());
1306    }
1307
1308    #[test]
1309    fn resume_with_all_done_completes() {
1310        let spec = fanout_synthesize(vec![RuntimeTask::new("w0")], RuntimeTask::new("synth"));
1311        // both nodes (worker 0, synth 1) recovered as done.
1312        let run = WorkflowRun::resume(&spec, "sess", &[], &[node_agent_id(0), node_agent_id(1)]).unwrap();
1313        assert!(run.ready_batch().is_empty());
1314        assert!(run.is_complete());
1315    }
1316
1317    #[test]
1318    fn resume_reapplies_submissions_to_reconstruct_appended_nodes() {
1319        // R3-1: a workflow that dynamically appended a node (wf-node1) is resumed by re-applying the
1320        // recorded submission, so the appended node exists again and its completed id matches —
1321        // without this, the appended node (not in the spec) would vanish on resume.
1322        use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
1323        use crate::types::agent::AgentRole;
1324
1325        let spec = WorkflowSpec::new(vec![WorkflowNode::new(
1326            RuntimeTask::new("root"),
1327            AgentRole::Implement,
1328        )]);
1329        let submission = vec![WorkflowNode::new(RuntimeTask::new("discovered"), AgentRole::Implement)];
1330
1331        // root done, submission re-applied, but the appended node not yet completed.
1332        let run = WorkflowRun::resume(&spec, "sess", &[submission.clone()], &[node_agent_id(0)]).unwrap();
1333        assert_eq!(run.len(), 2, "base node + re-applied submitted node");
1334        assert_eq!(run.ready_batch(), vec![1], "the re-applied appended node is the remaining work");
1335        assert!(!run.is_complete());
1336
1337        // both recovered as done → resume finishes.
1338        let run2 =
1339            WorkflowRun::resume(&spec, "sess", &[submission], &[node_agent_id(0), node_agent_id(1)]).unwrap();
1340        assert!(run2.ready_batch().is_empty());
1341        assert!(run2.is_complete());
1342    }
1343
1344    #[test]
1345    fn spawn_info_carries_model_hint_and_trust() {
1346        use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
1347        use crate::types::agent::AgentRole;
1348
1349        let spec = WorkflowSpec::new(vec![
1350            WorkflowNode::new(RuntimeTask::new("read tickets"), AgentRole::Explore)
1351                .quarantined()
1352                .with_model_hint("haiku"),
1353            WorkflowNode::new(RuntimeTask::new("act"), AgentRole::Implement),
1354        ]);
1355        let run = WorkflowRun::new(&spec, "sess").unwrap();
1356
1357        // W3: quarantined node + W4: model hint both reach the spawn descriptor.
1358        let q = run.spawn_info(0);
1359        assert_eq!(q.trust, "quarantined");
1360        assert_eq!(q.model_hint.as_deref(), Some("haiku"));
1361        // default node is trusted, no model hint.
1362        let t = run.spawn_info(1);
1363        assert_eq!(t.trust, "trusted");
1364        assert_eq!(t.model_hint, None);
1365    }
1366
1367    #[test]
1368    fn spawn_info_carries_loop_and_classify_hints() {
1369        use crate::orchestration::workflow::{ClassifyBranch, WorkflowNode, WorkflowSpec};
1370        use crate::types::agent::AgentRole;
1371
1372        let spec = WorkflowSpec::new(vec![
1373            // 0: loop node → descriptor carries the cap so the SDK knows to solicit `loop_continue`.
1374            WorkflowNode::new(RuntimeTask::new("refine"), AgentRole::Implement).with_loop(3),
1375            // 1: classify node → descriptor carries the branch labels so the SDK can instruct + report.
1376            WorkflowNode::new(RuntimeTask::new("route"), AgentRole::Plan).with_classify(vec![
1377                ClassifyBranch { label: "bug".into(), nodes: vec![] },
1378                ClassifyBranch { label: "feature".into(), nodes: vec![] },
1379            ]),
1380            // 2: plain spawn → neither hint present.
1381            WorkflowNode::new(RuntimeTask::new("act"), AgentRole::Implement),
1382        ]);
1383        let run = WorkflowRun::new(&spec, "sess").unwrap();
1384
1385        let l = run.spawn_info(0);
1386        assert_eq!(l.loop_max_iters, Some(3));
1387        assert!(l.classify_labels.is_empty());
1388        assert_eq!(l.token_budget, None, "no token budget unless set");
1389
1390        let c = run.spawn_info(1);
1391        assert_eq!(c.classify_labels, vec!["bug".to_string(), "feature".to_string()]);
1392        assert_eq!(c.loop_max_iters, None);
1393
1394        let s = run.spawn_info(2);
1395        assert_eq!(s.loop_max_iters, None);
1396        assert!(s.classify_labels.is_empty());
1397    }
1398
1399    #[test]
1400    fn spawn_info_carries_token_budget() {
1401        use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
1402        use crate::types::agent::AgentRole;
1403
1404        let spec = WorkflowSpec::new(vec![
1405            WorkflowNode::new(RuntimeTask::new("expensive"), AgentRole::Implement).with_token_budget(10_000),
1406            WorkflowNode::new(RuntimeTask::new("plain"), AgentRole::Implement),
1407        ]);
1408        let run = WorkflowRun::new(&spec, "sess").unwrap();
1409        assert_eq!(run.spawn_info(0).token_budget, Some(10_000));
1410        assert_eq!(run.spawn_info(1).token_budget, None);
1411    }
1412
1413    // ── Tournament node (A#2) ───────────────────────────────────────────────────────────────────
1414
1415    use crate::orchestration::workflow::{NodeKind, WorkflowNode, WorkflowSpec};
1416    use crate::types::agent::AgentRole;
1417
1418    /// A 4-entrant tournament controller (node 0) gating a dependent (node 1). Drives the whole
1419    /// bracket: 4 entrants generate, then 2 round-1 judges, then 1 final judge — and only then does
1420    /// the dependent unblock, carrying the champion in the controller's `tournament_winner`.
1421    #[test]
1422    fn tournament_runs_bracket_then_promotes_dependent() {
1423        let spec = WorkflowSpec::new(vec![
1424            WorkflowNode::new(RuntimeTask::new("pick the best ad"), AgentRole::Plan).with_tournament(
1425                vec![
1426                    RuntimeTask::new("ad A"),
1427                    RuntimeTask::new("ad B"),
1428                    RuntimeTask::new("ad C"),
1429                    RuntimeTask::new("ad D"),
1430                ],
1431            ),
1432            WorkflowNode::new(RuntimeTask::new("ship the winner"), AgentRole::Implement)
1433                .with_depends_on(vec![0]),
1434        ]);
1435        let mut run = WorkflowRun::new(&spec, "sess").unwrap();
1436
1437        // Round 1 of spawning expands the controller into 4 entrant children (nodes 2..=5); the
1438        // controller spawns no agent of its own and the dependent stays gated.
1439        let entrants = spawn_round(&mut run);
1440        let entrant_nodes: Vec<usize> = entrants.iter().map(|(n, _)| *n).collect();
1441        assert_eq!(entrant_nodes, vec![2, 3, 4, 5], "4 entrant children, no controller spawn");
1442        assert!(run.spawn_info(2).judge_match.is_none(), "entrants are not judges");
1443        assert!(!run.is_complete());
1444
1445        // All entrants generate → bracket begins; nothing else spawns until they're all in.
1446        for (i, (node, id)) in entrants.iter().enumerate() {
1447            run.record_completion(id, done());
1448            if i < 3 {
1449                assert!(run.ready_batch().is_empty(), "no judges until every entrant is in");
1450            }
1451            let _ = node;
1452        }
1453
1454        // Round 1 judges: 2 matches over the 4 entrants, each carrying its pair.
1455        let r1 = spawn_round(&mut run);
1456        assert_eq!(r1.len(), 2, "two round-1 judges");
1457        let jm0 = run.spawn_info(r1[0].0).judge_match.expect("judge carries a match");
1458        assert_eq!(jm0, JudgeMatch { left: node_agent_id(2), right: node_agent_id(3) });
1459        let jm1 = run.spawn_info(r1[1].0).judge_match.expect("judge carries a match");
1460        assert_eq!(jm1, JudgeMatch { left: node_agent_id(4), right: node_agent_id(5) });
1461
1462        // Entrant 2 beats 3; entrant 4 beats 5. Dependent still gated mid-bracket.
1463        run.record_completion(&r1[0].1, judge_done(&node_agent_id(2)));
1464        run.record_completion(&r1[1].1, judge_done(&node_agent_id(4)));
1465        assert!(run.ready_batch().iter().all(|&n| n != 1), "dependent gated until the final");
1466
1467        // Final round: a single judge over the two survivors.
1468        let r2 = spawn_round(&mut run);
1469        assert_eq!(r2.len(), 1, "one final judge");
1470        let jmf = run.spawn_info(r2[0].0).judge_match.expect("final judge carries a match");
1471        assert_eq!(jmf, JudgeMatch { left: node_agent_id(2), right: node_agent_id(4) });
1472
1473        // Entrant 4 wins it all → controller completes with the champion, dependent unblocks.
1474        run.record_completion(&r2[0].1, judge_done(&node_agent_id(4)));
1475        let winner = run
1476            .graph
1477            .get(0)
1478            .and_then(|n| n.result.as_ref())
1479            .and_then(|r| r.tournament_winner.clone());
1480        assert_eq!(winner.as_deref(), Some(node_agent_id(4).as_str()), "champion recorded");
1481        assert_eq!(run.ready_batch(), vec![1], "dependent unblocks only after the bracket resolves");
1482
1483        // Ship the winner → workflow complete.
1484        let last = spawn_round(&mut run);
1485        assert_eq!(last, vec![(1, node_agent_id(1))]);
1486        run.record_completion(&last[0].1, done());
1487        assert!(run.is_complete());
1488    }
1489
1490    /// An odd entrant count gives one entrant a bye in round 1 (no judge for it), and the bracket
1491    /// still resolves to a single champion.
1492    #[test]
1493    fn tournament_with_bye_resolves() {
1494        let spec = WorkflowSpec::new(vec![WorkflowNode::new(
1495            RuntimeTask::new("rank"),
1496            AgentRole::Plan,
1497        )
1498        .with_tournament(vec![
1499            RuntimeTask::new("x"),
1500            RuntimeTask::new("y"),
1501            RuntimeTask::new("z"),
1502        ])]);
1503        let mut run = WorkflowRun::new(&spec, "sess").unwrap();
1504
1505        let entrants = spawn_round(&mut run); // nodes 1,2,3
1506        assert_eq!(entrants.len(), 3);
1507        for (_, id) in &entrants {
1508            run.record_completion(id, done());
1509        }
1510        // Round 1: only (entrant1, entrant2) plays; entrant3 draws a bye.
1511        let r1 = spawn_round(&mut run);
1512        assert_eq!(r1.len(), 1, "one match, one bye");
1513        run.record_completion(&r1[0].1, judge_done(&node_agent_id(1)));
1514        // Round 2: survivor of the match vs the bye entrant.
1515        let r2 = spawn_round(&mut run);
1516        assert_eq!(r2.len(), 1);
1517        let jm = run.spawn_info(r2[0].0).judge_match.unwrap();
1518        assert_eq!(jm, JudgeMatch { left: node_agent_id(1), right: node_agent_id(3) });
1519        run.record_completion(&r2[0].1, judge_done(&node_agent_id(3)));
1520        let winner = run.graph.get(0).and_then(|n| n.result.as_ref()).and_then(|r| r.tournament_winner.clone());
1521        assert_eq!(winner.as_deref(), Some(node_agent_id(3).as_str()));
1522        assert!(run.is_complete());
1523    }
1524
1525    /// A quarantined tournament keeps its entrant + judge children quarantined, and (being
1526    /// read-only) they pass the quarantine invariant rather than tripping it.
1527    #[test]
1528    fn tournament_children_inherit_controller_trust() {
1529        let spec = WorkflowSpec::new(vec![WorkflowNode::new(
1530            RuntimeTask::new("judge untrusted inputs"),
1531            AgentRole::Plan,
1532        )
1533        .quarantined()
1534        .with_tournament(vec![RuntimeTask::new("a"), RuntimeTask::new("b")])]);
1535        let mut run = WorkflowRun::new(&spec, "sess").unwrap();
1536
1537        let entrants = spawn_round(&mut run);
1538        for (node, _) in &entrants {
1539            assert_eq!(run.spawn_info(*node).trust, "quarantined", "entrant inherits quarantine");
1540            assert!(!run.quarantine_violation(*node), "read-only entrant is quarantine-clean");
1541        }
1542        for (_, id) in &entrants {
1543            run.record_completion(id, done());
1544        }
1545        let r1 = spawn_round(&mut run);
1546        assert_eq!(run.spawn_info(r1[0].0).trust, "quarantined", "judge inherits quarantine");
1547        assert!(!run.quarantine_violation(r1[0].0));
1548    }
1549
1550    /// Sanity: the controller node is itself a Tournament kind and never appears in a spawn batch
1551    /// (entrants/judges carry the work).
1552    #[test]
1553    fn tournament_controller_never_spawns_itself() {
1554        let spec = WorkflowSpec::new(vec![WorkflowNode::new(RuntimeTask::new("c"), AgentRole::Plan)
1555            .with_tournament(vec![RuntimeTask::new("a"), RuntimeTask::new("b")])]);
1556        let mut run = WorkflowRun::new(&spec, "sess").unwrap();
1557        assert!(matches!(run.nodes[0].kind, NodeKind::Tournament { .. }));
1558        let first = spawn_round(&mut run);
1559        assert!(first.iter().all(|(n, _)| *n != 0), "controller node 0 never spawns directly");
1560    }
1561}