1use std::collections::HashMap;
13
14use serde::{Deserialize, Serialize};
15
16use crate::orchestration::task_graph::{TaskGraph, TaskStatus};
17use crate::orchestration::tournament::{EntrantId, Match, Tournament, TournamentAction};
18use super::{NodeKind, NodeTrust, WorkflowNode, WorkflowSpec};
19use crate::types::agent::{AgentIsolation, AgentRole, ContextInheritance, IsolationManifest};
20use crate::types::error::Result;
21use crate::types::result::{LoopResult, TerminationReason};
22
/// Builds the base agent id for the workflow node at index `node`.
///
/// The id is the literal prefix `wf-node` followed by the decimal index,
/// e.g. `wf-node3`.
pub fn node_agent_id(node: usize) -> String {
    let mut id = String::from("wf-node");
    id.push_str(&node.to_string());
    id
}
27
28#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
33pub struct WorkflowSpawnInfo {
34 pub agent_id: String,
35 pub goal: String,
36 pub role: String,
37 pub isolation: String,
38 pub context_inheritance: String,
39 #[serde(default, skip_serializing_if = "Option::is_none")]
40 pub model_hint: Option<String>,
41 #[serde(default = "default_trust")]
44 pub trust: String,
45 #[serde(default, skip_serializing_if = "Option::is_none")]
49 pub output_schema: Option<serde_json::Value>,
50 #[serde(default, skip_serializing_if = "Option::is_none")]
54 pub reducer: Option<String>,
55 #[serde(default, skip_serializing_if = "Vec::is_empty")]
58 pub input_agent_ids: Vec<String>,
59 #[serde(default, skip_serializing_if = "Option::is_none")]
64 pub judge_match: Option<JudgeMatch>,
65 #[serde(default, skip_serializing_if = "Option::is_none")]
71 pub loop_max_iters: Option<usize>,
72 #[serde(default, skip_serializing_if = "Vec::is_empty")]
77 pub classify_labels: Vec<String>,
78 #[serde(default, skip_serializing_if = "Option::is_none")]
82 pub token_budget: Option<u64>,
83}
84
/// Serde default for `WorkflowSpawnInfo::trust`: the `"trusted"` label.
fn default_trust() -> String {
    String::from("trusted")
}
88
89#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
93pub struct JudgeMatch {
94 pub left: String,
95 pub right: String,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
104pub struct WorkflowBudget {
105 pub nodes_used: usize,
107 #[serde(default, skip_serializing_if = "Option::is_none")]
109 pub nodes_max: Option<usize>,
110 #[serde(default, skip_serializing_if = "Option::is_none")]
113 pub nodes_remaining: Option<usize>,
114 pub running_subagents: usize,
116 #[serde(default, skip_serializing_if = "Option::is_none")]
118 pub max_concurrent_subagents: Option<usize>,
119 #[serde(default, skip_serializing_if = "Option::is_none")]
122 pub concurrency_remaining: Option<usize>,
123 #[serde(default)]
126 pub tokens_used: u64,
127 #[serde(default, skip_serializing_if = "Option::is_none")]
129 pub tokens_max: Option<u64>,
130 #[serde(default, skip_serializing_if = "Option::is_none")]
134 pub tokens_remaining: Option<u64>,
135}
136
137fn role_label(role: AgentRole) -> &'static str {
138 match role {
139 AgentRole::Explore => "explore",
140 AgentRole::Plan => "plan",
141 AgentRole::Implement => "implement",
142 AgentRole::Verify => "verify",
143 AgentRole::Custom => "custom",
144 }
145}
146
147fn isolation_label(isolation: AgentIsolation) -> &'static str {
148 match isolation {
149 AgentIsolation::Shared => "shared",
150 AgentIsolation::ReadOnly => "read_only",
151 AgentIsolation::Worktree => "worktree",
152 AgentIsolation::Remote => "remote",
153 }
154}
155
156fn inheritance_label(inheritance: ContextInheritance) -> &'static str {
157 match inheritance {
158 ContextInheritance::None => "none",
159 ContextInheritance::SystemOnly => "system_only",
160 ContextInheritance::Full => "full",
161 }
162}
163
164fn trust_label(trust: NodeTrust) -> &'static str {
165 match trust {
166 NodeTrust::Trusted => "trusted",
167 NodeTrust::Quarantined => "quarantined",
168 }
169}
170
171fn resumed_result() -> LoopResult {
173 LoopResult {
174 termination: crate::types::result::TerminationReason::Completed,
175 final_message: None,
176 turns_used: 0,
177 total_tokens_used: 0,
178 loop_continue: None,
179 classify_branch: None,
180 tournament_winner: None,
181 }
182}
183
184struct TournamentState {
188 entrant_nodes: Vec<usize>,
190 entrants_remaining: usize,
192 bracket: Option<Tournament>,
194 judge_nodes: Vec<usize>,
196 judge_winners: Vec<Option<EntrantId>>,
198 judges_remaining: usize,
200}
201
202pub struct WorkflowRun {
204 graph: TaskGraph,
205 nodes: Vec<WorkflowNode>,
206 parent_session_id: String,
208 node_of_agent: HashMap<String, usize>,
210 batch: Vec<usize>,
212 iter_counts: HashMap<usize, usize>,
215 tournaments: HashMap<usize, TournamentState>,
217 child_controller: HashMap<usize, usize>,
219 judge_matches: HashMap<usize, JudgeMatch>,
221}
222
223impl WorkflowRun {
224 pub fn new(spec: &WorkflowSpec, parent_session_id: &str) -> Result<Self> {
226 spec.validate()?;
227 Ok(Self {
228 graph: spec.to_task_graph()?,
229 nodes: spec.nodes.clone(),
230 parent_session_id: parent_session_id.to_string(),
231 node_of_agent: HashMap::new(),
232 batch: Vec::new(),
233 iter_counts: HashMap::new(),
234 tournaments: HashMap::new(),
235 child_controller: HashMap::new(),
236 judge_matches: HashMap::new(),
237 })
238 }
239
240 pub fn resume(
252 spec: &WorkflowSpec,
253 parent_session_id: &str,
254 submissions: &[Vec<WorkflowNode>],
255 completed: &[String],
256 ) -> Result<Self> {
257 let mut run = Self::new(spec, parent_session_id)?;
258 for batch in submissions {
259 run.submit_nodes(batch.clone());
260 }
261 let n = run.graph.len();
262 for id in completed {
263 if let Some(node) = (0..n).find(|&i| node_agent_id(i) == *id) {
264 run.graph.start(node);
265 run.graph.complete(node, resumed_result());
266 }
267 }
268 Ok(run)
269 }
270
271 pub fn ready_batch(&self) -> Vec<usize> {
273 self.graph.ready_tasks()
274 }
275
276 pub fn current_agent_id(&self, node: usize) -> String {
281 match self.nodes[node].kind {
282 NodeKind::Loop { .. } => {
283 let k = self.iter_counts.get(&node).copied().unwrap_or(0);
284 format!("{}-i{k}", node_agent_id(node))
285 }
286 NodeKind::Spawn
290 | NodeKind::Classify { .. }
291 | NodeKind::Tournament { .. }
292 | NodeKind::Reduce { .. } => node_agent_id(node),
293 }
294 }
295
296 pub fn manifest_for(&self, node: usize) -> IsolationManifest {
300 let n = &self.nodes[node];
301 IsolationManifest {
302 agent_id: self.current_agent_id(node).into(),
303 parent_session_id: self.parent_session_id.as_str().into(),
304 role: n.role,
305 isolation: n.isolation,
306 context_inheritance: n.context_inheritance,
307 permitted_capability_ids: Vec::new(),
308 }
309 }
310
311 pub fn goal_of(&self, node: usize) -> &str {
313 &self.nodes[node].task.goal
314 }
315
316 pub fn quarantine_violation(&self, node: usize) -> bool {
321 let n = &self.nodes[node];
322 matches!(n.trust, NodeTrust::Quarantined)
323 && !matches!(n.isolation, AgentIsolation::ReadOnly)
324 }
325
326 pub fn spawn_info(&self, node: usize) -> WorkflowSpawnInfo {
330 let n = &self.nodes[node];
331 let (reducer, input_agent_ids) = match &n.kind {
334 NodeKind::Reduce { reducer } => (
335 Some(reducer.clone()),
336 n.depends_on.iter().map(|&d| node_agent_id(d)).collect(),
337 ),
338 _ => (None, Vec::new()),
339 };
340 let loop_max_iters = match &n.kind {
344 NodeKind::Loop { max_iters } => Some(*max_iters),
345 _ => None,
346 };
347 let classify_labels = match &n.kind {
348 NodeKind::Classify { branches } => branches.iter().map(|b| b.label.clone()).collect(),
349 _ => Vec::new(),
350 };
351 WorkflowSpawnInfo {
352 agent_id: self.current_agent_id(node),
353 goal: n.task.goal.clone(),
354 role: role_label(n.role).to_string(),
355 isolation: isolation_label(n.isolation).to_string(),
356 context_inheritance: inheritance_label(n.context_inheritance).to_string(),
357 model_hint: n.model_hint.clone(),
358 trust: trust_label(n.trust).to_string(),
359 output_schema: n.output_schema.clone(),
360 reducer,
361 input_agent_ids,
362 judge_match: self.judge_matches.get(&node).cloned(),
363 loop_max_iters,
364 classify_labels,
365 token_budget: n.token_budget,
366 }
367 }
368
369 pub fn mark_spawned(&mut self, node: usize, agent_id: &str) {
372 self.graph.start(node);
373 self.batch.push(node);
374 self.node_of_agent.insert(agent_id.to_string(), node);
375 }
376
377 pub fn mark_denied(&mut self, node: usize) {
380 self.graph.fail(node);
381 }
382
383 pub fn record_completion(&mut self, agent_id: &str, result: LoopResult) -> Option<usize> {
391 let node = *self.node_of_agent.get(agent_id)?;
392 self.batch.retain(|&n| n != node);
393
394 if let Some(&controller) = self.child_controller.get(&node) {
397 return self.advance_tournament(controller, node, result);
398 }
399
400 match &self.nodes[node].kind {
401 NodeKind::Loop { max_iters } => {
402 let max_iters = *max_iters;
405 let stop_requested = result.loop_continue == Some(false);
406 let done = self.iter_counts.entry(node).or_insert(0);
407 *done += 1;
408 if *done < max_iters && !stop_requested {
409 self.graph.set_ready(node);
411 return Some(node);
412 }
413 }
414 NodeKind::Classify { branches } => {
415 let chosen = result.classify_branch.clone();
419 let prune: Vec<usize> = branches
420 .iter()
421 .filter(|b| Some(&b.label) != chosen.as_ref())
422 .flat_map(|b| b.nodes.iter().copied())
423 .collect();
424 for bn in prune {
425 self.graph.fail(bn);
426 }
427 }
428 NodeKind::Spawn | NodeKind::Tournament { .. } | NodeKind::Reduce { .. } => {}
432 }
433
434 if matches!(result.termination, crate::types::result::TerminationReason::Error) {
441 self.graph.fail(node);
442 } else {
443 self.graph.complete(node, result);
444 }
445 Some(node)
446 }
447
448 fn append_child(&mut self, node: WorkflowNode) -> usize {
454 let idx = self.graph.add(node.task.clone(), Vec::new());
455 debug_assert_eq!(idx, self.nodes.len(), "graph/nodes index drift");
456 self.nodes.push(node);
457 idx
458 }
459
460 pub fn expand_ready_controllers(&mut self) {
465 let pending: Vec<usize> = (0..self.nodes.len())
466 .filter(|i| !self.tournaments.contains_key(i))
467 .filter(|&i| matches!(self.nodes[i].kind, NodeKind::Tournament { .. }))
468 .filter(|&i| self.graph.get(i).map(|n| n.status) == Some(TaskStatus::Ready))
469 .collect();
470 for c in pending {
471 self.expand_tournament(c);
472 }
473 }
474
475 fn expand_tournament(&mut self, c: usize) {
478 let entrants = match &self.nodes[c].kind {
479 NodeKind::Tournament { entrants } => entrants.clone(),
480 _ => return,
481 };
482 let trust = self.nodes[c].trust;
483 self.graph.start(c);
485 let mut entrant_nodes = Vec::with_capacity(entrants.len());
486 for task in entrants {
487 let child = WorkflowNode::new(task, AgentRole::Custom)
488 .with_isolation(AgentIsolation::ReadOnly)
489 .with_trust(trust);
490 let idx = self.append_child(child);
491 self.child_controller.insert(idx, c);
492 entrant_nodes.push(idx);
493 }
494 let entrants_remaining = entrant_nodes.len();
495 self.tournaments.insert(
496 c,
497 TournamentState {
498 entrant_nodes,
499 entrants_remaining,
500 bracket: None,
501 judge_nodes: Vec::new(),
502 judge_winners: Vec::new(),
503 judges_remaining: 0,
504 },
505 );
506 }
507
508 fn advance_tournament(
511 &mut self,
512 controller: usize,
513 child: usize,
514 result: LoopResult,
515 ) -> Option<usize> {
516 self.graph.complete(child, result.clone());
518
519 let in_entrant_phase = self.tournaments.get(&controller)?.bracket.is_none();
520 if in_entrant_phase {
521 let all_in = {
522 let st = self.tournaments.get_mut(&controller)?;
523 st.entrants_remaining = st.entrants_remaining.saturating_sub(1);
524 st.entrants_remaining == 0
525 };
526 if all_in {
527 self.begin_bracket(controller);
528 }
529 } else {
530 let round_done = {
531 let st = self.tournaments.get_mut(&controller)?;
532 if let Some(pos) = st.judge_nodes.iter().position(|&n| n == child) {
533 st.judge_winners[pos] = result.tournament_winner.clone();
534 }
535 st.judges_remaining = st.judges_remaining.saturating_sub(1);
536 st.judges_remaining == 0
537 };
538 if round_done {
539 self.finish_round(controller);
540 }
541 }
542 Some(controller)
543 }
544
545 fn begin_bracket(&mut self, controller: usize) {
547 let entrant_ids: Vec<EntrantId> = self
548 .tournaments
549 .get(&controller)
550 .map(|st| st.entrant_nodes.iter().map(|&n| node_agent_id(n)).collect())
551 .unwrap_or_default();
552 let mut bracket = match Tournament::new(entrant_ids) {
554 Ok(b) => b,
555 Err(_) => return self.complete_tournament(controller, None),
556 };
557 let action = bracket.start();
558 if let Some(st) = self.tournaments.get_mut(&controller) {
559 st.bracket = Some(bracket);
560 }
561 self.apply_action(controller, action);
562 }
563
564 fn finish_round(&mut self, controller: usize) {
566 let winners: Vec<EntrantId> = self
567 .tournaments
568 .get(&controller)
569 .map(|st| st.judge_winners.iter().filter_map(|w| w.clone()).collect())
570 .unwrap_or_default();
571 let action = {
572 let st = match self.tournaments.get_mut(&controller) {
573 Some(st) => st,
574 None => return,
575 };
576 match st.bracket.as_mut() {
577 Some(b) => b.feed_round(winners),
580 None => return,
581 }
582 };
583 match action {
584 Ok(act) => self.apply_action(controller, act),
585 Err(_) => self.complete_tournament(controller, None),
586 }
587 }
588
589 fn apply_action(&mut self, controller: usize, action: TournamentAction) {
591 match action {
592 TournamentAction::JudgeRound { matches, .. } => self.emit_judges(controller, matches),
593 TournamentAction::Done { winner, .. } => {
594 self.complete_tournament(controller, Some(winner))
595 }
596 }
597 }
598
599 fn emit_judges(&mut self, controller: usize, matches: Vec<Match>) {
602 let criterion = self.nodes[controller].task.clone();
603 let trust = self.nodes[controller].trust;
604 let mut judge_nodes = Vec::with_capacity(matches.len());
605 for m in &matches {
606 let judge = WorkflowNode::new(criterion.clone(), AgentRole::Verify).with_trust(trust);
607 let idx = self.append_child(judge);
608 self.child_controller.insert(idx, controller);
609 self.judge_matches.insert(
610 idx,
611 JudgeMatch {
612 left: m.left.clone(),
613 right: m.right.clone(),
614 },
615 );
616 judge_nodes.push(idx);
617 }
618 if let Some(st) = self.tournaments.get_mut(&controller) {
619 st.judge_winners = vec![None; judge_nodes.len()];
620 st.judges_remaining = judge_nodes.len();
621 st.judge_nodes = judge_nodes;
622 }
623 }
624
625 fn complete_tournament(&mut self, controller: usize, winner: Option<EntrantId>) {
628 self.tournaments.remove(&controller);
629 let result = LoopResult {
630 termination: TerminationReason::Completed,
631 final_message: None,
632 turns_used: 0,
633 total_tokens_used: 0,
634 loop_continue: None,
635 classify_branch: None,
636 tournament_winner: winner,
637 };
638 self.graph.complete(controller, result);
639 }
640
641 pub fn submit_nodes_from(
672 &mut self,
673 submitter: Option<&str>,
674 mut nodes: Vec<WorkflowNode>,
675 ) -> Vec<usize> {
676 let submitter_quarantined = submitter.is_some_and(|s| self.is_agent_quarantined(s));
677 if submitter_quarantined {
678 for node in &mut nodes {
679 node.trust = NodeTrust::Quarantined;
680 }
681 }
682 self.submit_nodes(nodes)
683 }
684
685 pub fn submit_nodes(&mut self, nodes: Vec<WorkflowNode>) -> Vec<usize> {
686 let base = self.nodes.len();
687 let batch_len = nodes.len();
688 let mut ids = Vec::with_capacity(nodes.len());
689 for (offset, mut node) in nodes.into_iter().enumerate() {
690 let deps: Vec<usize> = node
691 .depends_on
692 .iter()
693 .filter(|&&d| d < offset)
694 .map(|&d| base + d)
695 .collect();
696 node.depends_on = deps.clone();
697 if let NodeKind::Classify { branches } = &mut node.kind {
703 for branch in branches.iter_mut() {
704 branch.nodes = branch
705 .nodes
706 .iter()
707 .filter(|&&d| d < batch_len)
708 .map(|&d| base + d)
709 .collect();
710 }
711 }
712 let idx = self.graph.add(node.task.clone(), deps);
713 debug_assert_eq!(idx, self.nodes.len(), "graph/nodes index drift");
714 self.nodes.push(node);
715 ids.push(idx);
716 }
717 ids
718 }
719
720 pub fn owns_agent(&self, agent_id: &str) -> bool {
722 self.node_of_agent.contains_key(agent_id)
723 }
724
725 pub fn is_agent_quarantined(&self, agent_id: &str) -> bool {
730 self.node_of_agent
731 .get(agent_id)
732 .is_some_and(|&node| matches!(self.nodes[node].trust, NodeTrust::Quarantined))
733 }
734
735 pub fn parent_session_id(&self) -> &str {
737 &self.parent_session_id
738 }
739
740 pub fn batch_drained(&self) -> bool {
742 self.batch.is_empty()
743 }
744
745 pub fn is_complete(&self) -> bool {
747 self.graph.all_done() && self.batch.is_empty()
748 }
749
750 pub fn outcome(&self) -> (Vec<String>, Vec<String>) {
753 let mut completed = Vec::new();
754 let mut failed = Vec::new();
755 for i in 0..self.graph.len() {
756 match self.graph.get(i).map(|n| n.status) {
757 Some(TaskStatus::Completed) => completed.push(node_agent_id(i)),
758 Some(TaskStatus::Failed) => failed.push(node_agent_id(i)),
759 _ => {}
760 }
761 }
762 (completed, failed)
763 }
764
765 pub fn abort_outcome(&self) -> (Vec<String>, Vec<String>) {
769 let mut completed = Vec::new();
770 let mut failed = Vec::new();
771 for i in 0..self.graph.len() {
772 match self.graph.get(i).map(|n| n.status) {
773 Some(TaskStatus::Completed) => completed.push(node_agent_id(i)),
774 _ => failed.push(node_agent_id(i)),
775 }
776 }
777 (completed, failed)
778 }
779
780 pub fn len(&self) -> usize {
782 self.graph.len()
783 }
784
785 pub fn is_empty(&self) -> bool {
786 self.graph.is_empty()
787 }
788}
789
790#[cfg(test)]
791mod tests {
792 use super::*;
793 use crate::orchestration::workflow::fanout_synthesize;
794 use crate::types::result::{LoopResult, TerminationReason};
795 use crate::types::task::RuntimeTask;
796
797 fn done() -> LoopResult {
798 LoopResult {
799 termination: TerminationReason::Completed,
800 final_message: None,
801 turns_used: 1,
802 total_tokens_used: 0,
803 loop_continue: None,
804 classify_branch: None,
805 tournament_winner: None,
806 }
807 }
808
809 fn fanout2() -> WorkflowRun {
810 let spec = fanout_synthesize(
812 vec![RuntimeTask::new("w0"), RuntimeTask::new("w1")],
813 RuntimeTask::new("synth"),
814 );
815 WorkflowRun::new(&spec, "parent-sess").unwrap()
816 }
817
818 fn judge_done(winner: &str) -> LoopResult {
820 LoopResult {
821 tournament_winner: Some(winner.to_string()),
822 ..done()
823 }
824 }
825
826 fn spawn_round(run: &mut WorkflowRun) -> Vec<(usize, String)> {
829 run.expand_ready_controllers();
830 let ready = run.ready_batch();
831 let mut out = Vec::new();
832 for node in ready {
833 let id = run.current_agent_id(node);
834 run.mark_spawned(node, &id);
835 out.push((node, id));
836 }
837 out
838 }
839
840 #[test]
841 fn first_batch_is_the_workers() {
842 let run = fanout2();
843 assert_eq!(run.ready_batch(), vec![0, 1]);
844 assert_eq!(run.len(), 3);
845 assert!(!run.is_complete());
846 }
847
848 #[test]
851 fn submit_nodes_appends_independent_nodes_ready_immediately() {
852 use crate::orchestration::workflow::WorkflowNode;
853 use crate::types::agent::AgentRole;
854
855 let mut run = fanout2(); assert_eq!(run.len(), 3);
857 let ids = run.submit_nodes(vec![
858 WorkflowNode::new(RuntimeTask::new("extra-a"), AgentRole::Implement),
859 WorkflowNode::new(RuntimeTask::new("extra-b"), AgentRole::Implement),
860 ]);
861 assert_eq!(ids, vec![3, 4], "appended after the existing 3 nodes");
862 assert_eq!(run.len(), 5);
863 let ready = run.ready_batch();
864 assert!(
865 ready.contains(&3) && ready.contains(&4),
866 "submitted independent nodes are immediately ready: {ready:?}"
867 );
868 }
869
870 #[test]
871 fn submitted_nodes_must_complete_before_workflow_is_done() {
872 use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
873 use crate::types::agent::AgentRole;
874
875 let spec = WorkflowSpec::new(vec![WorkflowNode::new(
877 RuntimeTask::new("root"),
878 AgentRole::Implement,
879 )]);
880 let mut run = WorkflowRun::new(&spec, "sess").unwrap();
881 let id0 = run.current_agent_id(0);
882 run.mark_spawned(0, &id0);
883 run.record_completion(&id0, done());
884 let ids = run.submit_nodes(vec![WorkflowNode::new(
885 RuntimeTask::new("more"),
886 AgentRole::Implement,
887 )]);
888 assert_eq!(ids, vec![1]);
889 assert!(!run.is_complete(), "not complete while the submitted node is pending");
890 let spawned = spawn_round(&mut run);
891 assert_eq!(spawned, vec![(1usize, "wf-node1".to_string())]);
892 run.record_completion("wf-node1", done());
893 assert!(run.is_complete(), "complete once the submitted node finishes");
894 }
895
896 #[test]
897 fn reduce_node_carries_reducer_and_inputs_then_completes_like_a_spawn() {
898 use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
899 use crate::types::agent::AgentRole;
900
901 let spec = WorkflowSpec::new(vec![
904 WorkflowNode::new(RuntimeTask::new("worker-a"), AgentRole::Explore),
905 WorkflowNode::new(RuntimeTask::new("worker-b"), AgentRole::Explore),
906 WorkflowNode::new(RuntimeTask::new("merge"), AgentRole::Implement)
907 .with_reduce("dedupe_lines")
908 .with_depends_on(vec![0, 1]),
909 ]);
910 let mut run = WorkflowRun::new(&spec, "sess").unwrap();
911
912 assert_eq!(run.ready_batch(), vec![0, 1]);
914 for i in [0usize, 1] {
915 let id = run.current_agent_id(i);
916 run.mark_spawned(i, &id);
917 run.record_completion(&id, done());
918 }
919
920 assert_eq!(run.ready_batch(), vec![2]);
922 let info = run.spawn_info(2);
923 assert_eq!(info.reducer.as_deref(), Some("dedupe_lines"));
924 assert_eq!(info.input_agent_ids, vec!["wf-node0".to_string(), "wf-node1".to_string()]);
925
926 run.mark_spawned(2, "wf-node2");
928 run.record_completion("wf-node2", done());
929 assert!(run.is_complete());
930 let (completed, failed) = run.outcome();
931 assert_eq!(completed, vec!["wf-node0", "wf-node1", "wf-node2"]);
932 assert!(failed.is_empty());
933 }
934
935 #[test]
936 fn output_schema_reaches_the_spawn_descriptor() {
937 use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
938 use crate::types::agent::AgentRole;
939
940 let schema = serde_json::json!({
942 "type": "object",
943 "required": ["verdict"],
944 "properties": { "verdict": { "type": "string" } }
945 });
946 let spec = WorkflowSpec::new(vec![WorkflowNode::new(
947 RuntimeTask::new("judge"),
948 AgentRole::Verify,
949 )
950 .with_output_schema(schema.clone())]);
951 let run = WorkflowRun::new(&spec, "sess").unwrap();
952 let info = run.spawn_info(0);
953 assert_eq!(info.output_schema.as_ref(), Some(&schema));
954
955 let json = serde_json::to_string(&info).unwrap();
957 let back: WorkflowSpawnInfo = serde_json::from_str(&json).unwrap();
958 assert_eq!(back.output_schema, Some(schema));
959
960 let plain = WorkflowSpec::new(vec![WorkflowNode::new(
962 RuntimeTask::new("x"),
963 AgentRole::Implement,
964 )]);
965 let plain_info = WorkflowRun::new(&plain, "sess").unwrap().spawn_info(0);
966 assert!(plain_info.output_schema.is_none());
967 assert!(!serde_json::to_string(&plain_info).unwrap().contains("output_schema"));
968 }
969
970 #[test]
971 fn quarantined_submitter_taints_submitted_nodes() {
972 use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
973 use crate::types::agent::AgentRole;
974
975 let spec = WorkflowSpec::new(vec![WorkflowNode::new(
979 RuntimeTask::new("read-untrusted"),
980 AgentRole::Explore,
981 )
982 .quarantined()]);
983 let mut run = WorkflowRun::new(&spec, "sess").unwrap();
984 let id0 = run.current_agent_id(0);
985 run.mark_spawned(0, &id0);
986 run.record_completion(&id0, done());
987
988 let ids = run.submit_nodes_from(
990 Some(&id0),
991 vec![WorkflowNode::new(RuntimeTask::new("act"), AgentRole::Implement)],
992 );
993 assert_eq!(ids, vec![1]);
994 let id1 = run.current_agent_id(1);
995 run.mark_spawned(1, &id1);
996 assert!(
997 run.is_agent_quarantined(&id1),
998 "submitted node inherits the submitter's quarantine (no escalation)"
999 );
1000
1001 let ids2 = run.submit_nodes_from(
1003 None,
1004 vec![WorkflowNode::new(RuntimeTask::new("trusted-work"), AgentRole::Implement)],
1005 );
1006 let id2 = run.current_agent_id(ids2[0]);
1007 run.mark_spawned(ids2[0], &id2);
1008 assert!(
1009 !run.is_agent_quarantined(&id2),
1010 "no quarantined submitter ⇒ no coercion"
1011 );
1012 }
1013
1014 #[test]
1015 fn submit_nodes_honors_batch_relative_backward_deps() {
1016 use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
1017 use crate::types::agent::AgentRole;
1018
1019 let spec = WorkflowSpec::new(vec![WorkflowNode::new(
1020 RuntimeTask::new("root"),
1021 AgentRole::Implement,
1022 )]);
1023 let mut run = WorkflowRun::new(&spec, "sess").unwrap();
1024 let id0 = run.current_agent_id(0);
1025 run.mark_spawned(0, &id0);
1026 run.record_completion(&id0, done());
1027 let ids = run.submit_nodes(vec![
1029 WorkflowNode::new(RuntimeTask::new("extractor"), AgentRole::Implement),
1030 WorkflowNode::new(RuntimeTask::new("dependent"), AgentRole::Implement)
1031 .with_depends_on(vec![0]),
1032 ]);
1033 assert_eq!(ids, vec![1, 2]);
1034 assert_eq!(run.ready_batch(), vec![1], "backward dep keeps the dependent pending");
1035 run.mark_spawned(1, "wf-node1");
1036 run.record_completion("wf-node1", done());
1037 assert_eq!(run.ready_batch(), vec![2], "dependent unblocks after the extractor");
1038 }
1039
1040 #[test]
1041 fn submit_nodes_drops_forward_and_out_of_range_deps() {
1042 use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
1043 use crate::types::agent::AgentRole;
1044
1045 let spec = WorkflowSpec::new(vec![WorkflowNode::new(
1046 RuntimeTask::new("root"),
1047 AgentRole::Implement,
1048 )]);
1049 let mut run = WorkflowRun::new(&spec, "sess").unwrap();
1050 let ids = run.submit_nodes(vec![
1052 WorkflowNode::new(RuntimeTask::new("a"), AgentRole::Implement).with_depends_on(vec![5]),
1053 ]);
1054 assert_eq!(ids, vec![1]);
1055 assert!(
1056 run.ready_batch().contains(&1),
1057 "a node whose only dep was dropped is ready, not stranded"
1058 );
1059 }
1060
1061 #[test]
1062 fn submitted_node_can_itself_be_a_loop_control_flow() {
1063 use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
1068 use crate::types::agent::AgentRole;
1069
1070 let spec = WorkflowSpec::new(vec![WorkflowNode::new(
1071 RuntimeTask::new("root"),
1072 AgentRole::Implement,
1073 )]);
1074 let mut run = WorkflowRun::new(&spec, "sess").unwrap();
1075 let id0 = run.current_agent_id(0);
1076 run.mark_spawned(0, &id0);
1077 run.record_completion(&id0, done());
1078
1079 let ids = run.submit_nodes(vec![
1081 WorkflowNode::new(RuntimeTask::new("refine"), AgentRole::Implement).with_loop(2),
1082 ]);
1083 assert_eq!(ids, vec![1]);
1084
1085 for k in 0..2 {
1087 assert_eq!(run.ready_batch(), vec![1], "submitted loop ready for iteration {k}");
1088 let id = run.current_agent_id(1);
1089 assert_eq!(id, format!("wf-node1-i{k}"), "submitted loop gets per-iteration ids");
1090 run.mark_spawned(1, &id);
1091 run.record_completion(&id, done());
1092 }
1093 assert!(run.is_complete(), "submitted loop ran its 2 iterations then finished");
1094 }
1095
1096 #[test]
1097 fn submitted_tournament_runs_bracket_then_promotes_submitted_dependent() {
1098 use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
1102 use crate::types::agent::AgentRole;
1103
1104 let spec = WorkflowSpec::new(vec![WorkflowNode::new(
1105 RuntimeTask::new("root"),
1106 AgentRole::Implement,
1107 )]);
1108 let mut run = WorkflowRun::new(&spec, "sess").unwrap();
1109 let id0 = run.current_agent_id(0);
1110 run.mark_spawned(0, &id0);
1111 run.record_completion(&id0, done());
1112
1113 let ids = run.submit_nodes(vec![
1115 WorkflowNode::new(RuntimeTask::new("pick best"), AgentRole::Plan)
1116 .with_tournament(vec![RuntimeTask::new("x"), RuntimeTask::new("y")]),
1117 WorkflowNode::new(RuntimeTask::new("use winner"), AgentRole::Implement)
1118 .with_depends_on(vec![0]),
1119 ]);
1120 assert_eq!(ids, vec![1, 2], "appended controller=1, dependent=2");
1121
1122 let entrants = spawn_round(&mut run);
1124 let entrant_nodes: Vec<usize> = entrants.iter().map(|(n, _)| *n).collect();
1125 assert_eq!(entrant_nodes, vec![3, 4], "two entrant children appended after the dependent");
1126 for (_, id) in &entrants {
1127 run.record_completion(id, done());
1128 }
1129
1130 let r1 = spawn_round(&mut run);
1132 assert_eq!(r1.len(), 1, "one judge for two entrants");
1133 let jm = run.spawn_info(r1[0].0).judge_match.expect("judge carries a match");
1134 assert_eq!(jm, JudgeMatch { left: node_agent_id(3), right: node_agent_id(4) });
1135
1136 run.record_completion(&r1[0].1, judge_done(&node_agent_id(3)));
1138 assert_eq!(run.ready_batch(), vec![2], "submitted dependent unblocks after the bracket");
1139 let last = spawn_round(&mut run);
1140 assert_eq!(last, vec![(2, node_agent_id(2))]);
1141 run.record_completion(&last[0].1, done());
1142 assert!(run.is_complete());
1143 }
1144
1145 #[test]
1146 fn submitted_classify_remaps_branch_indices_and_prunes() {
1147 use crate::orchestration::workflow::{ClassifyBranch, NodeKind, WorkflowNode, WorkflowSpec};
1151 use crate::types::agent::AgentRole;
1152
1153 let spec = WorkflowSpec::new(vec![WorkflowNode::new(
1154 RuntimeTask::new("root"),
1155 AgentRole::Implement,
1156 )]);
1157 let mut run = WorkflowRun::new(&spec, "sess").unwrap();
1158 let id0 = run.current_agent_id(0);
1159 run.mark_spawned(0, &id0);
1160 run.record_completion(&id0, done());
1161
1162 let ids = run.submit_nodes(vec![
1164 WorkflowNode::new(RuntimeTask::new("route"), AgentRole::Plan).with_classify(vec![
1165 ClassifyBranch { label: "a".into(), nodes: vec![1] },
1166 ClassifyBranch { label: "b".into(), nodes: vec![2] },
1167 ]),
1168 WorkflowNode::new(RuntimeTask::new("branch-a"), AgentRole::Implement)
1169 .with_depends_on(vec![0]),
1170 WorkflowNode::new(RuntimeTask::new("branch-b"), AgentRole::Implement)
1171 .with_depends_on(vec![0]),
1172 ]);
1173 assert_eq!(ids, vec![1, 2, 3], "classify=1, branchA=2, branchB=3");
1174
1175 if let NodeKind::Classify { branches } = &run.nodes[1].kind {
1177 assert_eq!(branches[0].nodes, vec![2], "branch a remapped to absolute node 2");
1178 assert_eq!(branches[1].nodes, vec![3], "branch b remapped to absolute node 3");
1179 } else {
1180 panic!("node 1 should be a classify node");
1181 }
1182
1183 let r = spawn_round(&mut run);
1185 assert_eq!(r, vec![(1, node_agent_id(1))], "classifier runs first");
1186 run.record_completion(&r[0].1, LoopResult { classify_branch: Some("a".into()), ..done() });
1187
1188 assert_eq!(run.ready_batch(), vec![2], "only branch a is enabled");
1189 let (_c, failed) = run.outcome();
1190 assert!(failed.contains(&node_agent_id(3)), "branch b pruned/failed");
1191
1192 let last = spawn_round(&mut run);
1193 assert_eq!(last, vec![(2, node_agent_id(2))]);
1194 run.record_completion(&last[0].1, done());
1195 assert!(run.is_complete());
1196 let (completed, _f) = run.outcome();
1197 assert!(completed.contains(&node_agent_id(1)) && completed.contains(&node_agent_id(2)));
1198 }
1199
/// A loop node is re-offered once per iteration under a distinct agent id, and
/// its dependent only becomes ready after the last iteration has completed.
#[test]
fn loop_node_iterates_with_distinct_ids_then_promotes_dependent() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    // Node 0 is configured with `with_loop(3)`; node 1 depends on node 0.
    let refine = WorkflowNode::new(RuntimeTask::new("refine"), AgentRole::Implement).with_loop(3);
    let finalize = WorkflowNode::new(RuntimeTask::new("finalize"), AgentRole::Implement)
        .with_depends_on(vec![0]);
    let spec = WorkflowSpec::new(vec![refine, finalize]);
    let mut run = WorkflowRun::new(&spec, "sess").unwrap();

    let iterations = 3;
    for k in 0..iterations {
        assert_eq!(run.ready_batch(), vec![0], "loop node ready for iteration {k}");

        // Each iteration gets its own `-i{k}` suffixed id.
        let iter_id = run.current_agent_id(0);
        assert_eq!(iter_id, format!("wf-node0-i{k}"), "distinct per-iteration id");

        run.mark_spawned(0, &iter_id);
        assert!(!run.is_complete());

        let finished = run.record_completion(&iter_id, done()).unwrap();
        assert_eq!(finished, 0);

        // Until the final iteration, the loop node itself is offered again.
        if k + 1 < iterations {
            assert_eq!(run.ready_batch(), vec![0]);
        }
    }

    assert_eq!(run.ready_batch(), vec![1], "dependent unblocks only after the loop ends");

    // A non-loop node uses the unsuffixed id.
    let dependent_id = run.current_agent_id(1);
    assert_eq!(dependent_id, "wf-node1", "spawn node keeps the plain id");
    run.mark_spawned(1, &dependent_id);
    run.record_completion(&dependent_id, done());
    assert!(run.is_complete());
}
1236
/// In the `fanout2` fixture, node 2 is only offered once nodes 0 and 1 have
/// both completed, and the run finishes when node 2 completes.
#[test]
fn synth_becomes_ready_only_after_both_workers() {
    let mut run = fanout2();

    // Put both workers in flight before any of them reports back.
    for worker in [0usize, 1usize] {
        let worker_id = node_agent_id(worker);
        run.mark_spawned(worker, &worker_id);
    }
    assert!(!run.batch_drained());

    // First worker done: batch still in flight, nothing new to schedule.
    let first = run.record_completion(&node_agent_id(0), done());
    assert_eq!(first, Some(0));
    assert!(!run.batch_drained());
    assert!(run.ready_batch().is_empty());

    // Second worker done: batch drained and node 2 becomes ready.
    let second = run.record_completion(&node_agent_id(1), done());
    assert_eq!(second, Some(1));
    assert!(run.batch_drained());
    assert_eq!(run.ready_batch(), vec![2]);
    assert!(!run.is_complete());

    let synth_id = node_agent_id(2);
    run.mark_spawned(2, &synth_id);
    run.record_completion(&synth_id, done());
    assert!(run.is_complete());
}
1259
/// Denying node 1 leaves the run with nothing ready and not complete, even
/// after node 0 finishes and the in-flight batch has drained.
#[test]
fn denied_node_blocks_dependents_and_stalls_progress() {
    let mut run = fanout2();
    let spawned_id = node_agent_id(0);

    run.mark_spawned(0, &spawned_id);
    run.mark_denied(1);
    run.record_completion(&spawned_id, done());

    // Nothing is in flight any more...
    assert!(run.batch_drained());
    // ...but no further node is offered and the run never reports completion.
    assert!(run.ready_batch().is_empty());
    assert!(!run.is_complete());
}
1274
/// The manifest built for node 0 of the `fanout2` fixture carries the node's
/// agent id, the parent session id, and its isolation / inheritance settings.
#[test]
fn manifest_preserves_node_isolation_and_inheritance() {
    use crate::types::agent::{AgentIsolation, ContextInheritance};

    let run = fanout2();
    let manifest = run.manifest_for(0);

    // Identity fields.
    assert_eq!(manifest.agent_id.as_str(), "wf-node0");
    assert_eq!(manifest.parent_session_id.as_str(), "parent-sess");

    // Sandbox and context settings.
    assert_eq!(manifest.isolation, AgentIsolation::ReadOnly);
    assert_eq!(manifest.context_inheritance, ContextInheritance::SystemOnly);
}
1288
/// Recording a completion for an agent id that maps to no node yields `None`.
#[test]
fn unknown_agent_completion_is_none() {
    let mut run = fanout2();
    let outcome = run.record_completion("not-a-node", done());
    assert_eq!(outcome, None);
}
1294
/// Resuming with node 0 listed as already finished offers only node 1 next.
#[test]
fn resume_skips_already_completed_nodes() {
    let workers = vec![RuntimeTask::new("w0"), RuntimeTask::new("w1")];
    let spec = fanout_synthesize(workers, RuntimeTask::new("synth"));

    // No submissions to re-apply; only the first worker is recorded as finished.
    let finished = [node_agent_id(0)];
    let run = WorkflowRun::resume(&spec, "sess", &[], &finished).unwrap();

    assert_eq!(run.ready_batch(), vec![1]);
    assert!(!run.is_complete());
}
1307
/// Resuming with every node listed as finished yields an already-complete run.
#[test]
fn resume_with_all_done_completes() {
    let spec = fanout_synthesize(vec![RuntimeTask::new("w0")], RuntimeTask::new("synth"));

    // Both the single worker (node 0) and the synthesizer (node 1) are finished.
    let finished = [node_agent_id(0), node_agent_id(1)];
    let run = WorkflowRun::resume(&spec, "sess", &[], &finished).unwrap();

    assert!(run.ready_batch().is_empty());
    assert!(run.is_complete());
}
1316
/// Resuming re-applies recorded submissions, so a node appended at runtime is
/// present again in the resumed run and is scheduled (or skipped) like any other.
#[test]
fn resume_reapplies_submissions_to_reconstruct_appended_nodes() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let root = WorkflowNode::new(RuntimeTask::new("root"), AgentRole::Implement);
    let spec = WorkflowSpec::new(vec![root]);

    // One recorded submission that appended a single node.
    let submissions =
        [vec![WorkflowNode::new(RuntimeTask::new("discovered"), AgentRole::Implement)]];

    // Only the root finished: the appended node is what remains.
    let partial = WorkflowRun::resume(&spec, "sess", &submissions, &[node_agent_id(0)]).unwrap();
    assert_eq!(partial.len(), 2, "base node + re-applied submitted node");
    assert_eq!(
        partial.ready_batch(),
        vec![1],
        "the re-applied appended node is the remaining work"
    );
    assert!(!partial.is_complete());

    // Root and appended node both finished: nothing left to do.
    let everything_done = [node_agent_id(0), node_agent_id(1)];
    let finished = WorkflowRun::resume(&spec, "sess", &submissions, &everything_done).unwrap();
    assert!(finished.ready_batch().is_empty());
    assert!(finished.is_complete());
}
1343
/// `spawn_info` reports a node's trust level and optional model hint.
#[test]
fn spawn_info_carries_model_hint_and_trust() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let reader = WorkflowNode::new(RuntimeTask::new("read tickets"), AgentRole::Explore)
        .quarantined()
        .with_model_hint("haiku");
    let actor = WorkflowNode::new(RuntimeTask::new("act"), AgentRole::Implement);
    let run = WorkflowRun::new(&WorkflowSpec::new(vec![reader, actor]), "sess").unwrap();

    // Node 0: quarantined, with an explicit model hint.
    let quarantined_info = run.spawn_info(0);
    assert_eq!(quarantined_info.trust, "quarantined");
    assert_eq!(quarantined_info.model_hint.as_deref(), Some("haiku"));

    // Node 1: no modifiers applied, so trusted and without a hint.
    let plain_info = run.spawn_info(1);
    assert_eq!(plain_info.trust, "trusted");
    assert_eq!(plain_info.model_hint, None);
}
1366
/// `spawn_info` exposes the loop iteration cap and classify labels only on the
/// nodes that declare them.
#[test]
fn spawn_info_carries_loop_and_classify_hints() {
    use crate::orchestration::workflow::{ClassifyBranch, WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    // Builds a classify branch with the given label and no nodes.
    let empty_branch = |label: &str| ClassifyBranch { label: label.into(), nodes: vec![] };

    let looping = WorkflowNode::new(RuntimeTask::new("refine"), AgentRole::Implement).with_loop(3);
    let router = WorkflowNode::new(RuntimeTask::new("route"), AgentRole::Plan)
        .with_classify(vec![empty_branch("bug"), empty_branch("feature")]);
    let plain = WorkflowNode::new(RuntimeTask::new("act"), AgentRole::Implement);
    let spec = WorkflowSpec::new(vec![looping, router, plain]);
    let run = WorkflowRun::new(&spec, "sess").unwrap();

    // Loop node: iteration cap only.
    let loop_info = run.spawn_info(0);
    assert_eq!(loop_info.loop_max_iters, Some(3));
    assert!(loop_info.classify_labels.is_empty());
    assert_eq!(loop_info.token_budget, None, "no token budget unless set");

    // Classify node: labels in declaration order, no loop cap.
    let classify_info = run.spawn_info(1);
    assert_eq!(classify_info.classify_labels, vec!["bug".to_string(), "feature".to_string()]);
    assert_eq!(classify_info.loop_max_iters, None);

    // Plain node: neither hint.
    let plain_info = run.spawn_info(2);
    assert_eq!(plain_info.loop_max_iters, None);
    assert!(plain_info.classify_labels.is_empty());
}
1398
/// `spawn_info` reports a token budget only for nodes that set one.
#[test]
fn spawn_info_carries_token_budget() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let budgeted = WorkflowNode::new(RuntimeTask::new("expensive"), AgentRole::Implement)
        .with_token_budget(10_000);
    let unbudgeted = WorkflowNode::new(RuntimeTask::new("plain"), AgentRole::Implement);
    let spec = WorkflowSpec::new(vec![budgeted, unbudgeted]);
    let run = WorkflowRun::new(&spec, "sess").unwrap();

    let with_budget = run.spawn_info(0).token_budget;
    assert_eq!(with_budget, Some(10_000));

    let without_budget = run.spawn_info(1).token_budget;
    assert_eq!(without_budget, None);
}
1412
1413 use crate::orchestration::workflow::{NodeKind, WorkflowNode, WorkflowSpec};
1416 use crate::types::agent::AgentRole;
1417
/// Four-entrant tournament: the entrants run first, then two round-1 judges,
/// then a single final judge. The dependent node is offered only once the
/// champion has been recorded on the controller node.
#[test]
fn tournament_runs_bracket_then_promotes_dependent() {
    let controller = WorkflowNode::new(RuntimeTask::new("pick the best ad"), AgentRole::Plan)
        .with_tournament(vec![
            RuntimeTask::new("ad A"),
            RuntimeTask::new("ad B"),
            RuntimeTask::new("ad C"),
            RuntimeTask::new("ad D"),
        ]);
    let dependent = WorkflowNode::new(RuntimeTask::new("ship the winner"), AgentRole::Implement)
        .with_depends_on(vec![0]);
    let spec = WorkflowSpec::new(vec![controller, dependent]);
    let mut run = WorkflowRun::new(&spec, "sess").unwrap();

    // The first spawn round consists of the entrant children (nodes 2..=5).
    let entrants = spawn_round(&mut run);
    let entrant_nodes: Vec<usize> = entrants.iter().map(|&(n, _)| n).collect();
    assert_eq!(entrant_nodes, vec![2, 3, 4, 5], "4 entrant children, no controller spawn");
    assert!(run.spawn_info(2).judge_match.is_none(), "entrants are not judges");
    assert!(!run.is_complete());

    // Judges must not appear while any entrant is still outstanding.
    let last_entrant = entrants.len() - 1;
    for (i, (_, id)) in entrants.iter().enumerate() {
        run.record_completion(id, done());
        if i != last_entrant {
            assert!(run.ready_batch().is_empty(), "no judges until every entrant is in");
        }
    }

    // Round 1: adjacent entrants are paired (2 vs 3, 4 vs 5).
    let round_one = spawn_round(&mut run);
    assert_eq!(round_one.len(), 2, "two round-1 judges");
    let (judge_a, judge_b) = (&round_one[0], &round_one[1]);
    let match_a = run.spawn_info(judge_a.0).judge_match.expect("judge carries a match");
    assert_eq!(match_a, JudgeMatch { left: node_agent_id(2), right: node_agent_id(3) });
    let match_b = run.spawn_info(judge_b.0).judge_match.expect("judge carries a match");
    assert_eq!(match_b, JudgeMatch { left: node_agent_id(4), right: node_agent_id(5) });

    // Entrants 2 and 4 advance; the dependent must still be held back.
    run.record_completion(&judge_a.1, judge_done(&node_agent_id(2)));
    run.record_completion(&judge_b.1, judge_done(&node_agent_id(4)));
    assert!(!run.ready_batch().iter().any(|&n| n == 1), "dependent gated until the final");

    // Final: the two round-1 winners meet.
    let final_round = spawn_round(&mut run);
    assert_eq!(final_round.len(), 1, "one final judge");
    let final_judge = &final_round[0];
    let final_match =
        run.spawn_info(final_judge.0).judge_match.expect("final judge carries a match");
    assert_eq!(final_match, JudgeMatch { left: node_agent_id(2), right: node_agent_id(4) });

    // The final verdict is stored as the controller node's tournament winner.
    run.record_completion(&final_judge.1, judge_done(&node_agent_id(4)));
    let champion = run
        .graph
        .get(0)
        .and_then(|controller_node| controller_node.result.as_ref())
        .and_then(|result| result.tournament_winner.clone());
    let expected_champion = node_agent_id(4);
    assert_eq!(champion.as_deref(), Some(expected_champion.as_str()), "champion recorded");
    assert_eq!(run.ready_batch(), vec![1], "dependent unblocks only after the bracket resolves");

    // Finishing the dependent completes the run.
    let tail = spawn_round(&mut run);
    assert_eq!(tail, vec![(1, node_agent_id(1))]);
    run.record_completion(&tail[0].1, done());
    assert!(run.is_complete());
}
1489
/// Three-entrant tournament: round 1 has a single judged match, and the
/// remaining entrant (node 3) meets that match's winner in the final.
#[test]
fn tournament_with_bye_resolves() {
    let controller = WorkflowNode::new(RuntimeTask::new("rank"), AgentRole::Plan).with_tournament(
        vec![RuntimeTask::new("x"), RuntimeTask::new("y"), RuntimeTask::new("z")],
    );
    let spec = WorkflowSpec::new(vec![controller]);
    let mut run = WorkflowRun::new(&spec, "sess").unwrap();

    // All three entrants run and finish.
    let entrants = spawn_round(&mut run);
    assert_eq!(entrants.len(), 3);
    entrants.iter().for_each(|(_, id)| {
        run.record_completion(id, done());
    });

    // Round 1: only one pairing needs a judge; node 1 wins it.
    let round_one = spawn_round(&mut run);
    assert_eq!(round_one.len(), 1, "one match, one bye");
    run.record_completion(&round_one[0].1, judge_done(&node_agent_id(1)));

    // Final: round-1 winner (node 1) against node 3.
    let final_round = spawn_round(&mut run);
    assert_eq!(final_round.len(), 1);
    let final_judge = &final_round[0];
    let final_match = run.spawn_info(final_judge.0).judge_match.unwrap();
    assert_eq!(final_match, JudgeMatch { left: node_agent_id(1), right: node_agent_id(3) });
    run.record_completion(&final_judge.1, judge_done(&node_agent_id(3)));

    // The controller node records node 3 as champion and the run is complete.
    let champion = run
        .graph
        .get(0)
        .and_then(|controller_node| controller_node.result.as_ref())
        .and_then(|result| result.tournament_winner.clone());
    let expected_champion = node_agent_id(3);
    assert_eq!(champion.as_deref(), Some(expected_champion.as_str()));
    assert!(run.is_complete());
}
1524
/// Entrants and judges generated for a quarantined tournament controller are
/// themselves reported as quarantined and raise no quarantine violation.
#[test]
fn tournament_children_inherit_controller_trust() {
    let controller =
        WorkflowNode::new(RuntimeTask::new("judge untrusted inputs"), AgentRole::Plan)
            .quarantined()
            .with_tournament(vec![RuntimeTask::new("a"), RuntimeTask::new("b")]);
    let spec = WorkflowSpec::new(vec![controller]);
    let mut run = WorkflowRun::new(&spec, "sess").unwrap();

    // Check every entrant before any of them completes.
    let entrants = spawn_round(&mut run);
    for &(entrant, _) in &entrants {
        let trust = run.spawn_info(entrant).trust;
        assert_eq!(trust, "quarantined", "entrant inherits quarantine");
        assert!(!run.quarantine_violation(entrant), "read-only entrant is quarantine-clean");
    }

    // Finish the entrants so the judge round is generated.
    for (_, entrant_id) in &entrants {
        run.record_completion(entrant_id, done());
    }

    let judges = spawn_round(&mut run);
    let judge = judges[0].0;
    assert_eq!(run.spawn_info(judge).trust, "quarantined", "judge inherits quarantine");
    assert!(!run.quarantine_violation(judge));
}
1549
/// The tournament controller (node 0) is not part of the first spawn round.
#[test]
fn tournament_controller_never_spawns_itself() {
    let controller = WorkflowNode::new(RuntimeTask::new("c"), AgentRole::Plan)
        .with_tournament(vec![RuntimeTask::new("a"), RuntimeTask::new("b")]);
    let spec = WorkflowSpec::new(vec![controller]);
    let mut run = WorkflowRun::new(&spec, "sess").unwrap();

    // Sanity check: node 0 really is the tournament controller.
    assert!(matches!(run.nodes[0].kind, NodeKind::Tournament { .. }));

    let first_round = spawn_round(&mut run);
    let controller_spawned = first_round.iter().any(|(n, _)| *n == 0);
    assert!(!controller_spawned, "controller node 0 never spawns directly");
}
1561}