use super::*;
use crate::context::skill_catalog::SKILL_TOOL_NAME;
use crate::runtime::kernel::KernelPressureAction;
use crate::types::message::Role;
use crate::types::skill::SkillMetadata;
/// Builds the state machine used by most tests in this module: the default
/// [`LoopPolicy`] with only `max_tokens` overridden to 128 000.
fn sm() -> LoopStateMachine {
    let policy = LoopPolicy {
        max_tokens: 128_000,
        ..LoopPolicy::default()
    };
    LoopStateMachine::new(policy)
}
/// Starting a task must return a `CallLLM` action and leave the machine in
/// the `Reason` phase.
#[test]
fn start_emits_call_llm() {
    let mut machine = sm();
    let first_action = machine.start(RuntimeTask::new("Say hello"));
    let is_call_llm = matches!(first_action, LoopAction::CallLLM { .. });
    assert!(is_call_llm);
    let in_reason_phase = matches!(machine.phase, LoopPhase::Reason);
    assert!(in_reason_phase);
}
/// When the preloaded history ends with an assistant turn that carries a
/// tool call, resuming must return `ExecuteTools` with exactly that call.
#[test]
fn resume_after_preload_runs_pending_tools_before_llm() {
    let pending_call = ToolCall {
        id: compact_str::CompactString::new("call_ping"),
        name: compact_str::CompactString::new("ping"),
        arguments: serde_json::json!({}),
    };
    let assistant_turn = Message {
        role: Role::Assistant,
        content: Content::Text("checking".into()),
        tool_calls: vec![pending_call],
        token_count: Some(5),
    };

    let mut machine = sm();
    machine.preload_history(vec![Message::user("goal"), assistant_turn]);

    match machine.resume_after_preload() {
        LoopAction::ExecuteTools { calls } => {
            assert_eq!(calls.len(), 1);
            let only_call = &calls[0];
            assert_eq!(only_call.name.as_str(), "ping");
        }
        other => panic!("expected ExecuteTools, got {other:?}"),
    }
}
/// With memory enabled, resuming a preloaded history whose last assistant
/// turn holds a `memory` tool call must record a `PageInRequested`
/// observation carrying that tool name and query.
#[test]
fn resume_after_preload_emits_page_in_requested_for_pending_memory() {
    let recall_call = ToolCall {
        id: compact_str::CompactString::new("mem1"),
        name: compact_str::CompactString::new("memory"),
        arguments: serde_json::json!({ "query": "archived", "top_k": 3 }),
    };
    let assistant_turn = Message {
        role: Role::Assistant,
        content: Content::Text("recall".into()),
        tool_calls: vec![recall_call],
        token_count: Some(5),
    };

    let mut machine = sm();
    machine.ctx.set_memory_enabled(true);
    machine.preload_history(vec![Message::user("goal"), assistant_turn]);

    // Only the recorded observations are inspected; the action is ignored.
    let _ignored_action = machine.resume_after_preload();

    let page_in_requested = machine.observations.iter().any(|obs| {
        matches!(
            obs,
            KernelObservation::PageInRequested { tool, query, .. }
                if tool == "memory" && query == "archived"
        )
    });
    assert!(page_in_requested);
}
/// Resuming a preloaded history with no pending tool call must return
/// `CallLLM` and must not change the number of history messages.
#[test]
fn resume_after_preload_emits_call_llm_without_duplicate_user() {
    let mut machine = sm();
    let prior_turns = vec![Message::user("prior goal"), Message::assistant("partial")];
    machine.preload_history(prior_turns);

    let len_before = machine.ctx.partitions.history.messages.len();
    let resumed = machine.resume_after_preload();
    assert!(matches!(resumed, LoopAction::CallLLM { .. }));

    let len_after = machine.ctx.partitions.history.messages.len();
    assert_eq!(len_after, len_before);
}
/// After `start`, the history partition must be non-empty while the signals
/// partition stays empty.
#[test]
fn start_places_user_message_in_history_not_signals() {
    let mut machine = sm();
    machine.start(RuntimeTask::new("Say hello"));

    let partitions = &machine.ctx.partitions;
    assert!(!partitions.history.is_empty(), "history should have user message");
    assert!(partitions.signals.is_empty(), "signals should stay empty at start");
}
/// An assistant response without tool calls must end the run (`Done`, and
/// `is_terminal()` is true) and its text must be found in history.
#[test]
fn llm_response_without_tools_terminates_and_saves_to_history() {
    let mut machine = sm();
    machine.start(RuntimeTask::new("Say hello"));

    let reply = LoopEvent::LLMResponse {
        message: Message::assistant("Hello!"),
    };
    let outcome = machine.feed(reply);
    assert!(matches!(outcome, LoopAction::Done { .. }));
    assert!(machine.is_terminal());

    let reply_saved = machine
        .ctx
        .partitions
        .history
        .messages
        .iter()
        .any(|m| m.content.as_text() == Some("Hello!"));
    assert!(reply_saved);
}
/// Feeding `LoopEvent::Timeout` after `start` must return `CallLLM` and
/// record a `Rollbacked` observation whose reason is
/// `RollbackReason::Timeout`.
#[test]
fn timeout_rolls_back() {
    let mut sm = sm();
    sm.start(RuntimeTask::new("test"));

    match sm.feed(LoopEvent::Timeout) {
        LoopAction::CallLLM { .. } => {}
        // Report the action actually returned so a failure is diagnosable.
        other => panic!("expected CallLLM, got {other:?}"),
    }

    let rolled_back_on_timeout = sm.observations.iter().any(|o| {
        matches!(
            o,
            KernelObservation::Rollbacked {
                reason: Some(RollbackReason::Timeout),
                ..
            }
        )
    });
    assert!(rolled_back_on_timeout);
}
/// A critical-urgency signal fed after `start` must return `CallLLM`, keep
/// the machine in `Reason`, add an entry containing `[INTERRUPT]` to the
/// signals partition, and leave the history length unchanged.
#[test]
fn critical_signal_goes_to_signals_not_history() {
    use crate::types::signal::{SignalSource, SignalType, Urgency};

    let mut machine = sm();
    machine.start(RuntimeTask::new("test"));
    let history_before = machine.ctx.partitions.history.messages.len();

    let alert = RuntimeSignal::new(
        SignalSource::Gateway,
        SignalType::Alert,
        Urgency::Critical,
        "fire",
    );
    let action = machine.feed(LoopEvent::Signal { signal: alert });
    assert!(matches!(action, LoopAction::CallLLM { .. }));
    assert!(matches!(machine.phase, LoopPhase::Reason));

    let partitions = &machine.ctx.partitions;
    let interrupt_recorded = partitions.signals.iter().any(|s| s.contains("[INTERRUPT]"));
    assert!(interrupt_recorded);
    assert_eq!(partitions.history.messages.len(), history_before);
}
/// While the machine is suspended on a spawned sub-agent, a critical signal
/// must return `Some(CallLLM)`, clear the suspension, and record an
/// `AgentPreempted` observation listing exactly the child's id.
#[test]
fn interrupt_now_preempts_awaited_subagent() {
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};
    use crate::types::signal::{SignalSource, SignalType, Urgency};

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));

    let child_identity = AgentIdentity::sub_agent("child", "child-session");
    let child_spec = AgentRunSpec::new(child_identity, AgentRole::Implement, "child task");
    machine.spawn_sub_agent(child_spec, "parent-sess");
    assert!(machine.is_suspended());
    // Drop everything recorded so far so only the interrupt's effects remain.
    machine.take_observations();

    let critical = RuntimeSignal::new(
        SignalSource::Gateway,
        SignalType::Alert,
        Urgency::Critical,
        "stop and handle this",
    );
    let action = machine.signal_event(critical);
    assert!(matches!(action, Some(LoopAction::CallLLM { .. })), "interrupt reclaims the root");
    assert!(!machine.is_suspended(), "preemption cleared SubAgentAwait");

    let observations = machine.take_observations();
    let child_preempted = observations.iter().any(|o| {
        matches!(
            o,
            KernelObservation::AgentPreempted { agent_ids, .. }
                if agent_ids == &vec!["child".to_string()]
        )
    });
    assert!(child_preempted, "AgentPreempted names the aborted child");
}
/// With a single-node workflow loaded and active, a critical signal must
/// deactivate the workflow and record both an `AgentPreempted` observation
/// and a `WorkflowCompleted` observation whose `failed` list contains
/// `"wf-node0"`.
#[test]
fn interrupt_now_aborts_owning_workflow() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;
    use crate::types::signal::{SignalSource, SignalType, Urgency};

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));

    let root_node = WorkflowNode::new(RuntimeTask::new("root node"), AgentRole::Implement);
    machine.load_workflow(WorkflowSpec::new(vec![root_node]), "sess");
    assert!(machine.workflow_active());
    // Drop everything recorded so far so only the interrupt's effects remain.
    machine.take_observations();

    let abort = RuntimeSignal::new(
        SignalSource::Gateway,
        SignalType::Alert,
        Urgency::Critical,
        "abort",
    );
    machine.signal_event(abort);
    assert!(!machine.workflow_active(), "InterruptNow tore the workflow down");

    let observations = machine.take_observations();
    let any_preempted = observations
        .iter()
        .any(|o| matches!(o, KernelObservation::AgentPreempted { .. }));
    assert!(any_preempted);

    let node_reported_failed = observations.iter().any(|o| {
        matches!(
            o,
            KernelObservation::WorkflowCompleted { failed, .. }
                if failed.contains(&"wf-node0".to_string())
        )
    });
    assert!(node_reported_failed, "the running node is reported failed on abort");
}
/// A `High` (not `Critical`) urgency signal received while suspended on a
/// sub-agent must return `None`, keep the machine suspended, record no
/// `AgentPreempted` observation, and add a task-state directive containing
/// the signal text.
#[test]
fn high_urgency_interrupt_does_not_preempt() {
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};
    use crate::types::signal::{SignalSource, SignalType, Urgency};

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));

    let child_identity = AgentIdentity::sub_agent("child", "child-session");
    let child_spec = AgentRunSpec::new(child_identity, AgentRole::Implement, "child task");
    machine.spawn_sub_agent(child_spec, "parent-sess");
    machine.take_observations();

    let soft = RuntimeSignal::new(
        SignalSource::Gateway,
        SignalType::Alert,
        Urgency::High,
        "fyi handle soon",
    );
    let action = machine.signal_event(soft);
    assert!(action.is_none(), "soft Interrupt does not force a turn");
    assert!(machine.is_suspended(), "running sub-agent is NOT aborted");

    let observations = machine.take_observations();
    let any_preempted = observations
        .iter()
        .any(|o| matches!(o, KernelObservation::AgentPreempted { .. }));
    assert!(!any_preempted, "no preemption");

    let directive_recorded = machine
        .ctx
        .partitions
        .task_state
        .directives
        .iter()
        .any(|d| d.contains("handle soon"));
    assert!(directive_recorded, "directive recorded for next boundary");
}
/// A critical signal's text must be recorded as a task-state directive and
/// must still be present, including in `format_compact()`, after a
/// `Renewed` observation has been emitted.
///
/// The renewal is provoked with a 100-token policy by pushing ten system
/// messages at a cost of 10 each and then feeding empty tool results.
#[test]
fn user_directive_survives_renewal() {
    use crate::types::signal::{SignalSource, SignalType, Urgency};

    // Checks whether any recorded directive mentions the migration files.
    let mentions_migration = |m: &LoopStateMachine| -> bool {
        m.ctx
            .partitions
            .task_state
            .directives
            .iter()
            .any(|d| d.contains("migration files"))
    };

    let tight_policy = LoopPolicy {
        max_tokens: 100,
        max_turns: 100,
        ..LoopPolicy::default()
    };
    let mut machine = LoopStateMachine::new(tight_policy);
    machine.start(RuntimeTask::new("ship the feature"));

    let directive_signal = RuntimeSignal::new(
        SignalSource::Gateway,
        SignalType::Alert,
        Urgency::Critical,
        "do NOT modify the migration files",
    );
    machine.feed(LoopEvent::Signal { signal: directive_signal });
    assert!(
        mentions_migration(&machine),
        "acted-on signal is promoted to a durable directive"
    );

    (0..10).for_each(|i| {
        machine.ctx.partitions.system.push(Message::system(format!("c{i}")), 10);
    });
    machine.feed(LoopEvent::ToolResults { results: vec![] });

    let renewed = machine
        .take_observations()
        .iter()
        .any(|o| matches!(o, KernelObservation::Renewed { .. }));
    assert!(renewed, "renewal fired");
    assert!(
        mentions_migration(&machine),
        "user directive must survive a sprint renewal"
    );

    let compact = machine.ctx.partitions.task_state.format_compact();
    assert!(compact.contains("migration files"));
}
/// With `max_turns: 1`, feeding tool results must produce one more
/// `CallLLM` with an empty tool list; the following assistant response must
/// finish the run with `TerminationReason::MaxTurns` and a preserved final
/// message.
#[test]
fn max_turns_emits_final_toolless_call_then_terminates() {
    let mut sm = LoopStateMachine::new(LoopPolicy {
        max_tokens: 128_000,
        max_turns: 1,
        ..LoopPolicy::default()
    });
    sm.start(RuntimeTask::new("test"));

    let final_call = sm.feed(LoopEvent::ToolResults { results: vec![] });
    match final_call {
        LoopAction::CallLLM { tools, .. } => {
            assert!(tools.is_empty(), "final call must have no tools");
        }
        // Include the unexpected action so a failure is diagnosable.
        other => panic!("expected CallLLM for final text-only call, got {other:?}"),
    }

    let outcome = sm.feed(LoopEvent::LLMResponse {
        message: Message::assistant("final summary"),
    });
    match outcome {
        LoopAction::Done { result } => {
            assert_eq!(result.termination, TerminationReason::MaxTurns);
            assert!(
                result.final_message.is_some(),
                "final message must be preserved"
            );
        }
        other => panic!("expected Done, got {other:?}"),
    }
}
/// Before any skill is activated, `start` exposes all four registered tools
/// plus the skill meta-tool. After activating `debug` (which allows `read`
/// and `grep`), the next `CallLLM` must expose those two, the stable-core
/// tool `bash`, and the skill meta-tool, but not `write`.
#[test]
fn active_skill_gates_exposed_tools_with_stable_core() {
    let mut sm = sm();
    sm.tools = vec![
        make_tool_schema("read"),
        make_tool_schema("write"),
        make_tool_schema("bash"),
        make_tool_schema("grep"),
    ];

    let mut debug = SkillMetadata::new("debug", "Debug helper");
    debug.allowed_tools = vec![
        compact_str::CompactString::new("read"),
        compact_str::CompactString::new("grep"),
    ];
    sm.ctx.set_available_skills(vec![debug]);
    sm.ctx.set_stable_core_tools([compact_str::CompactString::new("bash")]);

    match sm.start(RuntimeTask::new("go")) {
        LoopAction::CallLLM { tools, .. } => {
            let names: Vec<&str> = tools.iter().map(|t| t.name.as_str()).collect();
            assert!(["read", "write", "bash", "grep"].iter().all(|n| names.contains(n)));
            assert!(names.contains(&SKILL_TOOL_NAME));
        }
        // Include the unexpected action so a failure is diagnosable.
        other => panic!("expected CallLLM, got {other:?}"),
    }

    sm.ctx.activate_skill("debug");
    match sm.emit_call_llm() {
        LoopAction::CallLLM { tools, .. } => {
            let names: Vec<&str> = tools.iter().map(|t| t.name.as_str()).collect();
            assert!(names.contains(&"read") && names.contains(&"grep"), "declared: {names:?}");
            assert!(names.contains(&"bash"), "stable-core kept: {names:?}");
            assert!(names.contains(&SKILL_TOOL_NAME), "meta-tool exempt: {names:?}");
            assert!(!names.contains(&"write"), "undeclared gated out: {names:?}");
        }
        other => panic!("expected CallLLM, got {other:?}"),
    }
}
/// Two consecutive `emit_call_llm` calls with the same active skill must
/// expose the same sorted tool-name list; activating a second skill must
/// change that list and bring in the second skill's allowed tool.
#[test]
fn gating_is_byte_stable_within_an_epoch() {
    /// Returns the sorted tool names of a `CallLLM` action; panics, printing
    /// the action, for any other variant.
    fn names(action: LoopAction) -> Vec<String> {
        match action {
            LoopAction::CallLLM { tools, .. } => {
                let mut sorted: Vec<String> = tools.iter().map(|t| t.name.to_string()).collect();
                sorted.sort();
                sorted
            }
            other => panic!("expected CallLLM, got {other:?}"),
        }
    }

    let mut sm = sm();
    sm.tools = vec![
        make_tool_schema("read"),
        make_tool_schema("write"),
        make_tool_schema("grep"),
    ];

    let mut debug = SkillMetadata::new("debug", "d");
    debug.allowed_tools = vec![compact_str::CompactString::new("read")];
    let mut review = SkillMetadata::new("review", "r");
    review.allowed_tools = vec![compact_str::CompactString::new("grep")];
    sm.ctx.set_available_skills(vec![debug, review]);

    sm.start(RuntimeTask::new("go"));
    sm.ctx.activate_skill("debug");

    // Same active skill set: both emissions must agree.
    let n1 = names(sm.emit_call_llm());
    let n2 = names(sm.emit_call_llm());
    assert_eq!(n1, n2, "toolset must be byte-stable within an epoch");
    assert!(n1.contains(&"read".to_string()) && !n1.contains(&"write".to_string()));

    // Activating another skill changes the exposed set.
    sm.ctx.activate_skill("review");
    let n3 = names(sm.emit_call_llm());
    assert_ne!(n1, n3, "activating another skill changes the toolset");
    assert!(n3.contains(&"grep".to_string()), "union now includes review's tools: {n3:?}");
}
/// When at least one skill is registered, the `CallLLM` returned by `start`
/// must include a tool named `SKILL_TOOL_NAME`.
#[test]
fn skill_tool_injected_in_call_llm_when_skills_registered() {
    let mut sm = sm();
    sm.ctx
        .set_available_skills(vec![SkillMetadata::new("debug", "Debug helper")]);

    match sm.start(RuntimeTask::new("Fix the bug")) {
        LoopAction::CallLLM { tools, .. } => {
            assert!(tools.iter().any(|t| t.name.as_str() == SKILL_TOOL_NAME));
        }
        // Include the unexpected action so a failure is diagnosable.
        other => panic!("expected CallLLM, got {other:?}"),
    }
}
/// With no skills registered, the `CallLLM` returned by `start` must not
/// include a tool named `SKILL_TOOL_NAME`.
#[test]
fn skill_tool_not_injected_when_no_skills() {
    let mut sm = sm();

    match sm.start(RuntimeTask::new("Say hello")) {
        LoopAction::CallLLM { tools, .. } => {
            assert!(!tools.iter().any(|t| t.name.as_str() == SKILL_TOOL_NAME));
        }
        // Include the unexpected action so a failure is diagnosable.
        other => panic!("expected CallLLM, got {other:?}"),
    }
}
/// When the machine's `run_spec` carries a capability filter that allows
/// only the ids `read` and `search`, the `CallLLM` from `start` must expose
/// those two tools and neither `write` nor `bash`.
#[test]
fn top_level_run_capability_filter_gates_exposed_tools() {
    use crate::types::agent::{
        AgentCapabilityFilter, AgentIdentity, AgentRole, AgentRunSpec,
    };

    let mut sm = sm();
    sm.tools = vec![
        make_tool_schema("read"),
        make_tool_schema("write"),
        make_tool_schema("bash"),
        make_tool_schema("search"),
    ];

    let mut spec = AgentRunSpec::new(
        AgentIdentity::new("root", "root-session"),
        AgentRole::Custom,
        "do the task",
    );
    spec.capability_filter = AgentCapabilityFilter {
        allowed_kinds: Vec::new(),
        allowed_ids: vec![
            compact_str::CompactString::new("read"),
            compact_str::CompactString::new("search"),
        ],
    };
    sm.run_spec = Some(spec);

    match sm.start(RuntimeTask::new("do the task")) {
        LoopAction::CallLLM { tools, .. } => {
            let names: Vec<&str> = tools.iter().map(|t| t.name.as_str()).collect();
            assert!(names.contains(&"read"), "read should be exposed: {names:?}");
            assert!(names.contains(&"search"), "search should be exposed: {names:?}");
            assert!(!names.contains(&"write"), "write must be gated out: {names:?}");
            assert!(!names.contains(&"bash"), "bash must be gated out: {names:?}");
        }
        // Include the unexpected action so a failure is diagnosable.
        other => panic!("expected CallLLM, got {other:?}"),
    }
}
/// Without a `run_spec`, the `CallLLM` from `start` must expose every
/// registered tool (`read`, `write`, `bash`).
#[test]
fn top_level_run_without_spec_exposes_all_tools() {
    let mut sm = sm();
    sm.tools = vec![
        make_tool_schema("read"),
        make_tool_schema("write"),
        make_tool_schema("bash"),
    ];

    match sm.start(RuntimeTask::new("do the task")) {
        LoopAction::CallLLM { tools, .. } => {
            let names: Vec<&str> = tools.iter().map(|t| t.name.as_str()).collect();
            assert!(
                names.contains(&"read") && names.contains(&"write") && names.contains(&"bash"),
                "all registered tools should be exposed: {names:?}"
            );
        }
        // Include the unexpected action so a failure is diagnosable.
        other => panic!("expected CallLLM, got {other:?}"),
    }
}
/// With a 100-token policy, pushing ten history messages at a cost of 50
/// each and then feeding empty tool results must record a `Compressed`
/// observation.
#[test]
fn compression_emits_observation() {
    let tight_policy = LoopPolicy {
        max_tokens: 100,
        max_turns: 100,
        ..LoopPolicy::default()
    };
    let mut machine = LoopStateMachine::new(tight_policy);
    machine.start(RuntimeTask::new("test"));

    (0..10).for_each(|i| {
        machine.ctx.push_history(Message::user(format!("filler {i}")), 50);
    });
    machine.feed(LoopEvent::ToolResults { results: vec![] });

    let observations = machine.take_observations();
    let compressed = observations
        .iter()
        .any(|o| matches!(o, KernelObservation::Compressed { .. }));
    assert!(compressed);
}
/// With a 100-token policy, pushing ten system messages at a cost of 10
/// each and then feeding empty tool results must record a `Renewed`
/// observation.
#[test]
fn renewal_emits_observation_when_pressure_extreme() {
    let tight_policy = LoopPolicy {
        max_tokens: 100,
        max_turns: 100,
        ..LoopPolicy::default()
    };
    let mut machine = LoopStateMachine::new(tight_policy);
    machine.start(RuntimeTask::new("test"));

    (0..10).for_each(|i| {
        let constraint = Message::system(format!("constraint {i}"));
        machine.ctx.partitions.system.push(constraint, 10);
    });
    machine.feed(LoopEvent::ToolResults { results: vec![] });

    let observations = machine.take_observations();
    let renewed = observations
        .iter()
        .any(|o| matches!(o, KernelObservation::Renewed { .. }));
    assert!(renewed);
}
/// After overfilling history (ten messages at a cost of 50 each against a
/// 100-token policy), `force_compact` must return `true` and record a
/// `PageOut` observation.
#[test]
fn force_compact_emits_page_out_when_archived() {
    let tight_policy = LoopPolicy {
        max_tokens: 100,
        max_turns: 100,
        ..LoopPolicy::default()
    };
    let mut machine = LoopStateMachine::new(tight_policy);
    machine.start(RuntimeTask::new("test"));

    (0..10).for_each(|i| {
        machine.ctx.push_history(Message::user(format!("filler {i}")), 50);
    });

    let compacted = machine.force_compact();
    assert!(compacted);

    let observations = machine.take_observations();
    let paged_out = observations
        .iter()
        .any(|o| matches!(o, KernelObservation::PageOut { .. }));
    assert!(paged_out);
}
/// After overfilling history, `force_compact` must record a `PageOut`
/// observation whose action is `KernelPressureAction::AutoCompact`, whose
/// `tier_hint` is `"semantic"`, and whose `archived` list is non-empty.
#[test]
fn autocompact_pages_out_to_semantic_tier_for_llm_summary() {
    let tight_policy = LoopPolicy {
        max_tokens: 100,
        max_turns: 100,
        ..LoopPolicy::default()
    };
    let mut machine = LoopStateMachine::new(tight_policy);
    machine.start(RuntimeTask::new("test"));

    (0..10).for_each(|i| {
        machine.ctx.push_history(Message::user(format!("filler {i}")), 50);
    });

    let compacted = machine.force_compact();
    assert!(compacted);

    let observations = machine.take_observations();
    let semantic_pageout = observations.iter().any(|o| {
        matches!(
            o,
            KernelObservation::PageOut {
                tier_hint,
                archived,
                action: KernelPressureAction::AutoCompact,
                ..
            } if tier_hint == "semantic" && !archived.is_empty()
        )
    });
    assert!(
        semantic_pageout,
        "AutoCompact must page archived messages to the semantic tier for SDK LLM summary"
    );
}
/// With memory enabled, an assistant response that proposes a `memory` tool
/// call must return `ExecuteTools` and record a `PageInRequested`
/// observation carrying that tool name and query.
#[test]
fn memory_tool_proposal_emits_page_in_requested() {
    let mut machine = sm();
    machine.ctx.set_memory_enabled(true);
    machine.start(RuntimeTask::new("test"));

    let memory_call = ToolCall {
        id: compact_str::CompactString::new("m1"),
        name: compact_str::CompactString::new("memory"),
        arguments: serde_json::json!({"query": "bugs", "top_k": 2}),
    };
    let mut proposal = Message::assistant("");
    proposal.tool_calls.push(memory_call);

    let action = machine.feed(LoopEvent::LLMResponse { message: proposal });
    assert!(matches!(action, LoopAction::ExecuteTools { .. }));

    let observations = machine.take_observations();
    let page_in_requested = observations.iter().any(|o| {
        matches!(
            o,
            KernelObservation::PageInRequested { tool, query, .. }
                if tool == "memory" && query == "bugs"
        )
    });
    assert!(page_in_requested);
}
/// Applying a single page-in entry with memory enabled must leave the
/// knowledge partition non-empty.
#[test]
fn apply_page_in_populates_knowledge() {
    let mut machine = sm();
    machine.ctx.set_memory_enabled(true);

    let entries = [crate::mm::PageInEntry {
        content: "recalled".to_string(),
        tokens: Some(3),
        source: Some("memory".to_string()),
    }];
    machine.apply_page_in(&entries);

    let knowledge = &machine.ctx.partitions.knowledge.messages;
    assert!(!knowledge.is_empty());
}
/// After preloading two messages and starting a new task,
/// `drain_new_messages` must return a non-empty list that contains the
/// "Proceed with the task…" prompt and none of the preloaded text, and the
/// task-state goal must equal the new task's text.
#[test]
fn preload_history_and_drain_new_messages() {
    let mut machine = sm();
    machine.preload_history(vec![
        Message::user("Hello from last time"),
        Message::assistant("Hi! I remember."),
    ]);
    assert_eq!(machine.ctx.partitions.history.messages.len(), 2);

    machine.start(RuntimeTask::new("What did I say before?"));
    let drained = machine.drain_new_messages();
    assert!(!drained.is_empty());

    let has_proceed_prompt = drained.iter().any(|m| {
        matches!(
            m.content.as_text(),
            Some(t) if t == "Proceed with the task described in [TASK STATE]."
        )
    });
    assert!(has_proceed_prompt);
    assert_eq!(machine.ctx.partitions.task_state.goal, "What did I say before?");

    let leaks_preloaded = drained.iter().any(|m| {
        matches!(
            m.content.as_text(),
            Some(t) if t.contains("Hello from last time")
        )
    });
    assert!(!leaks_preloaded);
}
/// Feeding a tool result whose output is `Content::Parts` must leave at
/// least one `Role::Tool` message in history; if that message's content is
/// still `Content::Parts`, the parts must be non-empty.
#[test]
fn tool_result_content_parts_preserved_as_json() {
    use crate::types::message::Content;
    use compact_str::CompactString;

    let mut machine = sm();
    machine.start(RuntimeTask::new("test"));

    let proposed_call = crate::types::message::ToolCall {
        id: CompactString::new("c1"),
        name: CompactString::new("my_tool"),
        arguments: serde_json::json!({}),
    };
    let mut proposal = Message::assistant("");
    proposal.tool_calls.push(proposed_call);
    machine.feed(LoopEvent::LLMResponse { message: proposal });

    let structured = Content::Parts(vec![ContentPart::Text {
        text: "structured output".to_string(),
    }]);
    let tool_result = ToolResult {
        call_id: CompactString::new("c1"),
        output: structured,
        is_error: false,
        is_fatal: false,
        error_kind: None,
        token_count: None,
    };
    machine.feed(LoopEvent::ToolResults {
        results: vec![tool_result],
    });

    let tool_msgs: Vec<_> = machine
        .ctx
        .partitions
        .history
        .messages
        .iter()
        .filter(|m| matches!(m.role, crate::types::message::Role::Tool))
        .collect();
    assert!(
        !tool_msgs.is_empty(),
        "tool result message must be in history"
    );

    // NOTE(review): this check is skipped entirely when the stored content is
    // not `Content::Parts`, so the test still passes in that case — confirm
    // whether that leniency is intended.
    match &tool_msgs[0].content {
        Content::Parts(parts) => assert!(!parts.is_empty()),
        _ => {}
    }
}
/// Builds a minimal [`ToolSchema`] named `name`, described as `"tool {name}"`
/// and taking a bare JSON-object parameter schema.
fn make_tool_schema(name: &str) -> ToolSchema {
    let description = format!("tool {name}");
    let parameters = serde_json::json!({"type": "object"});
    ToolSchema {
        name: compact_str::CompactString::new(name),
        description,
        parameters,
    }
}
/// Loading a two-phase contract must report the first phase as current,
/// report the contract as incomplete, and expose the first phase's
/// criterion.
#[test]
fn milestone_contract_loads_and_reports_current_phase() {
    use crate::types::milestone::{MilestoneContract, MilestonePhase};

    let first_phase = MilestonePhase::new("phase-a").with_criterion("Output contains 'hello'");
    let second_phase = MilestonePhase::new("phase-b");
    let contract = MilestoneContract::new()
        .phase(first_phase)
        .phase(second_phase);

    let mut machine = sm();
    machine.load_milestone_contract(contract);

    assert_eq!(machine.current_milestone_phase_id(), Some("phase-a"));
    assert!(!machine.is_milestone_complete());
    assert_eq!(
        machine.current_milestone_criteria(),
        &["Output contains 'hello'"]
    );
}
/// With a `plan` → `implement` contract, a text-only response must request
/// evaluation of `plan`; a passing result must return `CallLLM`, move the
/// current phase to `implement`, and record `MilestoneAdvanced` for `plan`.
#[test]
fn milestone_pass_advances_phase_and_emits_observation() {
    use crate::types::milestone::{MilestoneCheckResult, MilestoneContract, MilestonePhase};

    let contract = MilestoneContract::new()
        .phase(MilestonePhase::new("plan"))
        .phase(MilestonePhase::new("implement"));
    let mut machine = sm();
    machine.load_milestone_contract(contract);
    machine.start(RuntimeTask::new("do the thing"));

    let action = machine.feed(LoopEvent::LLMResponse {
        message: Message::assistant("plan drafted"),
    });
    assert!(
        matches!(action, LoopAction::EvaluateMilestone { ref phase_id, .. } if phase_id == "plan"),
        "expected EvaluateMilestone for 'plan', got {action:?}",
    );

    let passed = MilestoneCheckResult::pass("plan");
    let action2 = machine.feed(LoopEvent::MilestoneResult { result: passed });
    assert!(
        matches!(action2, LoopAction::CallLLM { .. }),
        "expect CallLLM after milestone advance",
    );
    assert_eq!(machine.current_milestone_phase_id(), Some("implement"));

    let observations = machine.take_observations();
    let plan_advanced = observations.iter().any(|o| {
        matches!(
            o,
            KernelObservation::MilestoneAdvanced { phase_id, .. } if phase_id == "plan"
        )
    });
    assert!(plan_advanced);
}
/// A failing milestone result must return `CallLLM`, keep the current phase
/// at `plan`, and record `MilestoneBlocked` for `plan` with a reason that
/// contains the failure text.
#[test]
fn milestone_fail_blocks_phase_and_emits_observation() {
    use crate::types::milestone::{MilestoneCheckResult, MilestoneContract, MilestonePhase};

    let contract = MilestoneContract::new().phase(MilestonePhase::new("plan"));
    let mut machine = sm();
    machine.load_milestone_contract(contract);
    machine.start(RuntimeTask::new("do the thing"));
    machine.feed(LoopEvent::LLMResponse {
        message: Message::assistant("bad plan"),
    });

    let failed = MilestoneCheckResult::fail("plan", "missing evidence");
    let action = machine.feed(LoopEvent::MilestoneResult { result: failed });
    assert!(
        matches!(action, LoopAction::CallLLM { .. }),
        "blocked run must return CallLLM"
    );
    assert_eq!(machine.current_milestone_phase_id(), Some("plan"));

    let observations = machine.take_observations();
    let plan_blocked = observations.iter().any(|o| {
        matches!(
            o,
            KernelObservation::MilestoneBlocked { phase_id, reason, .. }
                if phase_id == "plan" && reason.contains("missing evidence")
        )
    });
    assert!(plan_blocked);
}
/// A phase declared with `.unlocking(cap)` must not expose the capability
/// before the phase passes. After a passing result, the capability must
/// appear among the context's `Tool` capabilities and be listed in the
/// `MilestoneAdvanced` observation's `capabilities_unlocked`.
#[test]
fn milestone_unlocks_capabilities_on_advance() {
    let mut sm = sm();
    let schema = make_tool_schema("deploy_tool");
    let cap = crate::types::capability::CapabilityDescriptor::tool(schema);
    let contract = crate::types::milestone::MilestoneContract::new()
        .phase(crate::types::milestone::MilestonePhase::new("phase-a").unlocking(cap));
    sm.load_milestone_contract(contract);
    sm.start(RuntimeTask::new("build pipeline"));

    // No tool capability is mounted before the phase passes.
    assert!(
        sm.ctx
            .capabilities
            .by_kind(crate::types::capability::CapabilityKind::Tool)
            .is_empty()
    );

    sm.feed(LoopEvent::LLMResponse {
        message: Message::assistant("done"),
    });
    sm.feed(LoopEvent::MilestoneResult {
        result: crate::types::milestone::MilestoneCheckResult::pass("phase-a"),
    });

    let tools = sm
        .ctx
        .capabilities
        .by_kind(crate::types::capability::CapabilityKind::Tool);
    assert!(
        tools.iter().any(|c| c.id.as_str() == "deploy_tool"),
        "deploy_tool should be unlocked after phase-a passes",
    );

    let obs = sm.take_observations();
    // `expect` both asserts the observation exists and yields its payload,
    // avoiding a separate `is_some()` check followed by `unwrap()`.
    let unlocked = obs
        .iter()
        .find_map(|o| match o {
            KernelObservation::MilestoneAdvanced {
                capabilities_unlocked,
                ..
            } => Some(capabilities_unlocked),
            _ => None,
        })
        .expect("MilestoneAdvanced observation expected");
    assert!(unlocked.iter().any(|s| s.contains("deploy_tool")));
}
/// Passing the only phase of a one-phase contract must mark the milestone
/// contract complete and return `Done`.
#[test]
fn all_phases_complete_terminates_run() {
    use crate::types::milestone::{MilestoneCheckResult, MilestoneContract, MilestonePhase};

    let contract = MilestoneContract::new().phase(MilestonePhase::new("only-phase"));
    let mut machine = sm();
    machine.load_milestone_contract(contract);
    machine.start(RuntimeTask::new("single milestone run"));
    machine.feed(LoopEvent::LLMResponse {
        message: Message::assistant("ready"),
    });

    let passed = MilestoneCheckResult::pass("only-phase");
    let done = machine.feed(LoopEvent::MilestoneResult { result: passed });

    assert!(machine.is_milestone_complete());
    assert!(
        matches!(done, LoopAction::Done { .. }),
        "all phases done must produce Done"
    );
}
/// Without a milestone contract, a text-only assistant response must
/// return `Done`.
#[test]
fn no_contract_terminates_normally() {
    let mut machine = sm();
    machine.start(RuntimeTask::new("simple task"));

    let reply = LoopEvent::LLMResponse {
        message: Message::assistant("answer"),
    };
    let action = machine.feed(reply);
    assert!(
        matches!(action, LoopAction::Done { .. }),
        "without milestone contract, text-only response must terminate: {action:?}",
    );
}
/// Mounting and then unmounting a versioned tool capability must each
/// record exactly one `CapabilityChanged` observation at turn 0, with the
/// matching `added`/`removed` list, change kind, capability id and version.
#[test]
fn mount_unmount_capability_emits_observation() {
    use crate::types::capability::{CapabilityDescriptor, CapabilityKind};

    let mut machine = sm();
    let schema = ToolSchema {
        name: compact_str::CompactString::new("test_tool"),
        description: "test description".to_string(),
        parameters: serde_json::json!({ "type": "object" }),
    };
    let descriptor = CapabilityDescriptor::tool(schema).with_version("1.0.0");

    // Mount: one observation listing the tool under `added`.
    machine.mount_capability(descriptor, None, None);
    let mount_obs = machine.take_observations();
    assert_eq!(mount_obs.len(), 1);
    match &mount_obs[0] {
        KernelObservation::CapabilityChanged {
            turn,
            added,
            removed,
            change_kind,
            capability_id,
            version,
            ..
        } => {
            assert_eq!(*turn, 0);
            assert_eq!(added, &vec!["Tool:test_tool".to_string()]);
            assert!(removed.is_empty());
            assert_eq!(change_kind.as_deref(), Some("mount"));
            assert_eq!(capability_id.as_deref(), Some("test_tool"));
            assert_eq!(version.as_deref(), Some("1.0.0"));
        }
        _ => panic!("Expected CapabilityChanged observation"),
    }

    // Unmount: one observation listing the tool under `removed`.
    machine.unmount_capability(CapabilityKind::Tool, "test_tool");
    let unmount_obs = machine.take_observations();
    assert_eq!(unmount_obs.len(), 1);
    match &unmount_obs[0] {
        KernelObservation::CapabilityChanged {
            turn,
            added,
            removed,
            change_kind,
            capability_id,
            version,
            ..
        } => {
            assert_eq!(*turn, 0);
            assert!(added.is_empty());
            assert_eq!(removed, &vec!["Tool:test_tool".to_string()]);
            assert_eq!(change_kind.as_deref(), Some("unmount"));
            assert_eq!(capability_id.as_deref(), Some("test_tool"));
            assert_eq!(version.as_deref(), Some("1.0.0"));
        }
        _ => panic!("Expected CapabilityChanged observation"),
    }
}
/// With the second argument `false`, `build_rollback_note` for a fatal tool
/// error must omit the `[SYSTEM]` marker and mention the tool name.
#[test]
fn rollback_note_is_concise_by_default() {
    use crate::scheduler::rollback::build_rollback_note;

    let fatal = RollbackReason::FatalToolError {
        tool_name: "run_tests".to_string(),
        error: "exit code 1".to_string(),
    };
    let note = build_rollback_note(&fatal, false);

    let has_system_marker = note.contains("[SYSTEM]");
    assert!(
        !has_system_marker,
        "default note must not contain [SYSTEM]: {note}"
    );
    let names_tool = note.contains("run_tests");
    assert!(names_tool, "note should name the tool: {note}");
}
/// With the second argument `true`, `build_rollback_note` for a timeout
/// must start with `"[SYSTEM] Transaction rollback:"`.
#[test]
fn rollback_note_is_verbose_when_opted_in() {
    use crate::scheduler::rollback::build_rollback_note;

    let note = build_rollback_note(&RollbackReason::Timeout, true);
    let uses_internal_format = note.starts_with("[SYSTEM] Transaction rollback:");
    assert!(
        uses_internal_format,
        "verbose note must use internal format: {note}"
    );
}
/// Builds the default test state machine with a governance pipeline that
/// allows by default but maps the tool pattern `sensitive.*` to
/// `PermissionAction::AskUser`.
fn sm_with_ask_user_rule() -> LoopStateMachine {
    use crate::governance::permission::{PermissionAction, PermissionRule};
    use crate::governance::pipeline::GovernancePipeline;

    let ask_user_rule = PermissionRule {
        tool_pattern: "sensitive.*".into(),
        action: PermissionAction::AskUser,
    };
    let mut pipeline = GovernancePipeline::new(PermissionAction::Allow);
    pipeline.permission.add_rule(ask_user_rule);

    let mut machine = sm();
    machine.set_governance(pipeline);
    machine
}
/// A proposed call to `sensitive.read` under the ask-user rule must return
/// `AwaitingResume`, suspend the machine, and record both `Suspended` and
/// `ToolGated` observations.
#[test]
fn ask_user_enters_suspended_without_execute_tools() {
    let mut machine = sm_with_ask_user_rule();
    machine.start(RuntimeTask::new("test"));

    let gated_call = ToolCall {
        id: compact_str::CompactString::new("call_a"),
        name: compact_str::CompactString::new("sensitive.read"),
        arguments: serde_json::json!({}),
    };
    let mut proposal = Message::assistant("");
    proposal.tool_calls.push(gated_call);

    let action = machine.feed(LoopEvent::LLMResponse { message: proposal });
    assert!(matches!(action, LoopAction::AwaitingResume));
    assert!(machine.is_suspended());

    let observations = machine.take_observations();
    let suspended = observations
        .iter()
        .any(|o| matches!(o, KernelObservation::Suspended { .. }));
    assert!(suspended);
    let tool_gated = observations
        .iter()
        .any(|o| matches!(o, KernelObservation::ToolGated { .. }));
    assert!(tool_gated);
}
/// Resuming with the gated call approved must return `ExecuteTools` with
/// one call and record a `Resumed` observation.
#[test]
fn resume_approved_emits_execute_tools() {
    let mut machine = sm_with_ask_user_rule();
    machine.start(RuntimeTask::new("test"));

    let gated_call = ToolCall {
        id: compact_str::CompactString::new("call_a"),
        name: compact_str::CompactString::new("sensitive.read"),
        arguments: serde_json::json!({}),
    };
    let mut proposal = Message::assistant("");
    proposal.tool_calls.push(gated_call);
    machine.feed(LoopEvent::LLMResponse { message: proposal });
    // Drop the suspension observations so only the resume's effects remain.
    machine.take_observations();

    let approved = vec!["call_a".to_string()];
    let denied = vec![];
    match machine.resume_from_suspend(approved, denied) {
        LoopAction::ExecuteTools { calls } => assert_eq!(calls.len(), 1),
        other => panic!("expected ExecuteTools, got {other:?}"),
    }

    let observations = machine.take_observations();
    let resumed = observations
        .iter()
        .any(|o| matches!(o, KernelObservation::Resumed { .. }));
    assert!(resumed);
}
/// Resuming with the only gated call denied must return `CallLLM` rather
/// than `ExecuteTools`.
#[test]
fn resume_all_denied_reprompts_without_execute() {
    let mut machine = sm_with_ask_user_rule();
    machine.start(RuntimeTask::new("test"));

    let gated_call = ToolCall {
        id: compact_str::CompactString::new("call_a"),
        name: compact_str::CompactString::new("sensitive.read"),
        arguments: serde_json::json!({}),
    };
    let mut proposal = Message::assistant("");
    proposal.tool_calls.push(gated_call);
    machine.feed(LoopEvent::LLMResponse { message: proposal });
    machine.take_observations();

    let approved = vec![];
    let denied = vec!["call_a".to_string()];
    let action = machine.resume_from_suspend(approved, denied);
    assert!(matches!(action, LoopAction::CallLLM { .. }));
}
/// Spawning a sub-agent must return `AwaitingResume` and suspend the
/// machine with a `SubAgentJoin` wait reason. Feeding the child's completed
/// result must return `CallLLM`, record `Resumed`, and leave the child's
/// process entry in `ProcessState::Joined`.
#[test]
fn spawn_sub_agent_suspends_until_completed() {
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};
    use crate::types::result::{LoopResult, SubAgentResult, TerminationReason};

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    let child_identity = AgentIdentity::sub_agent("child", "child-session");
    let child_spec = AgentRunSpec::new(child_identity, AgentRole::Implement, "child task");
    let action = machine.spawn_sub_agent(child_spec, "parent-sess");
    assert!(matches!(action, LoopAction::AwaitingResume));
    assert!(machine.is_suspended());
    assert!(matches!(
        machine.wait_reason(),
        Some(WaitReason::SubAgentJoin(_))
    ));

    let child_outcome = LoopResult {
        termination: TerminationReason::Completed,
        final_message: Some(Message::assistant("ok")),
        turns_used: 1,
        total_tokens_used: 1,
        loop_continue: None,
        classify_branch: None,
        tournament_winner: None,
    };
    let result = SubAgentResult {
        agent_id: compact_str::CompactString::new("child"),
        result: child_outcome,
    };
    let resumed = machine.feed(LoopEvent::SubAgentCompleted { result });
    assert!(matches!(resumed, LoopAction::CallLLM { .. }));

    let observations = machine.take_observations();
    let resume_recorded = observations
        .iter()
        .any(|o| matches!(o, KernelObservation::Resumed { .. }));
    assert!(resume_recorded);

    let child_process = machine.agent_process("child").expect("process");
    assert_eq!(child_process.state, crate::proc::ProcessState::Joined);
}
/// With `max_turns: 1`, feeding tool results must return a tool-less
/// `CallLLM` and record `BudgetExceeded` with budget `"max_turns"`; the
/// next assistant response must return `Done`.
#[test]
fn budget_exceeded_observation_on_max_turns() {
    let one_turn_policy = LoopPolicy {
        max_tokens: 128_000,
        max_turns: 1,
        ..LoopPolicy::default()
    };
    let mut machine = LoopStateMachine::new(one_turn_policy);
    machine.start(RuntimeTask::new("test"));

    let action = machine.feed(LoopEvent::ToolResults { results: vec![] });
    assert!(matches!(action, LoopAction::CallLLM { tools, .. } if tools.is_empty()));

    let observations = machine.take_observations();
    let budget_reported = observations.iter().any(|o| {
        matches!(
            o,
            KernelObservation::BudgetExceeded { budget, .. } if budget == "max_turns"
        )
    });
    assert!(budget_reported);

    let final_reply = LoopEvent::LLMResponse {
        message: Message::assistant("final"),
    };
    let done = machine.feed(final_reply);
    assert!(matches!(done, LoopAction::Done { .. }));
}
/// A freshly built machine must report `TaskState::Ready` and no wait
/// reason.
#[test]
fn lifecycle_idle_before_start_is_ready() {
    let machine = sm();
    let state = machine.lifecycle();
    assert_eq!(state, TaskState::Ready);
    let waiting_on = machine.wait_reason();
    assert_eq!(waiting_on, None);
}
/// After `start`, the machine must be in the `Reason` phase, report
/// `TaskState::Running`, and have no wait reason.
#[test]
fn lifecycle_running_after_start() {
    let mut machine = sm();
    machine.start(RuntimeTask::new("hi"));

    let in_reason_phase = matches!(machine.phase, LoopPhase::Reason);
    assert!(in_reason_phase);
    let state = machine.lifecycle();
    assert_eq!(state, TaskState::Running);
    let waiting_on = machine.wait_reason();
    assert_eq!(waiting_on, None);
}
/// A gated `sensitive.read` proposal must suspend the machine, report
/// `TaskState::Suspended`, and set the wait reason to
/// `WaitReason::Approval`.
#[test]
fn lifecycle_suspended_on_ask_user_with_approval_wait() {
    let mut machine = sm_with_ask_user_rule();
    machine.start(RuntimeTask::new("test"));

    let gated_call = ToolCall {
        id: compact_str::CompactString::new("call_a"),
        name: compact_str::CompactString::new("sensitive.read"),
        arguments: serde_json::json!({}),
    };
    let mut proposal = Message::assistant("");
    proposal.tool_calls.push(gated_call);
    machine.feed(LoopEvent::LLMResponse { message: proposal });

    assert!(machine.is_suspended());
    let state = machine.lifecycle();
    assert_eq!(state, TaskState::Suspended);
    let waiting_on = machine.wait_reason();
    assert_eq!(waiting_on, Some(WaitReason::Approval));
}
/// Spawning a sub-agent must suspend the machine, report
/// `TaskState::Suspended`, and set a `WaitReason::SubAgentJoin` wait
/// reason.
#[test]
fn lifecycle_suspended_on_sub_agent_with_join_wait() {
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));

    let child_identity = AgentIdentity::sub_agent("child", "child-session");
    let child_spec = AgentRunSpec::new(child_identity, AgentRole::Implement, "child task");
    machine.spawn_sub_agent(child_spec, "parent-sess");

    assert!(machine.is_suspended());
    let state = machine.lifecycle();
    assert_eq!(state, TaskState::Suspended);
    let waiting_on_join = matches!(
        machine.wait_reason(),
        Some(WaitReason::SubAgentJoin(_))
    );
    assert!(waiting_on_join);
}
/// After a text-only response ends the run, the machine must be terminal,
/// its lifecycle state must report terminal, and no wait reason may be set.
#[test]
fn lifecycle_terminal_is_done_with_no_wait() {
    let mut machine = sm();
    machine.start(RuntimeTask::new("hi"));

    let final_reply = LoopEvent::LLMResponse {
        message: Message::assistant("final answer"),
    };
    let done = machine.feed(final_reply);
    assert!(matches!(done, LoopAction::Done { .. }));
    assert!(machine.is_terminal());

    let state = machine.lifecycle();
    assert!(state.is_terminal());
    let waiting_on = machine.wait_reason();
    assert_eq!(waiting_on, None);
}
/// After an approval suspension is resumed with the call approved, the
/// lifecycle must go from `Suspended` back to `Running` with no wait
/// reason.
#[test]
fn lifecycle_running_again_after_resume_from_suspend() {
    let mut machine = sm_with_ask_user_rule();
    machine.start(RuntimeTask::new("test"));

    let gated_call = ToolCall {
        id: compact_str::CompactString::new("call_a"),
        name: compact_str::CompactString::new("sensitive.read"),
        arguments: serde_json::json!({}),
    };
    let mut proposal = Message::assistant("");
    proposal.tool_calls.push(gated_call);
    machine.feed(LoopEvent::LLMResponse { message: proposal });
    let state_while_gated = machine.lifecycle();
    assert_eq!(state_while_gated, TaskState::Suspended);

    let approved = vec!["call_a".to_string()];
    machine.resume_from_suspend(approved, vec![]);
    let state_after_resume = machine.lifecycle();
    assert_eq!(state_after_resume, TaskState::Running);
    let waiting_on = machine.wait_reason();
    assert_eq!(waiting_on, None);
}
/// With `max_total_tokens: 10`, a tool result counted at 20 tokens must
/// return a tool-less `CallLLM` and record `BudgetExceeded` with budget
/// `"token_budget"`; the next response must finish with
/// `TerminationReason::TokenBudget`.
#[test]
fn budget_exceeded_observation_on_token_budget() {
    use crate::types::result::TerminationReason;

    let token_capped_policy = LoopPolicy {
        max_tokens: 128_000,
        max_total_tokens: 10,
        ..LoopPolicy::default()
    };
    let mut machine = LoopStateMachine::new(token_capped_policy);
    machine.start(RuntimeTask::new("test"));
    machine.take_observations();

    let oversized_result = ToolResult {
        call_id: compact_str::CompactString::new("c"),
        output: Content::Text("x".into()),
        is_error: false,
        is_fatal: false,
        error_kind: None,
        token_count: Some(20),
    };
    let action = machine.feed(LoopEvent::ToolResults {
        results: vec![oversized_result],
    });
    assert!(matches!(action, LoopAction::CallLLM { tools, .. } if tools.is_empty()));

    let observations = machine.take_observations();
    let budget_reported = observations.iter().any(|o| {
        matches!(
            o,
            KernelObservation::BudgetExceeded { budget, .. } if budget == "token_budget"
        )
    });
    assert!(budget_reported);

    let final_reply = LoopEvent::LLMResponse {
        message: Message::assistant("final"),
    };
    let done = machine.feed(final_reply);
    assert!(matches!(
        done,
        LoopAction::Done { result } if result.termination == TerminationReason::TokenBudget
    ));
}
/// With `max_wall_ms` of 1_000 and the observed time moved from 0 to 2_000,
/// the next event yields a tool-less `CallLLM`, a `BudgetExceeded` observation
/// tagged `wall_time`, and a final `Done` whose termination is `Timeout`.
#[test]
fn budget_exceeded_observation_on_wall_time() {
    use crate::types::result::TerminationReason;

    let policy = LoopPolicy {
        max_tokens: 128_000,
        max_wall_ms: Some(1_000),
        ..LoopPolicy::default()
    };
    let mut machine = LoopStateMachine::new(policy);

    machine.set_observed_time(0);
    machine.start(RuntimeTask::new("test"));
    machine.take_observations();

    // Move the observed time past the wall limit before the next event.
    machine.set_observed_time(2_000);
    let action = machine.feed(LoopEvent::ToolResults { results: vec![] });
    assert!(matches!(action, LoopAction::CallLLM { tools, .. } if tools.is_empty()));

    let observations = machine.take_observations();
    let saw_budget = observations.iter().any(|o| {
        matches!(
            o,
            KernelObservation::BudgetExceeded { budget, .. } if budget == "wall_time"
        )
    });
    assert!(saw_budget);

    let finished = machine.feed(LoopEvent::LLMResponse {
        message: Message::assistant("final"),
    });
    assert!(matches!(
        finished,
        LoopAction::Done { result } if result.termination == TerminationReason::Timeout
    ));
}
/// Pins the fields of the agent-process view right after spawn and again after
/// the sub-agent completes: identity, role and state at spawn; `Joined` state,
/// unchanged isolation/inheritance and the recorded result after join.
#[test]
fn agent_process_view_shape_is_pinned_across_spawn_and_join() {
    use crate::proc::ProcessState;
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};
    use crate::types::result::{LoopResult, SubAgentResult, TerminationReason};

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    let identity = AgentIdentity::sub_agent("child", "child-session");
    let child_spec = AgentRunSpec::new(identity, AgentRole::Implement, "child task");
    machine.spawn_sub_agent(child_spec, "parent-sess");

    // View immediately after spawn.
    let spawned = machine.agent_process("child").expect("process after spawn");
    assert_eq!(spawned.agent_id.as_str(), "child");
    assert_eq!(spawned.parent_session_id.as_str(), "parent-sess");
    assert_eq!(spawned.role, AgentRole::Implement);
    assert_eq!(spawned.state, ProcessState::Running);
    assert!(spawned.result.is_none());
    let isolation_at_spawn = spawned.isolation;
    let inheritance_at_spawn = spawned.context_inheritance;

    // Report the child's completion.
    let child_outcome = LoopResult {
        termination: TerminationReason::Completed,
        final_message: Some(Message::assistant("done")),
        turns_used: 2,
        total_tokens_used: 42,
        loop_continue: None,
        classify_branch: None,
        tournament_winner: None,
    };
    machine.feed(LoopEvent::SubAgentCompleted {
        result: SubAgentResult {
            agent_id: compact_str::CompactString::new("child"),
            result: child_outcome,
        },
    });

    // View after join.
    let joined = machine.agent_process("child").expect("process after join");
    assert_eq!(joined.state, ProcessState::Joined);
    assert_eq!(joined.isolation, isolation_at_spawn);
    assert_eq!(joined.context_inheritance, inheritance_at_spawn);
    let recorded = joined.result.as_ref().expect("join result");
    assert_eq!(recorded.result.termination, TerminationReason::Completed);
    assert_eq!(recorded.result.turns_used, 2);
    assert_eq!(recorded.result.total_tokens_used, 42);
    assert_eq!(machine.agent_processes().len(), 1);
}
/// With `max_concurrent_subagents` of 1, the first spawn is accepted
/// (`AwaitingResume`) and the second yields `CallLLM` without registering a
/// task or process for agent `b`.
#[test]
fn spawn_quota_denies_beyond_concurrency_limit() {
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};

    let mut machine = sm();
    let quota = crate::governance::quota::ResourceQuota {
        max_concurrent_subagents: Some(1),
        ..Default::default()
    };
    machine.set_resource_quota(quota);
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    // First spawn fits under the limit.
    let first_spec =
        AgentRunSpec::new(AgentIdentity::sub_agent("a", "a-sess"), AgentRole::Implement, "t");
    let first = machine.spawn_sub_agent(first_spec, "parent-sess");
    assert!(matches!(first, LoopAction::AwaitingResume));
    assert_eq!(machine.task_table().children_of("root").len(), 1);
    machine.take_observations();

    // Second spawn is over the limit.
    let second_spec =
        AgentRunSpec::new(AgentIdentity::sub_agent("b", "b-sess"), AgentRole::Implement, "t");
    let second = machine.spawn_sub_agent(second_spec, "parent-sess");
    assert!(matches!(second, LoopAction::CallLLM { .. }));
    assert_eq!(machine.task_table().children_of("root").len(), 1);
    assert!(machine.agent_process("b").is_none());
}
/// With `max_spawn_depth` of 0, a spawn attempt yields `CallLLM`, does not
/// suspend the machine, and registers no process for agent `c`.
#[test]
fn spawn_quota_denies_when_depth_exceeds_limit() {
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};

    let mut machine = sm();
    let quota = crate::governance::quota::ResourceQuota {
        max_spawn_depth: Some(0),
        ..Default::default()
    };
    machine.set_resource_quota(quota);
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    let spec =
        AgentRunSpec::new(AgentIdentity::sub_agent("c", "c-sess"), AgentRole::Implement, "t");
    let outcome = machine.spawn_sub_agent(spec, "parent-sess");

    assert!(matches!(outcome, LoopAction::CallLLM { .. }));
    assert!(!machine.is_suspended());
    assert!(machine.agent_process("c").is_none());
}
/// When no resource quota is set, a spawn yields `AwaitingResume` and
/// suspends the machine.
#[test]
fn no_quota_leaves_spawn_unconditionally_allowed() {
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    let spec =
        AgentRunSpec::new(AgentIdentity::sub_agent("d", "d-sess"), AgentRole::Implement, "t");
    let outcome = machine.spawn_sub_agent(spec, "parent-sess");

    assert!(matches!(outcome, LoopAction::AwaitingResume));
    assert!(machine.is_suspended());
}
/// With `memory_writes_per_window` set to `(2, 60_000)`: two writes at time
/// 1_000 are allowed, the third is `RateLimited`, and a write at time
/// 1_000 + 60_000 is allowed again.
#[test]
fn memory_write_quota_rate_limits_within_window() {
    use crate::mm::memory::{MemoryMetadata, MemoryWriteRequest};
    use crate::syscall::{Disposition, Syscall};

    let mut machine = sm();
    let quota = crate::governance::quota::ResourceQuota {
        memory_writes_per_window: Some((2, 60_000)),
        ..Default::default()
    };
    machine.set_resource_quota(quota);
    machine.set_observed_time(1_000);

    let metadata = MemoryMetadata {
        name: "m".into(),
        description: "d".into(),
        ..Default::default()
    };
    let write = MemoryWriteRequest {
        metadata,
        content: "c".to_string(),
    };

    // The first two writes at the same observed time pass the gate.
    for _ in 0..2 {
        let verdict = machine.gate_syscall(&Syscall::WriteMemory(write.clone()));
        assert!(verdict.is_allowed());
    }

    // The third write at that time is rate limited.
    let third = machine.gate_syscall(&Syscall::WriteMemory(write.clone()));
    assert!(matches!(third, Disposition::RateLimited { .. }));

    // After advancing the observed time by 60_000, a write is allowed again.
    machine.set_observed_time(1_000 + 60_000);
    let after_window = machine.gate_syscall(&Syscall::WriteMemory(write));
    assert!(after_window.is_allowed());
}
/// A 60 KiB tool result produces a `LargeResultSpooled` observation (with
/// `spool_ref: None` and the original size), and the tool-result output kept
/// in history is smaller than the payload and under 8 KiB.
#[test]
fn large_tool_result_is_spooled_with_preview_and_observation() {
    let mut machine = sm();
    machine.start(RuntimeTask::new("task"));
    machine.take_observations();

    let payload = "Z".repeat(60 * 1024);
    let payload_len = payload.len();
    let big_result = ToolResult {
        call_id: compact_str::CompactString::new("big"),
        output: Content::Text(payload),
        is_error: false,
        is_fatal: false,
        error_kind: None,
        token_count: None,
    };
    machine.feed(LoopEvent::ToolResults {
        results: vec![big_result],
    });

    let observations = machine.take_observations();
    let spooled = observations.iter().any(|o| {
        matches!(
            o,
            KernelObservation::LargeResultSpooled { call_id, original_size, spool_ref: None, .. }
                if call_id == "big" && *original_size == (60 * 1024)
        )
    });
    assert!(spooled);

    // Total length of every tool-result output held in history.
    let mut stored: usize = 0;
    for message in machine.ctx.partitions.history.messages.iter() {
        if let Content::Parts(parts) = &message.content {
            for part in parts {
                if let ContentPart::ToolResult { output, .. } = part {
                    stored += output.len();
                }
            }
        }
    }
    assert!(stored < payload_len, "spooled output should be a small preview");
    assert!(stored < 8 * 1024, "preview should be near the 2 KiB budget");
}
/// A short tool result must not produce any `LargeResultSpooled` observation.
#[test]
fn small_tool_result_is_not_spooled() {
    let mut machine = sm();
    machine.start(RuntimeTask::new("task"));
    machine.take_observations();

    let small_result = ToolResult {
        call_id: compact_str::CompactString::new("ok"),
        output: Content::Text("small output".into()),
        is_error: false,
        is_fatal: false,
        error_kind: None,
        token_count: None,
    };
    machine.feed(LoopEvent::ToolResults {
        results: vec![small_result],
    });

    let observations = machine.take_observations();
    let spooled = observations
        .iter()
        .any(|o| matches!(o, KernelObservation::LargeResultSpooled { .. }));
    assert!(!spooled);
}
/// The task table holds a single `root` entry: `Ready` before `start`,
/// `Running` and parentless after it.
#[test]
fn task_table_holds_root_after_start() {
    let mut machine = sm();

    // Before start: one entry, in the Ready state.
    assert_eq!(machine.task_table().all().len(), 1);
    let initial_state = machine.task_table().get("root").unwrap().state;
    assert_eq!(initial_state, TaskState::Ready);

    machine.start(RuntimeTask::new("hi"));

    // After start: still one entry, now Running.
    let root_task = machine.task_table().get("root").expect("root task");
    assert_eq!(root_task.state, TaskState::Running);
    assert!(root_task.parent.is_none());
    assert_eq!(machine.task_table().all().len(), 1);
}
/// A spawned sub-agent appears in the task table as a `Running` child of
/// `root`, and becomes `Done(Completed)` once its completion is fed back.
#[test]
fn task_table_tracks_sub_agent_lifecycle() {
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};
    use crate::types::result::{LoopResult, SubAgentResult, TerminationReason};

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    let identity = AgentIdentity::sub_agent("child", "child-session");
    let child_spec = AgentRunSpec::new(identity, AgentRole::Implement, "child task");
    machine.spawn_sub_agent(child_spec, "parent-sess");

    let running_child = machine.task_table().get("child").expect("child task");
    assert_eq!(running_child.state, TaskState::Running);
    assert_eq!(running_child.parent.as_deref(), Some("root"));
    assert_eq!(machine.task_table().children_of("root").len(), 1);
    assert_eq!(machine.task_table().all().len(), 2);

    let child_outcome = LoopResult {
        termination: TerminationReason::Completed,
        final_message: Some(Message::assistant("ok")),
        turns_used: 1,
        total_tokens_used: 1,
        loop_continue: None,
        classify_branch: None,
        tournament_winner: None,
    };
    machine.feed(LoopEvent::SubAgentCompleted {
        result: SubAgentResult {
            agent_id: compact_str::CompactString::new("child"),
            result: child_outcome,
        },
    });

    let finished_child = machine.task_table().get("child").expect("child task");
    assert_eq!(
        finished_child.state,
        TaskState::Done(TerminationReason::Completed)
    );
}
/// Builds a `SubAgentResult` for `agent_id` that terminated with `Completed`,
/// carrying an "ok" final message, one turn, one token, and no loop /
/// classify / tournament signals.
fn wf_completed(agent_id: &str) -> crate::types::result::SubAgentResult {
    use crate::types::result::{LoopResult, SubAgentResult, TerminationReason};

    let outcome = LoopResult {
        termination: TerminationReason::Completed,
        final_message: Some(Message::assistant("ok")),
        turns_used: 1,
        total_tokens_used: 1,
        loop_continue: None,
        classify_branch: None,
        tournament_winner: None,
    };
    SubAgentResult {
        agent_id: compact_str::CompactString::new(agent_id),
        result: outcome,
    }
}
/// Same as `wf_completed`, but with `loop_continue` set to `Some(false)`.
fn wf_completed_stop(agent_id: &str) -> crate::types::result::SubAgentResult {
    let mut outcome = wf_completed(agent_id);
    outcome.result.loop_continue = Some(false);
    outcome
}
/// Same as `wf_completed`, but with `classify_branch` set to `label`.
fn wf_completed_branch(agent_id: &str, label: &str) -> crate::types::result::SubAgentResult {
    let mut outcome = wf_completed(agent_id);
    outcome.result.classify_branch = Some(label.to_string());
    outcome
}
/// Same as `wf_completed`, but with `tournament_winner` set to `winner`.
fn wf_completed_winner(agent_id: &str, winner: &str) -> crate::types::result::SubAgentResult {
    let mut outcome = wf_completed(agent_id);
    outcome.result.tournament_winner = Some(winner.to_string());
    outcome
}
/// Returns a clone of the `nodes` from the last `WorkflowBatchSpawned`
/// observation in `obs`, or an empty vector when there is none.
fn last_batch_spawns(
    obs: &[KernelObservation],
) -> Vec<crate::orchestration::workflow::WorkflowSpawnInfo> {
    // Walk backwards so the most recent batch is found first.
    for observation in obs.iter().rev() {
        if let KernelObservation::WorkflowBatchSpawned { nodes, .. } = observation {
            return nodes.clone();
        }
    }
    Vec::new()
}
/// Counts the `AgentProcessChanged` observations in `obs` whose `state` is
/// `"running"`.
fn count_spawned(obs: &[KernelObservation]) -> usize {
    let mut running = 0;
    for observation in obs {
        if matches!(
            observation,
            KernelObservation::AgentProcessChanged { state, .. } if state == "running"
        ) {
            running += 1;
        }
    }
    running
}
/// Fan-out/synthesize workflow: three workers spawn as one batch (suspension
/// reason `workflow_batch` with three pending calls), `wf-node3` spawns only
/// after the third worker completes, and its completion ends the workflow
/// with `CallLLM` and a `Resumed` observation.
#[test]
fn workflow_fanout_spawns_batch_then_synthesizes() {
    use crate::orchestration::workflow::fanout_synthesize;

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    let workers = vec![
        RuntimeTask::new("w0"),
        RuntimeTask::new("w1"),
        RuntimeTask::new("w2"),
    ];
    let workflow = fanout_synthesize(workers, RuntimeTask::new("synth"));
    let loaded = machine.load_workflow(workflow, "sess");
    assert!(matches!(loaded, LoopAction::AwaitingResume));
    assert!(machine.workflow_active());
    assert!(machine.is_suspended());

    // Loading spawns all three workers and records the suspension.
    let load_obs = machine.take_observations();
    assert_eq!(count_spawned(&load_obs), 3);
    let suspension = load_obs.iter().find_map(|o| {
        if let KernelObservation::Suspended {
            reason,
            pending_calls,
            ..
        } = o
        {
            Some((reason.clone(), pending_calls.len()))
        } else {
            None
        }
    });
    assert_eq!(suspension, Some(("workflow_batch".to_string(), 3)));

    // The first two completions keep the machine waiting.
    for worker_id in ["wf-node0", "wf-node1"] {
        let action = machine.feed(LoopEvent::SubAgentCompleted {
            result: wf_completed(worker_id),
        });
        assert!(matches!(action, LoopAction::AwaitingResume));
    }
    assert!(machine.workflow_active());
    machine.take_observations();

    // The third completion spawns one more agent, `wf-node3`.
    let after_last_worker = machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node2"),
    });
    assert!(matches!(after_last_worker, LoopAction::AwaitingResume));
    assert!(machine.workflow_active());
    let synth_obs = machine.take_observations();
    assert_eq!(count_spawned(&synth_obs), 1);
    assert!(machine.agent_process("wf-node3").is_some());

    // Completing `wf-node3` ends the workflow.
    let final_action = machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node3"),
    });
    assert!(matches!(final_action, LoopAction::CallLLM { .. }));
    assert!(!machine.workflow_active());
    let tail_obs = machine.take_observations();
    assert!(tail_obs
        .iter()
        .any(|o| matches!(o, KernelObservation::Resumed { .. })));
}
/// In a chain A -> B -> C, each step spawns exactly one node, and the
/// workflow ends with `CallLLM` after the last node completes.
#[test]
fn workflow_linear_chain_spawns_one_at_a_time() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    let node_a = WorkflowNode::new(RuntimeTask::new("A"), AgentRole::Implement);
    let node_b =
        WorkflowNode::new(RuntimeTask::new("B"), AgentRole::Implement).with_depends_on(vec![0]);
    let node_c =
        WorkflowNode::new(RuntimeTask::new("C"), AgentRole::Implement).with_depends_on(vec![1]);
    machine.load_workflow(WorkflowSpec::new(vec![node_a, node_b, node_c]), "sess");

    let load_obs = machine.take_observations();
    assert_eq!(count_spawned(&load_obs), 1);

    machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node0"),
    });
    let after_a = machine.take_observations();
    assert_eq!(count_spawned(&after_a), 1);
    assert!(machine.workflow_active());

    machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node1"),
    });
    let after_b = machine.take_observations();
    assert_eq!(count_spawned(&after_b), 1);

    let finished = machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node2"),
    });
    assert!(matches!(finished, LoopAction::CallLLM { .. }));
    assert!(!machine.workflow_active());
}
/// With `max_concurrent_subagents` of 1, a two-worker fan-out runs one node
/// at a time: `wf-node0`, then `wf-node1`, then `wf-node2`.
#[test]
fn workflow_node_concurrency_limit_serializes_via_defer() {
    use crate::orchestration::workflow::fanout_synthesize;

    let mut machine = sm();
    let quota = crate::governance::quota::ResourceQuota {
        max_concurrent_subagents: Some(1),
        ..Default::default()
    };
    machine.set_resource_quota(quota);
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    let workers = vec![RuntimeTask::new("w0"), RuntimeTask::new("w1")];
    let workflow = fanout_synthesize(workers, RuntimeTask::new("synth"));
    machine.load_workflow(workflow, "sess");

    // Only the first worker is spawned at load time.
    let load_obs = machine.take_observations();
    assert_eq!(count_spawned(&load_obs), 1);
    assert!(machine.agent_process("wf-node0").is_some());
    assert!(machine.agent_process("wf-node1").is_none());

    // Finishing node 0 lets node 1 spawn.
    let after_first = machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node0"),
    });
    assert!(matches!(after_first, LoopAction::AwaitingResume));
    let first_obs = machine.take_observations();
    assert_eq!(count_spawned(&first_obs), 1);
    assert!(machine.agent_process("wf-node1").is_some());
    assert!(machine.workflow_active());

    // Finishing node 1 lets node 2 spawn.
    let after_second = machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node1"),
    });
    assert!(matches!(after_second, LoopAction::AwaitingResume));
    let second_obs = machine.take_observations();
    assert_eq!(count_spawned(&second_obs), 1);
    assert!(machine.agent_process("wf-node2").is_some());

    let finished = machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node2"),
    });
    assert!(matches!(finished, LoopAction::CallLLM { .. }));
    assert!(!machine.workflow_active());
}
/// Submitting a node while a workflow is running spawns it as `wf-node1`;
/// an empty submission spawns nothing; the workflow ends only after both
/// nodes complete.
#[test]
fn submit_workflow_nodes_appends_and_spawns_mid_run() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    let root_node = WorkflowNode::new(RuntimeTask::new("root"), AgentRole::Implement);
    machine.load_workflow(WorkflowSpec::new(vec![root_node]), "sess");
    let load_obs = machine.take_observations();
    assert_eq!(count_spawned(&load_obs), 1);
    assert!(machine.agent_process("wf-node0").is_some());

    // Append one node mid-run.
    let extra_node = WorkflowNode::new(RuntimeTask::new("discovered-work"), AgentRole::Implement);
    let submitted = machine.submit_workflow_nodes(vec![extra_node], None);
    assert!(matches!(submitted, LoopAction::AwaitingResume));
    let submit_obs = machine.take_observations();
    assert_eq!(count_spawned(&submit_obs), 1);
    assert!(machine.agent_process("wf-node1").is_some());
    assert!(machine.workflow_active());

    // An empty submission spawns nothing.
    machine.submit_workflow_nodes(vec![], None);
    let empty_obs = machine.take_observations();
    assert_eq!(count_spawned(&empty_obs), 0);

    let after_root = machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node0"),
    });
    assert!(matches!(after_root, LoopAction::AwaitingResume));
    assert!(machine.workflow_active(), "still running the submitted node");

    let finished = machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node1"),
    });
    assert!(matches!(finished, LoopAction::CallLLM { .. }));
    assert!(!machine.workflow_active());
}
/// With `max_workflow_nodes` of 1 and one node already loaded, a further
/// node submission spawns nothing and the workflow still finishes normally.
#[test]
fn submit_workflow_nodes_denied_past_max_workflow_nodes_quota() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let mut machine = sm();
    let quota = crate::governance::quota::ResourceQuota {
        max_workflow_nodes: Some(1),
        ..Default::default()
    };
    machine.set_resource_quota(quota);
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    let root_node = WorkflowNode::new(RuntimeTask::new("root"), AgentRole::Implement);
    machine.load_workflow(WorkflowSpec::new(vec![root_node]), "sess");
    let load_obs = machine.take_observations();
    assert_eq!(count_spawned(&load_obs), 1);

    // This submission would exceed the node quota.
    let extra_node = WorkflowNode::new(RuntimeTask::new("more"), AgentRole::Implement);
    machine.submit_workflow_nodes(vec![extra_node], None);
    let submit_obs = machine.take_observations();
    assert_eq!(count_spawned(&submit_obs), 0);
    assert!(
        machine.agent_process("wf-node1").is_none(),
        "denied submission does not spawn"
    );

    let finished = machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node0"),
    });
    assert!(matches!(finished, LoopAction::CallLLM { .. }));
    assert!(!machine.workflow_active());
}
/// `submit_workflow` with no workflow active installs the spec, spawns both
/// of its nodes, and the workflow ends once both complete.
#[test]
fn submit_workflow_bootstraps_dag_when_no_workflow_active() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();
    assert!(!machine.workflow_active(), "no workflow before authoring");

    let node_a = WorkflowNode::new(RuntimeTask::new("a"), AgentRole::Implement);
    let node_b = WorkflowNode::new(RuntimeTask::new("b"), AgentRole::Implement);
    let submitted = machine.submit_workflow(WorkflowSpec::new(vec![node_a, node_b]), "sess", None);
    assert!(matches!(submitted, LoopAction::AwaitingResume { .. }));
    assert!(machine.workflow_active(), "spec bootstrapped the DAG");
    let submit_obs = machine.take_observations();
    assert_eq!(count_spawned(&submit_obs), 2);

    machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node0"),
    });
    let finished = machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node1"),
    });
    assert!(matches!(finished, LoopAction::CallLLM { .. }));
    assert!(!machine.workflow_active());
}
/// `submit_workflow` while a workflow is already active adds the new node to
/// it: the node spawns as `wf-node1`, and the workflow ends after both nodes
/// complete.
#[test]
fn submit_workflow_flattens_onto_active_workflow() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    let root_node = WorkflowNode::new(RuntimeTask::new("root"), AgentRole::Implement);
    machine.load_workflow(WorkflowSpec::new(vec![root_node]), "sess");
    let load_obs = machine.take_observations();
    assert_eq!(count_spawned(&load_obs), 1);

    let extra_node = WorkflowNode::new(RuntimeTask::new("more"), AgentRole::Implement);
    let extra_spec = WorkflowSpec::new(vec![extra_node]);
    machine.submit_workflow(extra_spec, "sess", None);
    let submit_obs = machine.take_observations();
    assert_eq!(count_spawned(&submit_obs), 1);
    assert!(
        machine.agent_process("wf-node1").is_some(),
        "flattened node spawned in the same DAG"
    );

    machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node0"),
    });
    let finished = machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node1"),
    });
    assert!(matches!(finished, LoopAction::CallLLM { .. }));
    assert!(!machine.workflow_active());
}
/// With `max_workflow_nodes` of 2, submitting a three-node spec spawns
/// nothing and installs no workflow; the returned action is `AwaitingResume`.
#[test]
fn submit_workflow_denied_past_max_workflow_nodes_quota() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let mut machine = sm();
    let quota = crate::governance::quota::ResourceQuota {
        max_workflow_nodes: Some(2),
        ..Default::default()
    };
    machine.set_resource_quota(quota);
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    // Three nodes: one more than the quota allows.
    let nodes = vec![
        WorkflowNode::new(RuntimeTask::new("a"), AgentRole::Implement),
        WorkflowNode::new(RuntimeTask::new("b"), AgentRole::Implement),
        WorkflowNode::new(RuntimeTask::new("c"), AgentRole::Implement),
    ];
    let submitted = machine.submit_workflow(WorkflowSpec::new(nodes), "sess", None);
    assert!(matches!(submitted, LoopAction::AwaitingResume { .. }));

    let submit_obs = machine.take_observations();
    assert_eq!(count_spawned(&submit_obs), 0);
    assert!(
        !machine.workflow_active(),
        "denied authoring installs no workflow"
    );
}
/// Under a quota of 5 workflow nodes and 3 concurrent sub-agents, the
/// `WorkflowBatchSpawned` observation for a one-node workflow carries a
/// budget with the expected used/max/remaining figures.
#[test]
fn workflow_batch_spawned_carries_remaining_budget_under_quota() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let mut machine = sm();
    let quota = crate::governance::quota::ResourceQuota {
        max_workflow_nodes: Some(5),
        max_concurrent_subagents: Some(3),
        ..Default::default()
    };
    machine.set_resource_quota(quota);
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    let root_node = WorkflowNode::new(RuntimeTask::new("root"), AgentRole::Implement);
    machine.load_workflow(WorkflowSpec::new(vec![root_node]), "sess");

    // Pull the budget out of the first batch observation that has one.
    let observations = machine.take_observations();
    let budget = observations
        .into_iter()
        .find_map(|o| match o {
            KernelObservation::WorkflowBatchSpawned { budget, .. } => budget,
            _ => None,
        })
        .expect("budget present under an active quota");

    // Node accounting.
    assert_eq!(budget.nodes_used, 1);
    assert_eq!(budget.nodes_max, Some(5));
    assert_eq!(budget.nodes_remaining, Some(4));
    // Concurrency accounting.
    assert_eq!(budget.running_subagents, 1);
    assert_eq!(budget.max_concurrent_subagents, Some(3));
    assert_eq!(budget.concurrency_remaining, Some(2));
    // Token accounting.
    assert_eq!(budget.tokens_used, 0);
    assert!(budget.tokens_max.is_some());
    assert_eq!(budget.tokens_remaining, budget.tokens_max);
}
/// Without a resource quota, no `WorkflowBatchSpawned` observation carries a
/// budget.
#[test]
fn workflow_batch_spawned_omits_budget_without_quota() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    let root_node = WorkflowNode::new(RuntimeTask::new("root"), AgentRole::Implement);
    machine.load_workflow(WorkflowSpec::new(vec![root_node]), "sess");

    let observations = machine.take_observations();
    let had_budget = observations.into_iter().any(|o| {
        matches!(
            o,
            KernelObservation::WorkflowBatchSpawned { budget: Some(_), .. }
        )
    });
    assert!(!had_budget, "no quota ⇒ no budget signal");
}
/// After a quarantined node completes, some entry in the context signals
/// contains the `[quarantined sub-agent wf-node0]` label.
#[test]
fn quarantined_node_output_is_labeled_crossing_into_trusted_context() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    let untrusted_reader =
        WorkflowNode::new(RuntimeTask::new("read-untrusted"), AgentRole::Explore).quarantined();
    machine.load_workflow(WorkflowSpec::new(vec![untrusted_reader]), "sess");
    machine.take_observations();
    assert!(
        machine.agent_process("wf-node0").is_some(),
        "quarantined ReadOnly node spawns"
    );

    machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node0"),
    });

    let labeled = machine
        .ctx
        .partitions
        .signals
        .iter()
        .any(|s| s.contains("[quarantined sub-agent wf-node0]"));
    assert!(
        labeled,
        "quarantined output is labeled untrusted-origin on crossing: {:?}",
        machine.ctx.partitions.signals
    );
}
/// A quarantined node configured with `AgentIsolation::Shared` is not
/// spawned, its dependent does not run, no workflow stays active, and the
/// denial shows up as a `Rollbacked` observation or a "quarantine" signal.
#[test]
fn quarantined_node_with_write_isolation_is_denied_in_kernel() {
    use crate::orchestration::workflow::{NodeTrust, WorkflowNode, WorkflowSpec};
    use crate::types::agent::{AgentIsolation, AgentRole};

    let mut machine = sm();
    machine.start(RuntimeTask::new("triage untrusted input"));
    machine.take_observations();

    let untrusted_reader =
        WorkflowNode::new(RuntimeTask::new("read untrusted webpage"), AgentRole::Explore)
            .with_isolation(AgentIsolation::Shared)
            .with_trust(NodeTrust::Quarantined);
    let follow_up = WorkflowNode::new(RuntimeTask::new("act on it"), AgentRole::Implement)
        .with_depends_on(vec![0]);
    let loaded = machine.load_workflow(WorkflowSpec::new(vec![untrusted_reader, follow_up]), "sess");

    assert!(matches!(loaded, LoopAction::CallLLM { .. }));
    assert!(
        machine.agent_process("wf-node0").is_none(),
        "quarantined+write node denied"
    );
    assert!(machine.agent_process("wf-node1").is_none(), "dependent starves");
    assert!(!machine.workflow_active());

    // Either surface counts as reporting the denial.
    let observations = machine.take_observations();
    let rolled_back = observations
        .iter()
        .any(|o| matches!(o, KernelObservation::Rollbacked { .. }));
    let signalled = machine
        .ctx
        .partitions
        .signals
        .iter()
        .any(|s| s.to_lowercase().contains("quarantine"));
    assert!(rolled_back || signalled, "quarantine denial is surfaced");
}
/// A node built `with_loop(2)` runs as `wf-node0-i0` then `wf-node0-i1`; the
/// dependent `wf-node1` spawns only after the second iteration completes.
#[test]
fn loop_node_reruns_then_unblocks_dependent_via_drive_workflow() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let mut machine = sm();
    machine.start(RuntimeTask::new("iterate to convergence"));
    machine.take_observations();

    let refine = WorkflowNode::new(RuntimeTask::new("refine"), AgentRole::Implement).with_loop(2);
    let finalize = WorkflowNode::new(RuntimeTask::new("finalize"), AgentRole::Implement)
        .with_depends_on(vec![0]);
    machine.load_workflow(WorkflowSpec::new(vec![refine, finalize]), "sess");

    // First iteration of the loop node.
    let load_obs = machine.take_observations();
    assert_eq!(count_spawned(&load_obs), 1);
    assert!(machine.agent_process("wf-node0-i0").is_some());
    assert!(
        machine.agent_process("wf-node1").is_none(),
        "dependent waits for the loop"
    );

    // Completing it spawns the second iteration.
    let after_first = machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node0-i0"),
    });
    assert!(matches!(after_first, LoopAction::AwaitingResume));
    let first_obs = machine.take_observations();
    assert_eq!(count_spawned(&first_obs), 1);
    assert!(
        machine.agent_process("wf-node0-i1").is_some(),
        "second iteration spawned"
    );
    assert!(machine.agent_process("wf-node1").is_none());

    // Completing the second iteration releases the dependent.
    let after_second = machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node0-i1"),
    });
    assert!(matches!(after_second, LoopAction::AwaitingResume));
    let second_obs = machine.take_observations();
    assert_eq!(count_spawned(&second_obs), 1);
    assert!(
        machine.agent_process("wf-node1").is_some(),
        "dependent spawns after the loop ends"
    );

    let finished = machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node1"),
    });
    assert!(matches!(finished, LoopAction::CallLLM { .. }));
    assert!(!machine.workflow_active());
}
/// A node built `with_loop(5)` stops after its second iteration when that
/// iteration reports `loop_continue = Some(false)`: the dependent spawns and
/// no `wf-node0-i2` process exists.
#[test]
fn loop_node_stops_early_on_loop_continue_false() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let mut machine = sm();
    machine.start(RuntimeTask::new("search until no new findings"));
    machine.take_observations();

    let probe = WorkflowNode::new(RuntimeTask::new("probe"), AgentRole::Explore).with_loop(5);
    let report =
        WorkflowNode::new(RuntimeTask::new("report"), AgentRole::Plan).with_depends_on(vec![0]);
    machine.load_workflow(WorkflowSpec::new(vec![probe, report]), "sess");
    let load_obs = machine.take_observations();
    assert_eq!(count_spawned(&load_obs), 1);

    // First iteration completes without a stop signal.
    machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node0-i0"),
    });
    let first_obs = machine.take_observations();
    assert_eq!(count_spawned(&first_obs), 1);
    assert!(
        machine.agent_process("wf-node1").is_none(),
        "dependent still waiting"
    );

    // Second iteration carries the stop signal.
    machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed_stop("wf-node0-i1"),
    });
    let second_obs = machine.take_observations();
    assert_eq!(count_spawned(&second_obs), 1);
    assert!(
        machine.agent_process("wf-node1").is_some(),
        "early stop promoted the dependent"
    );
    assert!(
        machine.agent_process("wf-node0-i2").is_none(),
        "no third iteration ran"
    );

    let finished = machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node1"),
    });
    assert!(matches!(finished, LoopAction::CallLLM { .. }));
    assert!(!machine.workflow_active());
}
/// When the classify node reports branch "bug", only the node listed under
/// that branch (`wf-node1`) spawns; `wf-node2` never does, and the workflow
/// ends after `wf-node1` completes.
#[test]
fn classify_node_routes_to_chosen_branch_and_prunes_others() {
    use crate::orchestration::workflow::{ClassifyBranch, WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let mut machine = sm();
    machine.start(RuntimeTask::new("triage this ticket"));
    machine.take_observations();

    let branches = vec![
        ClassifyBranch { label: "bug".into(), nodes: vec![1] },
        ClassifyBranch { label: "feature".into(), nodes: vec![2] },
    ];
    let classifier = WorkflowNode::new(RuntimeTask::new("classify ticket"), AgentRole::Plan)
        .with_classify(branches);
    let bug_path = WorkflowNode::new(RuntimeTask::new("fix the bug"), AgentRole::Implement)
        .with_depends_on(vec![0]);
    let feature_path =
        WorkflowNode::new(RuntimeTask::new("build the feature"), AgentRole::Implement)
            .with_depends_on(vec![0]);
    machine.load_workflow(
        WorkflowSpec::new(vec![classifier, bug_path, feature_path]),
        "sess",
    );
    let load_obs = machine.take_observations();
    assert_eq!(count_spawned(&load_obs), 1);

    // The classifier picks the "bug" branch.
    machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed_branch("wf-node0", "bug"),
    });
    let routed_obs = machine.take_observations();
    assert_eq!(count_spawned(&routed_obs), 1);
    assert!(machine.agent_process("wf-node1").is_some(), "chosen branch runs");
    assert!(machine.agent_process("wf-node2").is_none(), "other branch pruned");

    let finished = machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node1"),
    });
    assert!(matches!(finished, LoopAction::CallLLM { .. }));
    assert!(!machine.workflow_active());
}
/// Drives a four-entrant tournament node: entrants `wf-node2..=5` spawn
/// first, two round-1 judges follow once every entrant is in, one final judge
/// follows both round-1 verdicts, and the dependent `wf-node1` spawns last.
#[test]
fn tournament_node_drives_bracket_end_to_end_via_drive_workflow() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let mut machine = sm();
    machine.start(RuntimeTask::new("pick the strongest candidate"));
    machine.take_observations();

    let entrants = vec![
        RuntimeTask::new("draft ad A"),
        RuntimeTask::new("draft ad B"),
        RuntimeTask::new("draft ad C"),
        RuntimeTask::new("draft ad D"),
    ];
    let controller =
        WorkflowNode::new(RuntimeTask::new("which ad converts best?"), AgentRole::Plan)
            .with_tournament(entrants);
    let shipper = WorkflowNode::new(RuntimeTask::new("ship the winner"), AgentRole::Implement)
        .with_depends_on(vec![0]);
    machine.load_workflow(WorkflowSpec::new(vec![controller, shipper]), "sess");

    // Load: the four entrants spawn; the controller itself does not.
    let load_obs = machine.take_observations();
    assert_eq!(count_spawned(&load_obs), 4, "four entrant generators");
    assert!(
        machine.agent_process("wf-node0").is_none(),
        "controller spawns no agent of its own"
    );
    for id in ["wf-node2", "wf-node3", "wf-node4", "wf-node5"] {
        assert!(machine.agent_process(id).is_some(), "{id} entrant spawned");
    }
    let entrant_spawns = last_batch_spawns(&load_obs);
    assert!(
        entrant_spawns.iter().all(|n| n.judge_match.is_none()),
        "entrants aren't judges"
    );

    // No judge spawns while entrants are still outstanding.
    for id in ["wf-node2", "wf-node3", "wf-node4"] {
        machine.feed(LoopEvent::SubAgentCompleted {
            result: wf_completed(id),
        });
        let obs = machine.take_observations();
        assert_eq!(count_spawned(&obs), 0, "no judges until every entrant is in");
    }

    // Last entrant in: round 1 starts with two judges.
    machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node5"),
    });
    let r1 = machine.take_observations();
    assert_eq!(count_spawned(&r1), 2, "two round-1 judges (wf-node6, wf-node7)");
    let r1_spawns = last_batch_spawns(&r1);
    let r1_matches: Vec<_> = r1_spawns
        .iter()
        .filter_map(|n| n.judge_match.clone())
        .collect();
    assert_eq!(r1_matches.len(), 2, "each round-1 judge carries a match");
    assert_eq!(r1_matches[0].left, "wf-node2");
    assert_eq!(r1_matches[0].right, "wf-node3");
    assert_eq!(r1_matches[1].left, "wf-node4");
    assert_eq!(r1_matches[1].right, "wf-node5");
    assert!(
        machine.agent_process("wf-node1").is_none(),
        "dependent waits for the whole bracket"
    );

    // Round 1 verdicts: the final is held back until both are in.
    machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed_winner("wf-node6", "wf-node2"),
    });
    let after_first_judge = machine.take_observations();
    assert_eq!(
        count_spawned(&after_first_judge),
        0,
        "final waits for both round-1 judges"
    );
    machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed_winner("wf-node7", "wf-node4"),
    });
    let r2 = machine.take_observations();
    assert_eq!(count_spawned(&r2), 1, "one final judge (wf-node8)");
    let r2_spawns = last_batch_spawns(&r2);
    let r2_match = r2_spawns[0].judge_match.clone().expect("final judge match");
    assert_eq!(r2_match.left, "wf-node2");
    assert_eq!(r2_match.right, "wf-node4");

    // Final verdict releases the dependent.
    machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed_winner("wf-node8", "wf-node4"),
    });
    let after_final = machine.take_observations();
    assert_eq!(
        count_spawned(&after_final),
        1,
        "dependent spawns after the bracket"
    );
    assert!(
        machine.agent_process("wf-node1").is_some(),
        "the 'ship the winner' dependent"
    );

    let finished = machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node1"),
    });
    assert!(matches!(finished, LoopAction::CallLLM { .. }));
    assert!(!machine.workflow_active());
}
/// A quarantined node configured with `AgentIsolation::ReadOnly` spawns
/// normally as `wf-node0`.
#[test]
fn quarantined_node_read_only_is_allowed() {
    use crate::orchestration::workflow::{NodeTrust, WorkflowNode, WorkflowSpec};
    use crate::types::agent::{AgentIsolation, AgentRole};

    let mut machine = sm();
    machine.start(RuntimeTask::new("triage"));
    machine.take_observations();

    let untrusted_reader =
        WorkflowNode::new(RuntimeTask::new("read untrusted webpage"), AgentRole::Explore)
            .with_isolation(AgentIsolation::ReadOnly)
            .with_trust(NodeTrust::Quarantined);
    machine.load_workflow(WorkflowSpec::new(vec![untrusted_reader]), "sess");

    let load_obs = machine.take_observations();
    assert_eq!(
        count_spawned(&load_obs),
        1,
        "read-only quarantined node spawns"
    );
    assert!(machine.agent_process("wf-node0").is_some());
}
/// With C depending on A and B, and D depending on A only: completing A
/// spawns D (`wf-node3`) while C (`wf-node2`) is still not spawned.
#[test]
fn workflow_run_queue_unblocks_dependents_per_node() {
    use crate::orchestration::workflow::{WorkflowNode, WorkflowSpec};
    use crate::types::agent::AgentRole;

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    let node_a = WorkflowNode::new(RuntimeTask::new("A"), AgentRole::Implement);
    let node_b = WorkflowNode::new(RuntimeTask::new("B"), AgentRole::Implement);
    let node_c =
        WorkflowNode::new(RuntimeTask::new("C"), AgentRole::Implement).with_depends_on(vec![0, 1]);
    let node_d =
        WorkflowNode::new(RuntimeTask::new("D"), AgentRole::Implement).with_depends_on(vec![0]);
    machine.load_workflow(
        WorkflowSpec::new(vec![node_a, node_b, node_c, node_d]),
        "sess",
    );

    // A and B have no dependencies, so both spawn at load.
    let load_obs = machine.take_observations();
    assert_eq!(count_spawned(&load_obs), 2);

    machine.feed(LoopEvent::SubAgentCompleted {
        result: wf_completed("wf-node0"),
    });
    let after_a = machine.take_observations();
    assert_eq!(
        count_spawned(&after_a),
        1,
        "D unblocks on A alone, not waiting for B"
    );
    assert!(
        machine.agent_process("wf-node3").is_some(),
        "D (node 3) is running"
    );
    assert!(
        machine.agent_process("wf-node2").is_none(),
        "C still waits on B"
    );
    assert!(machine.workflow_active());
}
/// Spawning one sub-agent directly (no workflow loaded) never marks a
/// workflow as active, and the child's completion yields a `CallLLM` action.
#[test]
fn single_spawn_path_leaves_workflow_inactive() {
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));
    machine.take_observations();

    let child_identity = AgentIdentity::sub_agent("child", "child-session");
    let child_spec = AgentRunSpec::new(child_identity, AgentRole::Implement, "child task");
    machine.spawn_sub_agent(child_spec, "parent-sess");
    assert!(!machine.workflow_active());

    let completion = LoopEvent::SubAgentCompleted { result: wf_completed("child") };
    let next_action = machine.feed(completion);
    assert!(matches!(next_action, LoopAction::CallLLM { .. }));
    assert!(!machine.workflow_active());
}
/// The `turn` and `total_tokens` counters survive a snapshot/restore cycle.
#[test]
fn snapshot_roundtrip_preserves_turn_and_tokens() {
    let expected_turn = 5;
    let expected_tokens = 1000;

    let mut machine = sm();
    machine.start(RuntimeTask::new("test task"));
    machine.turn = expected_turn;
    machine.total_tokens = expected_tokens;

    let snapshot = machine.snapshot();
    let restored = LoopStateMachine::restore(&snapshot);

    assert_eq!(restored.turn, expected_turn);
    assert_eq!(restored.total_tokens, expected_tokens);
}
/// After spawning one child, the snapshot carries two tasks and the restored
/// machine's task table contains both the "root" and "child" entries.
#[test]
fn snapshot_roundtrip_preserves_task_table() {
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));

    let child_spec = AgentRunSpec::new(
        AgentIdentity::sub_agent("child", "child-session"),
        AgentRole::Implement,
        "child task",
    );
    let _ = machine.spawn_sub_agent(child_spec, "parent-sess");

    let snapshot = machine.snapshot();
    assert_eq!(snapshot.tasks.len(), 2);

    let restored = LoopStateMachine::restore(&snapshot);
    assert_eq!(restored.task_table().all().len(), 2);
    for task_id in ["root", "child"] {
        assert!(restored.task_table().get(task_id).is_some());
    }
}
/// A history message pushed into the context is present in the snapshot and
/// comes back with the same text after restore.
#[test]
fn snapshot_roundtrip_preserves_context_messages() {
    let mut machine = sm();
    machine.ctx.push_history(Message::user("test message"), 10);

    let snapshot = machine.snapshot();
    assert_eq!(snapshot.context.history_messages.len(), 1);

    let restored = LoopStateMachine::restore(&snapshot);
    let history = &restored.ctx.partitions.history.messages;
    assert_eq!(history.len(), 1);
    assert_eq!(history[0].content.as_text(), Some("test message"));
}
/// A pushed signal is captured verbatim in the snapshot and reappears in the
/// restored machine's signal partition.
#[test]
fn snapshot_roundtrip_preserves_signals() {
    let mut machine = sm();
    machine.ctx.push_signal(String::from("[TEST] test signal"));

    let snapshot = machine.snapshot();
    let saved = &snapshot.context.signals;
    assert_eq!(saved.len(), 1);
    assert_eq!(saved[0], "[TEST] test signal");

    let restored = LoopStateMachine::restore(&snapshot);
    let live = &restored.ctx.partitions.signals;
    assert_eq!(live.len(), 1);
    assert_eq!(live[0], "[TEST] test signal");
}
/// The task goal and progress text are exposed on the snapshot and restored
/// into the task-state partition unchanged.
#[test]
fn snapshot_roundtrip_preserves_task_state() {
    let mut machine = sm();
    machine.ctx.init_task(String::from("test goal"), Vec::new());
    machine.ctx.partitions.task_state.progress = String::from("50% done");

    let snapshot = machine.snapshot();
    let saved = &snapshot.context;
    assert_eq!(saved.task_goal.as_deref(), Some("test goal"));
    assert_eq!(saved.task_progress.as_deref(), Some("50% done"));

    let restored = LoopStateMachine::restore(&snapshot);
    let task_state = &restored.ctx.partitions.task_state;
    assert_eq!(task_state.goal, "test goal");
    assert_eq!(task_state.progress, "50% done");
}
/// A snapshot serialized to JSON and parsed back keeps `turn` and
/// `total_tokens`, and a machine restored from the parsed copy has them too.
#[test]
fn snapshot_roundtrip_serialization() {
    use crate::runtime::snapshot::KernelSnapshot;

    let mut machine = sm();
    machine.start(RuntimeTask::new("test"));
    machine.turn = 3;
    machine.total_tokens = 500;

    let snapshot = machine.snapshot();
    let encoded = serde_json::to_string(&snapshot).expect("serialize snapshot");
    let decoded = serde_json::from_str::<KernelSnapshot>(&encoded).expect("deserialize snapshot");
    assert_eq!(decoded.turn, 3);
    assert_eq!(decoded.total_tokens, 500);

    let restored = LoopStateMachine::restore(&decoded);
    assert_eq!(restored.turn, 3);
    assert_eq!(restored.total_tokens, 500);
}
/// A machine snapshotted after spawning a sub-agent restores as `Suspended`
/// with a `SubAgentJoin` wait reason.
#[test]
fn snapshot_roundtrip_with_suspended_state() {
    use crate::scheduler::tcb::WaitReason;
    use crate::types::agent::{AgentIdentity, AgentRole, AgentRunSpec};

    let mut machine = sm();
    machine.start(RuntimeTask::new("parent"));

    let child_identity = AgentIdentity::sub_agent("child", "child-session");
    let child_spec = AgentRunSpec::new(child_identity, AgentRole::Implement, "child task");
    let _ = machine.spawn_sub_agent(child_spec, "parent-sess");

    let snapshot = machine.snapshot();
    let restored = LoopStateMachine::restore(&snapshot);

    assert!(matches!(restored.lifecycle(), TaskState::Suspended));
    assert!(matches!(
        restored.wait_reason(),
        Some(WaitReason::SubAgentJoin(_))
    ));
}
/// Mutating a restored machine leaves the machine it was snapshotted from
/// untouched.
#[test]
fn snapshot_restore_independence() {
    let mut source = sm();
    source.turn = 5;
    source.ctx.push_signal(String::from("original"));

    let snapshot = source.snapshot();
    let mut restored = LoopStateMachine::restore(&snapshot);
    restored.turn = 10;
    restored.ctx.push_signal(String::from("modified"));

    // The source keeps its own turn counter and its single signal.
    assert_eq!(source.turn, 5);
    assert_eq!(source.ctx.partitions.signals.len(), 1);

    // The restored copy reflects the edits made after restore.
    assert_eq!(restored.turn, 10);
    assert_eq!(restored.ctx.partitions.signals.len(), 2);
}
/// The context's `max_tokens` and `sprint` values are recorded in the
/// snapshot and restored unchanged.
#[test]
fn snapshot_preserves_max_tokens_and_sprint() {
    let mut machine = sm();
    machine.ctx.max_tokens = 64_000;
    machine.ctx.sprint = 2;

    let snapshot = machine.snapshot();
    let saved = &snapshot.context;
    assert_eq!(saved.max_tokens, 64_000);
    assert_eq!(saved.sprint, 2);

    let restored = LoopStateMachine::restore(&snapshot);
    let live = &restored.ctx;
    assert_eq!(live.max_tokens, 64_000);
    assert_eq!(live.sprint, 2);
}