use std::sync::{Arc, Mutex};
use cap_rs::core::StopReason;
use cap_rs_orchestrator::config::FleetSpec;
use cap_rs_orchestrator::event::OrchestratorEvent;
use cap_rs_orchestrator::executor::Executor;
use cap_rs_orchestrator::testing::{StubDriver, StubDriverFactory};
use cap_rs_orchestrator::worktree::NoopWorktreeManager;
async fn run_to_completion(
spec: FleetSpec,
factory: StubDriverFactory,
) -> (Vec<String>, Vec<(String, String)>) {
let wt = NoopWorktreeManager::new();
let (handle, mut events) = Executor::start(spec, factory, wt, "the task")
.await
.expect("executor start");
let mut tags = Vec::new();
while let Some(ev) = events.recv().await {
match &ev {
OrchestratorEvent::SessionStarted { session } => tags.push(format!("start:{session}")),
OrchestratorEvent::SessionDone { session, .. } => tags.push(format!("done:{session}")),
OrchestratorEvent::Routed { from, to } => tags.push(format!("route:{from}->{to}")),
OrchestratorEvent::AwaitSelection { candidates } => {
tags.push(format!("select:{}", candidates.join(",")));
}
OrchestratorEvent::FleetComplete => {
tags.push("complete".into());
break;
}
_ => {}
}
}
let audit = handle.audit_pairs();
(tags, audit)
}
#[tokio::test]
async fn pipeline_a_then_b() {
let spec = FleetSpec::from_yaml(
r#"
fleet:
base_branch: main
sessions:
coder: { driver: claude, permissions: allow }
reviewer: { driver: codex, permissions: allow }
start: coder
routes:
- { when: coder.done, route_to: reviewer }
"#,
)
.unwrap();
let factory = StubDriverFactory::new()
.with(
"coder",
StubDriver::new("coder")
.text("wrote code")
.done(StopReason::EndTurn),
)
.with(
"reviewer",
StubDriver::new("reviewer")
.text("looks ok")
.done(StopReason::EndTurn),
);
let (tags, audit) = run_to_completion(spec, factory).await;
assert_eq!(tags.iter().filter(|t| t.starts_with("done:")).count(), 2);
let route_pos = tags
.iter()
.position(|t| t == "route:coder->reviewer")
.unwrap();
let coder_done = tags.iter().position(|t| t == "done:coder").unwrap();
let reviewer_done = tags.iter().position(|t| t == "done:reviewer").unwrap();
assert!(coder_done < route_pos, "route must follow coder done");
assert!(
route_pos < reviewer_done,
"reviewer done must follow the route"
);
assert!(tags.last().unwrap() == "complete");
assert_eq!(audit, vec![("coder".to_string(), "reviewer".to_string())]);
}
#[tokio::test]
async fn lead_worker_fan_out_then_join() {
let spec = FleetSpec::from_yaml(
r#"
fleet:
base_branch: main
sessions:
lead: { driver: claude, permissions: allow }
a: { driver: codex, permissions: allow }
b: { driver: codex, permissions: allow }
rev: { driver: claude, permissions: allow }
start: lead
routes:
- when: lead.done
fan_out: { to: [a, b], split: broadcast }
- when: [a.done, b.done]
route_to: rev
"#,
)
.unwrap();
let factory = StubDriverFactory::new()
.with(
"lead",
StubDriver::new("lead")
.text("plan")
.done(StopReason::EndTurn),
)
.with(
"a",
StubDriver::new("a")
.text("a-work")
.done(StopReason::EndTurn),
)
.with(
"b",
StubDriver::new("b")
.text("b-work")
.done(StopReason::EndTurn),
)
.with(
"rev",
StubDriver::new("rev")
.text("merged")
.done(StopReason::EndTurn),
);
let (tags, audit) = run_to_completion(spec, factory).await;
let rev_start = tags.iter().position(|t| t == "start:rev").unwrap();
let a_done = tags.iter().position(|t| t == "done:a").unwrap();
let b_done = tags.iter().position(|t| t == "done:b").unwrap();
assert!(
a_done < rev_start && b_done < rev_start,
"join must wait for both"
);
assert!(audit.contains(&("lead".into(), "a".into())));
assert!(audit.contains(&("lead".into(), "b".into())));
assert!(
audit.contains(&("a".into(), "rev".into())) || audit.contains(&("b".into(), "rev".into()))
);
assert_eq!(tags.last().unwrap(), "complete");
}
#[tokio::test]
async fn parallel_race_collects_for_human() {
let spec = FleetSpec::from_yaml(
r#"
fleet:
base_branch: main
sessions:
x: { driver: claude, permissions: allow }
y: { driver: codex, permissions: allow }
start: [x, y]
routes:
- when: [x.done, y.done]
collect: human
"#,
)
.unwrap();
let factory = StubDriverFactory::new()
.with(
"x",
StubDriver::new("x").text("sol-x").done(StopReason::EndTurn),
)
.with(
"y",
StubDriver::new("y").text("sol-y").done(StopReason::EndTurn),
);
let (tags, _audit) = run_to_completion(spec, factory).await;
assert!(tags.iter().any(|t| t == "select:x,y"), "tags: {tags:?}");
assert_eq!(tags.last().unwrap(), "complete");
}
#[tokio::test]
async fn lead_worker_by_subtask_split() {
let spec = FleetSpec::from_yaml(
r#"
fleet:
base_branch: main
sessions:
lead: { driver: claude, permissions: allow }
a: { driver: codex, permissions: allow }
b: { driver: codex, permissions: allow }
start: lead
routes:
- when: lead.done
fan_out: { to: [a, b], split: by_subtask }
"#,
)
.unwrap();
let fence = "`".repeat(3);
let lead_out = format!(
"Here is the plan.\n{fence}cap-subtasks\n[\"task for A\", \"task for B\"]\n{fence}\n"
);
let factory = StubDriverFactory::new()
.with(
"lead",
StubDriver::new("lead")
.text(&lead_out)
.done(StopReason::EndTurn),
)
.with(
"a",
StubDriver::new("a").text("did A").done(StopReason::EndTurn),
)
.with(
"b",
StubDriver::new("b").text("did B").done(StopReason::EndTurn),
);
let (tags, audit) = run_to_completion(spec, factory).await;
assert!(
audit.contains(&("lead".into(), "a".into())),
"audit: {audit:?}"
);
assert!(
audit.contains(&("lead".into(), "b".into())),
"audit: {audit:?}"
);
assert!(tags.iter().any(|t| t == "done:a"));
assert!(tags.iter().any(|t| t == "done:b"));
assert_eq!(tags.last().unwrap(), "complete");
}
#[tokio::test]
async fn by_subtask_missing_block_fails_lead_and_terminates() {
let spec = FleetSpec::from_yaml(
r#"
fleet:
base_branch: main
sessions:
lead: { driver: claude, permissions: allow }
a: { driver: codex, permissions: allow }
start: lead
routes:
- when: lead.done
fan_out: { to: [a], split: by_subtask }
"#,
)
.unwrap();
let factory = StubDriverFactory::new()
.with(
"lead",
StubDriver::new("lead")
.text("no block here")
.done(StopReason::EndTurn),
)
.with(
"a",
StubDriver::new("a")
.text("should not run")
.done(StopReason::EndTurn),
);
let wt = NoopWorktreeManager::new();
let (_handle, mut events) = Executor::start(spec, factory, wt, "task").await.unwrap();
let mut saw_lead_failed = false;
let mut saw_complete = false;
let mut a_started = false;
while let Some(ev) = events.recv().await {
match ev {
OrchestratorEvent::SessionFailed { session, .. } if session == "lead" => {
saw_lead_failed = true
}
OrchestratorEvent::SessionStarted { session } if session == "a" => a_started = true,
OrchestratorEvent::FleetComplete => {
saw_complete = true;
break;
}
_ => {}
}
}
assert!(
saw_lead_failed,
"lead should be reported failed for the unparseable split"
);
assert!(saw_complete, "fleet must terminate, not hang");
assert!(!a_started, "target a must not spawn when the split fails");
}
#[tokio::test]
async fn pipeline_forwards_upstream_output() {
let spec = FleetSpec::from_yaml(
r#"
fleet:
base_branch: main
sessions:
coder: { driver: claude, permissions: allow }
reviewer: { driver: codex, permissions: allow }
start: coder
routes:
- { when: coder.done, route_to: reviewer }
"#,
)
.unwrap();
let captured = Arc::new(Mutex::new(Vec::<String>::new()));
let factory = StubDriverFactory::new()
.with(
"coder",
StubDriver::new("coder")
.text("CODER_OUTPUT_XYZ")
.done(StopReason::EndTurn),
)
.with(
"reviewer",
StubDriver::new("reviewer")
.capture(captured.clone())
.done(StopReason::EndTurn),
);
let _ = run_to_completion(spec, factory).await;
let prompts = captured.lock().unwrap();
assert!(
prompts.iter().any(|p| p.contains("CODER_OUTPUT_XYZ")),
"reviewer should receive coder's output; got: {prompts:?}"
);
}