use super::*;
use crate::agent::agent_loop::hooks::{
AfterToolCallContext, AfterToolCallFn, GetSteeringMessagesFn, PrepareNextTurnFn,
ShouldStopAfterTurnFn,
};
use crate::agent::agent_loop::message::{StreamEvent, UserMessage};
use crate::agent::agent_loop::result::AfterToolCallResult;
use crate::agent::agent_loop::stream::StreamFn;
use crate::agent::agent_loop::tool::{AbortSignal, LoopTool, LoopToolUpdate};
use crate::agent::agent_loop::types::{ConvertToLlmFn, LoopConfig, ToolExecutionMode, TurnUpdate};
use std::pin::Pin;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Returns a checkpoint slot that holds no cached checkpoint.
fn empty_checkpoint_slot() -> super::CheckpointSlot {
    std::sync::Arc::new(std::sync::Mutex::default())
}
/// Builds a stream factory that replays `responses` in order, one per call.
///
/// Each invocation yields a single `StreamEvent::Done` carrying the next
/// scripted message. Once the script is exhausted, every further call answers
/// with a plain-text `"end"` message whose stop reason is `StopReason::Stop`.
fn canned_factory(responses: Vec<AssistantMessage>) -> StreamFn {
    let queue = std::sync::Arc::new(responses);
    let calls = std::sync::Arc::new(AtomicUsize::new(0));
    std::sync::Arc::new(move |_ctx, _opts| {
        let idx = calls.fetch_add(1, Ordering::SeqCst);
        let message = match queue.get(idx) {
            Some(scripted) => scripted.clone(),
            None => AssistantMessage::new(
                vec![ContentBlock::Text {
                    text: "end".to_string(),
                }],
                StopReason::Stop,
            ),
        };
        let reason = message.stop_reason;
        Box::pin(futures::stream::iter(vec![StreamEvent::Done {
            reason,
            message,
            usage: None,
        }]))
    })
}
/// Scripted stream factory that also records what the model was sent.
///
/// Before answering, each call pushes a JSON serialization of the request's
/// messages onto `seen` (an empty string if serialization fails). It then
/// replays `responses` in order. After the script runs out, it answers with a
/// plain-text `"end"` message stopping with `StopReason::Stop`.
fn capturing_factory(
    responses: Vec<AssistantMessage>,
    seen: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
) -> StreamFn {
    let queue = std::sync::Arc::new(responses);
    let calls = std::sync::Arc::new(AtomicUsize::new(0));
    std::sync::Arc::new(move |ctx, _opts| {
        let snapshot = serde_json::to_string(&ctx.messages).unwrap_or_default();
        seen.lock().unwrap().push(snapshot);
        let idx = calls.fetch_add(1, Ordering::SeqCst);
        let message = match queue.get(idx) {
            Some(scripted) => scripted.clone(),
            None => AssistantMessage::new(
                vec![ContentBlock::Text {
                    text: "end".to_string(),
                }],
                StopReason::Stop,
            ),
        };
        let reason = message.stop_reason;
        Box::pin(futures::stream::iter(vec![StreamEvent::Done {
            reason,
            message,
            usage: None,
        }]))
    })
}
fn identity_converter() -> ConvertToLlmFn {
std::sync::Arc::new(|messages: &[Value]| {
messages
.iter()
.filter(|m| {
let role = m.get("role").and_then(|r| r.as_str()).unwrap_or("");
matches!(role, "user" | "assistant" | "tool" | "toolResult")
})
.cloned()
.collect()
})
}
fn build_config() -> LoopConfig {
LoopConfig {
convert_to_llm: identity_converter(),
transform_context: None,
compaction_hooks: None,
get_api_key: None,
api_key: None,
tool_execution: ToolExecutionMode::Sequential,
before_tool_call: None,
after_tool_call: None,
prepare_next_turn: None,
should_stop_after_turn: None,
get_steering_messages: None,
get_followup_messages: None,
reasoning: None,
thinking_budgets: None,
headers: std::collections::HashMap::new(),
metadata: std::collections::HashMap::new(),
request_timeout: None,
provider_name: None,
model_name: None,
compact_model: None,
storm_mutating_tools: None,
storm_exempt_tools: None,
repair_stats: std::sync::Arc::new(
crate::agent::agent_loop::tool_input_repair::RepairStats::new(),
),
truncation_notes: std::sync::Arc::new(std::sync::Mutex::new(
std::collections::HashMap::new(),
)),
tool_def_filter: None,
dynamic_tool_search: false,
escalation_stream_fn: None,
escalation_provider_name: None,
escalation_pending: std::sync::Arc::new(std::sync::Mutex::new(None)),
escalation_max_per_session: 3,
escalation_remaining: std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(3)),
file_touch_tracker: None,
verifier: None,
critic_fn: None,
goal: None,
max_turns: None,
}
}
/// A context with an empty system prompt, no messages, and no tools.
fn empty_context() -> Context {
    Context {
        system_prompt: String::default(),
        messages: vec![],
        tools: vec![],
    }
}
/// Folds a 23-message history through a stub summarizer and checks the result.
///
/// The history is: system, initial task, 20 alternating assistant/user filler
/// turns, then the latest request. After the pass the test checks that:
/// - the message list shrank;
/// - a `system` message containing "CONTEXT COMPACTION" carries the stub
///   summary;
/// - the latest user request is still the final message;
/// - a `ContextCompacted` event reports a `compacted-` session id;
/// - the prompt handed to the summarizer contains "TURNS TO SUMMARIZE" and
///   "## Active Task".
#[tokio::test]
async fn run_compaction_pass_inserts_summary_and_rotates_session() {
let mut ctx = empty_context();
ctx.system_prompt = "you are an agent".into();
ctx.messages.push(serde_json::json!({
"role": "system", "content": "you are an agent"
}));
ctx.messages.push(serde_json::json!({
"role": "user", "content": "initial task: fix the bug"
}));
// 20 filler turns, alternating assistant (even i) / user (odd i).
for i in 0..20 {
let role = if i % 2 == 0 { "assistant" } else { "user" };
ctx.messages.push(serde_json::json!({
"role": role,
"content": format!("turn {i} with some content to fill bytes"),
}));
}
ctx.messages.push(serde_json::json!({
"role": "user", "content": "latest user request"
}));
let n_before = ctx.messages.len();
// The summarizer stores the prompt it receives here (when its future runs),
// so the prompt can be inspected after the pass.
let prompt_seen = std::sync::Arc::new(std::sync::Mutex::new(String::new()));
let prompt_seen_inner = prompt_seen.clone();
let summarize_fn: Option<crate::agent::compression::SummarizeFn> =
Some(std::sync::Arc::new(move |prompt: String| {
let store = prompt_seen_inner.clone();
Box::pin(async move {
*store.lock().unwrap() = prompt;
Ok("## Active Task\nfix the bug\n\n\
## Goal\nresolve the issue\n\n\
## Completed Actions\n1. read the file\n\n\
## Remaining Work\nrun tests"
.to_string())
})
}));
// NOTE(review): events are read only after the pass returns, so this
// capacity (8) must cover every event the pass emits — confirm if the pass
// starts emitting more.
let (tx, mut rx) = mpsc::channel::<LoopEvent>(8);
// The `&mut 0` argument sits where the other compaction tests pass their
// generation counter; here it is a throwaway.
super::run_compaction_pass(
&mut ctx,
&summarize_fn,
5,
0,
&None,
None,
&tx,
&empty_checkpoint_slot(),
&mut 0,
u64::MAX,
)
.await;
// Drop the last sender so the `recv` loop below ends once the buffer is empty.
drop(tx);
assert!(
ctx.messages.len() < n_before,
"expected compaction to shrink the message list: before={n_before} after={}",
ctx.messages.len()
);
let summary_msg = ctx
.messages
.iter()
.find(|m| {
m.get("role").and_then(|v| v.as_str()) == Some("system")
&& m.get("content")
.and_then(|v| v.as_str())
.map(|s| s.contains("CONTEXT COMPACTION"))
.unwrap_or(false)
})
.expect("compaction summary message should be present");
let body = summary_msg["content"].as_str().unwrap();
assert!(body.contains("## Active Task"));
assert!(body.contains("fix the bug"));
// The most recent user request must survive as the final message.
let last = ctx.messages.last().unwrap();
assert_eq!(last["content"].as_str().unwrap(), "latest user request");
let mut compacted_event_seen = false;
while let Some(ev) = rx.recv().await {
if let LoopEvent::ContextCompacted { new_session_id, .. } = ev {
assert!(
new_session_id.starts_with("compacted-"),
"session id should rotate via compacted- prefix; got {new_session_id}"
);
compacted_event_seen = true;
}
}
assert!(compacted_event_seen, "expected ContextCompacted event");
let received = prompt_seen.lock().unwrap().clone();
assert!(received.contains("TURNS TO SUMMARIZE"));
assert!(received.contains("## Active Task"));
}
/// Builds a context shaped like a long-running session.
///
/// The messages are, in order: a system message, the initial task, `n` filler
/// turns alternating assistant/user (starting with assistant), and finally the
/// latest user request.
fn padded_ctx(n: usize) -> super::Context {
    let filler = (0..n).map(|i| {
        let role = ["assistant", "user"][i % 2];
        serde_json::json!({
            "role": role,
            "content": format!("turn {i} with some content to fill bytes"),
        })
    });
    let mut ctx = empty_context();
    ctx.messages
        .push(serde_json::json!({"role": "system", "content": "you are an agent"}));
    ctx.messages
        .push(serde_json::json!({"role": "user", "content": "initial task"}));
    ctx.messages.extend(filler);
    ctx.messages
        .push(serde_json::json!({"role": "user", "content": "latest user request"}));
    ctx
}
fn recording_summarizer(
called: std::sync::Arc<std::sync::atomic::AtomicBool>,
) -> Option<crate::agent::compression::SummarizeFn> {
Some(std::sync::Arc::new(move |_prompt: String| {
let called = called.clone();
Box::pin(async move {
called.store(true, std::sync::atomic::Ordering::SeqCst);
Ok("## Active Task\nINLINE SUMMARY\n## Remaining Work\nx".to_string())
})
}))
}
/// Checkpoint slot pre-loaded with one cached checkpoint built from the given
/// summary, boundary, and generation.
fn slot_with(summary: &str, boundary: usize, generation: u64) -> super::CheckpointSlot {
    let checkpoint = super::CachedCheckpoint {
        summary: summary.to_owned(),
        boundary,
        generation,
    };
    std::sync::Arc::new(std::sync::Mutex::new(Some(checkpoint)))
}
#[tokio::test]
async fn run_compaction_pass_reuses_fresh_checkpoint_without_calling_summarizer() {
use std::sync::atomic::{AtomicBool, Ordering};
let mut ctx = padded_ctx(20);
let called = std::sync::Arc::new(AtomicBool::new(false));
let summarize_fn = recording_summarizer(called.clone());
let slot = slot_with(
"## Active Task\nFROM CHECKPOINT\n## Remaining Work\nfinish",
10,
0,
);
let mut generation = 0u64;
let (tx, _rx) = mpsc::channel::<LoopEvent>(16);
let outcome = super::run_compaction_pass(
&mut ctx,
&summarize_fn,
5,
0,
&None,
None,
&tx,
&slot,
&mut generation,
u64::MAX,
)
.await;
drop(tx);
assert!(
matches!(outcome, super::SummaryOutcome::Succeeded(_)),
"reuse should succeed"
);
assert!(
!called.load(Ordering::SeqCst),
"inline summarizer must NOT be called on the fast path"
);
let summary_msg = ctx
.messages
.iter()
.find_map(|m| {
let c = m.get("content").and_then(|v| v.as_str())?;
c.contains("CONTEXT COMPACTION").then_some(c)
})
.expect("a summary message should be present");
assert!(
summary_msg.contains("FROM CHECKPOINT"),
"the spliced summary should be the checkpoint's, not the inline one"
);
assert_eq!(generation, 1, "a successful fold bumps the epoch");
assert!(
slot.lock().unwrap().is_none(),
"the consumed checkpoint slot is cleared after the fold"
);
}
#[tokio::test]
async fn run_compaction_pass_ignores_stale_generation_checkpoint() {
use std::sync::atomic::{AtomicBool, Ordering};
let mut ctx = padded_ctx(20);
let called = std::sync::Arc::new(AtomicBool::new(false));
let summarize_fn = recording_summarizer(called.clone());
let slot = slot_with(
"## Active Task\nFROM CHECKPOINT\n## Remaining Work\nx",
10,
0,
);
let mut generation = 7u64;
let (tx, _rx) = mpsc::channel::<LoopEvent>(16);
let outcome = super::run_compaction_pass(
&mut ctx,
&summarize_fn,
5,
0,
&None,
None,
&tx,
&slot,
&mut generation,
u64::MAX,
)
.await;
drop(tx);
assert!(matches!(outcome, super::SummaryOutcome::Succeeded(_)));
assert!(
called.load(Ordering::SeqCst),
"stale checkpoint → inline summarizer must run"
);
let summary_msg = ctx
.messages
.iter()
.find_map(|m| {
let c = m.get("content").and_then(|v| v.as_str())?;
c.contains("CONTEXT COMPACTION").then_some(c)
})
.expect("a summary message should be present");
assert!(
summary_msg.contains("INLINE SUMMARY"),
"the inline summary should be used when the checkpoint is stale"
);
}
/// Serializes the tests that mark or take the memories-dirty flag.
///
/// The flag is driven through the free functions `mark_memories_dirty` and
/// `take_memories_dirty`, so it is presumably shared state; confirm before
/// relying on that. Without this lock, one test could consume a flag another
/// test just set.
static DIRTY_FLAG_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
/// Marking the flag once makes exactly one subsequent take return `true`.
#[test]
fn memories_dirty_flag_is_consumed_once() {
    use crate::agent::agent_loop::context_manager::{mark_memories_dirty, take_memories_dirty};
    // Recover from poisoning. The lock only guards `()`, so a panic in another
    // test that held it leaves nothing inconsistent. Unwrapping the
    // PoisonError would turn that one failure into a cascade here.
    let _guard = DIRTY_FLAG_TEST_LOCK
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    // Clear anything left by an earlier test, so the first take below
    // observes only this test's mark.
    let _ = take_memories_dirty();
    mark_memories_dirty();
    assert!(take_memories_dirty(), "first take after mark is true");
    assert!(!take_memories_dirty(), "second take resets to false");
}
/// With the memories-dirty flag set, the memory provider's system-prompt block
/// (`STUBMEM`) must show up in a model-facing context captured during the run.
#[tokio::test]
// The std mutex guard is deliberately held across `.await`. It must cover the
// whole loop run so that `memories_dirty_flag_is_consumed_once` cannot take the
// flag marked below before `run_agent_loop` (presumably) consumes it.
#[allow(clippy::await_holding_lock)]
async fn memory_refresh_injects_block_at_turn_boundary_when_dirty() {
    use crate::extras::memory_provider::MemoryProvider;
    // Recover from poisoning. The lock only guards `()`, so a panic in another
    // test leaves nothing inconsistent, and unwrapping would cascade it here.
    let _guard = DIRTY_FLAG_TEST_LOCK
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    /// Provider whose system-prompt block is the `STUBMEM` marker. Every other
    /// call succeeds with an empty JSON object.
    struct StubProvider;
    impl MemoryProvider for StubProvider {
        fn name(&self) -> &str {
            "stub"
        }
        fn format_for_system_prompt(&self) -> String {
            "STUBMEM: prefer the fast path".to_string()
        }
        fn view(&self, _t: &str) -> Value {
            serde_json::json!({})
        }
        fn add(&self, _: &str, _: &str, _: Option<&str>) -> Result<Value, String> {
            Ok(serde_json::json!({}))
        }
        fn replace(&self, _: &str, _: &str, _: &str, _: Option<&str>) -> Result<Value, String> {
            Ok(serde_json::json!({}))
        }
        fn remove(&self, _: &str, _: &str) -> Result<Value, String> {
            Ok(serde_json::json!({}))
        }
    }
    let mut ctx = empty_context();
    ctx.tools.push(std::sync::Arc::new(EchoTool::new()));
    let seen = std::sync::Arc::new(std::sync::Mutex::new(Vec::<String>::new()));
    // Two turns: one tool call, then a final text answer.
    let factory = capturing_factory(
        vec![
            tool_use_response("call-1", "echo", serde_json::json!({"v": 1})),
            text_response("done"),
        ],
        seen.clone(),
    );
    let provider: std::sync::Arc<dyn MemoryProvider> = std::sync::Arc::new(StubProvider);
    let mut config = build_config();
    // Unlike `identity_converter`, this converter also forwards `system`
    // messages. That is presumably the role the refreshed memory block is
    // injected with; confirm.
    config.convert_to_llm = std::sync::Arc::new(|messages: &[Value]| {
        messages
            .iter()
            .filter(|m| {
                let role = m.get("role").and_then(|r| r.as_str()).unwrap_or("");
                matches!(
                    role,
                    "user" | "assistant" | "tool" | "toolResult" | "system"
                )
            })
            .cloned()
            .collect()
    });
    crate::agent::agent_loop::context_manager::mark_memories_dirty();
    let (tx, _rx) = mpsc::channel::<LoopEvent>(128);
    let _ = run_agent_loop(
        vec![user("echo please")],
        ctx,
        config,
        AbortSignal::new(),
        &tx,
        &factory,
        None,
        Some(provider),
    )
    .await;
    drop(tx);
    let snapshots = seen.lock().unwrap().clone();
    assert!(
        snapshots.iter().any(|s| s.contains("STUBMEM")),
        "the refreshed memory block should appear in the model-facing context \
         after the turn boundary; snapshots={snapshots:?}"
    );
}
/// A well-formed summary returned by the `on_compact` hook overrides the LLM.
///
/// The test checks that:
/// - `on_before` fires exactly once;
/// - the plugin summary lands in the context;
/// - the LLM summary never appears;
/// - the LLM summarizer is never invoked.
#[tokio::test]
async fn compaction_on_compact_hook_overrides_llm_summary() {
use crate::agent::agent_loop::types::CompactionHooks;
use std::sync::atomic::{AtomicUsize, Ordering};
// History: system, initial user, 20 alternating filler turns, latest user.
let mut ctx = empty_context();
ctx.messages
.push(serde_json::json!({"role": "system", "content": "sys"}));
ctx.messages
.push(serde_json::json!({"role": "user", "content": "initial"}));
for i in 0..20 {
let role = if i % 2 == 0 { "assistant" } else { "user" };
ctx.messages
.push(serde_json::json!({"role": role, "content": format!("turn {i} content")}));
}
ctx.messages
.push(serde_json::json!({"role": "user", "content": "latest"}));
// Counts summarizer invocations. The increment runs when the closure is
// called, before its future is created, so it counts calls, not polls.
let llm_called = std::sync::Arc::new(AtomicUsize::new(0));
let llm_called_c = llm_called.clone();
let summarize_fn: Option<crate::agent::compression::SummarizeFn> =
Some(std::sync::Arc::new(move |_prompt: String| {
llm_called_c.fetch_add(1, Ordering::SeqCst);
Box::pin(async move { Ok("## Active Task\nLLM-SUMMARY".to_string()) })
}));
// `on_before` ignores its two arguments and increments `before_fired` when
// its future runs.
let before_fired = std::sync::Arc::new(AtomicUsize::new(0));
let before_c = before_fired.clone();
let hooks = CompactionHooks {
on_before: std::sync::Arc::new(move |_count, _tokens| {
let f = before_c.clone();
Box::pin(async move {
f.fetch_add(1, Ordering::SeqCst);
})
}),
// Ignores its argument and always supplies a summary with a
// `## Active Task` section.
on_compact: std::sync::Arc::new(move |_middle| {
Box::pin(async move { Some("## Active Task\nPLUGIN-SUMMARY".to_string()) })
}),
};
let (tx, _rx) = mpsc::channel::<LoopEvent>(8);
super::run_compaction_pass(
&mut ctx,
&summarize_fn,
5,
0,
&None,
Some(&hooks),
&tx,
&empty_checkpoint_slot(),
&mut 0,
u64::MAX,
)
.await;
drop(tx);
assert_eq!(
before_fired.load(Ordering::SeqCst),
1,
"on-before-compact must fire"
);
let summary_msg = ctx
.messages
.iter()
.find(|m| {
m.get("content")
.and_then(|v| v.as_str())
.map(|s| s.contains("PLUGIN-SUMMARY"))
.unwrap_or(false)
})
.expect("plugin summary must be in the compacted context");
// Repeats the `find` predicate above as an explicit assertion.
assert!(
summary_msg["content"]
.as_str()
.unwrap()
.contains("PLUGIN-SUMMARY")
);
assert!(
!ctx.messages.iter().any(|m| m
.get("content")
.and_then(|v| v.as_str())
.map(|s| s.contains("LLM-SUMMARY"))
.unwrap_or(false)),
"LLM summary must NOT appear — plugin override should win",
);
assert_eq!(
llm_called.load(Ordering::SeqCst),
0,
"LLM summarizer must NOT be called when the plugin supplies a valid summary",
);
}
/// An `on_compact` result that fails validation is discarded.
///
/// The LLM summarizer then runs exactly once, and its summary is the one
/// applied to the context.
#[tokio::test]
async fn compaction_invalid_plugin_summary_falls_through_to_llm() {
use crate::agent::agent_loop::types::CompactionHooks;
use std::sync::atomic::{AtomicUsize, Ordering};
// History: system, initial user, 20 alternating filler turns, latest user.
let mut ctx = empty_context();
ctx.messages
.push(serde_json::json!({"role": "system", "content": "sys"}));
ctx.messages
.push(serde_json::json!({"role": "user", "content": "initial"}));
for i in 0..20 {
let role = if i % 2 == 0 { "assistant" } else { "user" };
ctx.messages
.push(serde_json::json!({"role": role, "content": format!("turn {i} content")}));
}
ctx.messages
.push(serde_json::json!({"role": "user", "content": "latest"}));
// Counts summarizer invocations. The increment runs when the closure is
// called, not when its future is polled.
let llm_called = std::sync::Arc::new(AtomicUsize::new(0));
let llm_called_c = llm_called.clone();
let summarize_fn: Option<crate::agent::compression::SummarizeFn> =
Some(std::sync::Arc::new(move |_prompt: String| {
llm_called_c.fetch_add(1, Ordering::SeqCst);
Box::pin(async move { Ok("## Active Task\nLLM-SUMMARY".to_string()) })
}));
let hooks = CompactionHooks {
on_before: std::sync::Arc::new(|_c, _t| Box::pin(async {})),
// The returned text has no `## ...` section header. It is presumably
// rejected for that reason; confirm against the validator.
on_compact: std::sync::Arc::new(move |_middle| {
Box::pin(async move { Some("garbage with no section header".to_string()) })
}),
};
let (tx, _rx) = mpsc::channel::<LoopEvent>(8);
super::run_compaction_pass(
&mut ctx,
&summarize_fn,
5,
0,
&None,
Some(&hooks),
&tx,
&empty_checkpoint_slot(),
&mut 0,
u64::MAX,
)
.await;
drop(tx);
assert_eq!(
llm_called.load(Ordering::SeqCst),
1,
"invalid plugin summary must fall through to the LLM summarizer",
);
assert!(
ctx.messages.iter().any(|m| m
.get("content")
.and_then(|v| v.as_str())
.map(|s| s.contains("LLM-SUMMARY"))
.unwrap_or(false)),
"LLM summary should be applied after the invalid plugin summary",
);
}
#[tokio::test]
async fn run_compaction_pass_without_summarizer_prunes_only() {
let mut ctx = empty_context();
ctx.messages.push(serde_json::json!({
"role": "user", "content": "first"
}));
ctx.messages.push(serde_json::json!({
"role": "toolResult", "content": "x".repeat(2000), "toolName": "bash"
}));
ctx.messages.push(serde_json::json!({
"role": "user", "content": "tail"
}));
ctx.messages.push(serde_json::json!({
"role": "assistant", "content": "tail asst"
}));
let (tx, mut rx) = mpsc::channel::<LoopEvent>(4);
super::run_compaction_pass(
&mut ctx,
&None,
2,
0,
&None,
None,
&tx,
&empty_checkpoint_slot(),
&mut 0,
u64::MAX,
)
.await;
drop(tx);
let has_summary = ctx.messages.iter().any(|m| {
m.get("content")
.and_then(|v| v.as_str())
.map(|s| s.contains("CONTEXT COMPACTION"))
.unwrap_or(false)
});
assert!(
!has_summary,
"no summary should be inserted without summarize_fn"
);
let tool_msg = &ctx.messages[1];
assert!(tool_msg["content"].as_str().unwrap().contains("[bash]"));
let mut compacted_event_seen = false;
while let Some(ev) = rx.recv().await {
if matches!(ev, LoopEvent::ContextCompacted { .. }) {
compacted_event_seen = true;
}
}
assert!(compacted_event_seen);
}
/// Test tool named `echo`.
///
/// It logs the arguments of every call and, when built with
/// [`EchoTool::with_terminate`], marks its results as terminating.
#[derive(Debug)]
struct EchoTool {
    /// Whether each result sets `terminate: Some(true)`.
    terminate: bool,
    /// Arguments of every call, in call order.
    executed: std::sync::Arc<Mutex<Vec<Value>>>,
}
impl EchoTool {
    /// A non-terminating echo tool with an empty call log.
    fn new() -> Self {
        Self {
            terminate: false,
            executed: Default::default(),
        }
    }
    /// The same tool, but every call result requests termination.
    fn with_terminate(self) -> Self {
        Self {
            terminate: true,
            ..self
        }
    }
}
impl LoopTool for EchoTool {
    fn name(&self) -> &str {
        "echo"
    }
    fn description(&self) -> &str {
        "Echo tool"
    }
    fn label(&self) -> &str {
        "Echo"
    }
    /// Parameter schema: any JSON object.
    fn parameters(&self) -> &Value {
        static SCHEMA: std::sync::OnceLock<Value> = std::sync::OnceLock::new();
        SCHEMA.get_or_init(|| serde_json::json!({"type": "object"}))
    }
    /// Logs `args` and returns a single `"ok"` text block.
    ///
    /// `args` is echoed back as the result details. `terminate` is
    /// `Some(true)` only when the tool was built with `with_terminate`.
    fn execute<'a>(
        &'a self,
        _id: &'a str,
        args: Value,
        _signal: AbortSignal,
        _on_update: LoopToolUpdate,
    ) -> Pin<Box<dyn Future<Output = Result<super::super::LoopToolResult, String>> + Send + 'a>>
    {
        Box::pin(async move {
            self.executed.lock().unwrap().push(args.clone());
            Ok(super::super::LoopToolResult {
                content: vec![serde_json::json!({"type": "text", "text": "ok"})],
                details: args,
                terminate: self.terminate.then_some(true),
            })
        })
    }
}
/// Wraps `text` as a user message.
fn user(text: &str) -> LoopMessage {
    let content = text.to_owned();
    LoopMessage::User(UserMessage { content })
}
/// Assistant reply made of a single text block, stopping with `StopReason::Stop`.
fn text_response(text: &str) -> AssistantMessage {
    let block = ContentBlock::Text {
        text: text.to_owned(),
    };
    AssistantMessage::new(vec![block], StopReason::Stop)
}
/// Assistant reply that makes one tool call (`id`, `name`, `args`) and stops
/// with `StopReason::ToolUse`.
fn tool_use_response(id: &str, name: &str, args: Value) -> AssistantMessage {
    let call = ContentBlock::ToolCall {
        id: id.to_owned(),
        name: name.to_owned(),
        arguments: args,
    };
    AssistantMessage::new(vec![call], StopReason::ToolUse)
}
/// Receives events from `rx` until the channel closes and returns them in
/// arrival order.
async fn drain(rx: &mut mpsc::Receiver<LoopEvent>) -> Vec<LoopEvent> {
    let mut events = Vec::new();
    loop {
        match rx.recv().await {
            Some(event) => events.push(event),
            None => break events,
        }
    }
}
/// A single text-only turn emits every lifecycle event kind.
///
/// The returned history is the user prompt followed by the assistant reply.
#[tokio::test]
async fn test_emits_full_agent_loop_event_sequence() {
    let stream_fn = canned_factory(vec![text_response("Hi there!")]);
    let (tx, mut rx) = mpsc::channel::<LoopEvent>(64);
    let history = run_agent_loop(
        vec![user("Hello")],
        empty_context(),
        build_config(),
        AbortSignal::new(),
        &tx,
        &stream_fn,
        None,
        None,
    )
    .await;
    drop(tx);
    let events = drain(&mut rx).await;
    let kinds: Vec<_> = events.iter().map(|e| e.kind()).collect();
    let expected_kinds = [
        "agent_start",
        "turn_start",
        "message_start",
        "message_end",
        "turn_end",
        "agent_end",
    ];
    for required in expected_kinds {
        assert!(kinds.contains(&required), "missing {required}: {kinds:?}");
    }
    assert_eq!(history.len(), 2);
    assert_eq!(history[0].role(), "user");
    assert_eq!(history[1].role(), "assistant");
}
/// A tool-call turn followed by a text turn runs the tool exactly once.
///
/// History must be user → assistant → toolResult → assistant, and
/// tool-execution start/end events must be emitted.
#[tokio::test]
async fn test_full_loop_with_tool_then_final_text() {
    let echo = std::sync::Arc::new(EchoTool::new());
    let mut ctx = empty_context();
    ctx.tools.push(echo.clone());
    let stream_fn = canned_factory(vec![
        tool_use_response("call-1", "echo", serde_json::json!({"v": 1})),
        text_response("done"),
    ]);
    let (tx, mut rx) = mpsc::channel::<LoopEvent>(128);
    let history = run_agent_loop(
        vec![user("echo")],
        ctx,
        build_config(),
        AbortSignal::new(),
        &tx,
        &stream_fn,
        None,
        None,
    )
    .await;
    drop(tx);
    let tool_runs = echo.executed.lock().unwrap().len();
    assert_eq!(tool_runs, 1);
    let roles: Vec<_> = history.iter().map(|m| m.role()).collect();
    assert_eq!(roles, vec!["user", "assistant", "toolResult", "assistant"]);
    let events = drain(&mut rx).await;
    let kinds: Vec<_> = events.iter().map(|e| e.kind()).collect();
    assert!(kinds.contains(&"tool_execution_start"));
    assert!(kinds.contains(&"tool_execution_end"));
}
/// A context returned by `prepare_next_turn` is what the next request sees.
///
/// On its first call the hook swaps in a new system prompt. The first request
/// must see `first prompt` and the second must see `second prompt`.
#[tokio::test]
async fn test_prepare_next_turn_snapshot_applied() {
    let mut ctx = empty_context();
    ctx.system_prompt = "first prompt".to_string();
    ctx.tools.push(std::sync::Arc::new(EchoTool::new()));
    let prompts_seen = std::sync::Arc::new(Mutex::new(Vec::<String>::new()));
    let recorder = prompts_seen.clone();
    let stream_calls = std::sync::Arc::new(AtomicUsize::new(0));
    // Records each request's system prompt. Request 0 calls `echo`; later
    // requests answer with text.
    let factory: StreamFn = std::sync::Arc::new(move |llm_ctx, _opts| {
        recorder.lock().unwrap().push(llm_ctx.system_prompt);
        let msg = match stream_calls.fetch_add(1, Ordering::SeqCst) {
            0 => tool_use_response("call-1", "echo", serde_json::json!({"v": 1})),
            _ => text_response("done"),
        };
        let reason = msg.stop_reason;
        Box::pin(futures::stream::iter(vec![StreamEvent::Done {
            reason,
            message: msg,
            usage: None,
        }]))
    });
    let hook_calls = std::sync::Arc::new(AtomicUsize::new(0));
    // Only the first invocation returns an update; later ones return `None`.
    let hook: PrepareNextTurnFn = std::sync::Arc::new(move |ctx| {
        let hook_calls = hook_calls.clone();
        Box::pin(async move {
            let first_call = hook_calls.fetch_add(1, Ordering::SeqCst) == 0;
            first_call.then(|| TurnUpdate {
                context: Some(Context {
                    system_prompt: "second prompt".to_string(),
                    messages: ctx.context.messages.clone(),
                    tools: ctx.context.tools.clone(),
                }),
                ..Default::default()
            })
        })
    });
    let mut config = build_config();
    config.prepare_next_turn = Some(hook);
    let (tx, _rx) = mpsc::channel::<LoopEvent>(128);
    let _ = run_agent_loop(
        vec![user("echo something")],
        ctx,
        config,
        AbortSignal::new(),
        &tx,
        &factory,
        None,
        None,
    )
    .await;
    let observed = prompts_seen.lock().unwrap().clone();
    assert_eq!(observed.len(), 2, "expected 2 LLM calls");
    assert_eq!(observed[0], "first prompt");
    assert_eq!(
        observed[1], "second prompt",
        "second LLM call should see the mutated context"
    );
}
/// A `thinking_level` returned by `prepare_next_turn` applies to the next
/// request.
///
/// Turn 1 runs with the configured reasoning (`None`); turn 2 must carry
/// `ThinkingLevel::High`.
#[tokio::test]
async fn prepare_next_turn_applies_thinking_level_to_next_turn() {
    use crate::agent::agent_loop::types::ThinkingLevel;
    let mut ctx = empty_context();
    ctx.tools.push(std::sync::Arc::new(EchoTool::new()));
    let reasoning_seen = std::sync::Arc::new(Mutex::new(Vec::<Option<ThinkingLevel>>::new()));
    let recorder = reasoning_seen.clone();
    let stream_calls = std::sync::Arc::new(AtomicUsize::new(0));
    // Records each request's reasoning option. Request 0 calls `echo`; later
    // requests answer with text.
    let factory: StreamFn = std::sync::Arc::new(move |_llm_ctx, opts| {
        recorder.lock().unwrap().push(opts.reasoning);
        let msg = match stream_calls.fetch_add(1, Ordering::SeqCst) {
            0 => tool_use_response("call-1", "echo", serde_json::json!({"v": 1})),
            _ => text_response("done"),
        };
        let reason = msg.stop_reason;
        Box::pin(futures::stream::iter(vec![StreamEvent::Done {
            reason,
            message: msg,
            usage: None,
        }]))
    });
    let hook_calls = std::sync::Arc::new(AtomicUsize::new(0));
    // Requests `ThinkingLevel::High` on its first invocation only.
    let hook: PrepareNextTurnFn = std::sync::Arc::new(move |_ctx| {
        let hook_calls = hook_calls.clone();
        Box::pin(async move {
            let first_call = hook_calls.fetch_add(1, Ordering::SeqCst) == 0;
            first_call.then(|| TurnUpdate {
                thinking_level: Some(ThinkingLevel::High),
                ..Default::default()
            })
        })
    });
    let mut config = build_config();
    config.prepare_next_turn = Some(hook);
    config.reasoning = None;
    let (tx, _rx) = mpsc::channel::<LoopEvent>(128);
    let _ = run_agent_loop(
        vec![user("go")],
        ctx,
        config,
        AbortSignal::new(),
        &tx,
        &factory,
        None,
        None,
    )
    .await;
    let observed = reasoning_seen.lock().unwrap().clone();
    assert_eq!(observed.len(), 2, "expected 2 LLM calls");
    assert_eq!(
        observed[0], None,
        "turn 1 runs with the initial reasoning (none)"
    );
    assert_eq!(
        observed[1],
        Some(ThinkingLevel::High),
        "turn 2 must see the thinking_level prepareNextTurn requested — \
         pre-fix this was dropped and turn 2 saw None",
    );
}
/// A `should_stop_after_turn` hook that always answers `true` ends the loop
/// after one model request. The history holds only that turn, and `agent_end`
/// is still emitted.
#[tokio::test]
async fn test_should_stop_after_turn_stops_loop() {
    let scripted = canned_factory(vec![
        text_response("turn one"),
        text_response("should not appear"),
    ]);
    let llm_calls = std::sync::Arc::new(AtomicUsize::new(0));
    let tally = llm_calls.clone();
    // Wraps the scripted stream so every model request is counted.
    let factory_counted: StreamFn = std::sync::Arc::new(move |ctx, opts| {
        tally.fetch_add(1, Ordering::SeqCst);
        scripted(ctx, opts)
    });
    let always_stop: ShouldStopAfterTurnFn =
        std::sync::Arc::new(|_ctx| Box::pin(async move { true }));
    let mut config = build_config();
    config.should_stop_after_turn = Some(always_stop);
    let (tx, mut rx) = mpsc::channel::<LoopEvent>(64);
    let messages = run_agent_loop(
        vec![user("hi")],
        empty_context(),
        config,
        AbortSignal::new(),
        &tx,
        &factory_counted,
        None,
        None,
    )
    .await;
    drop(tx);
    assert_eq!(llm_calls.load(Ordering::SeqCst), 1);
    assert_eq!(messages.len(), 2);
    let events = drain(&mut rx).await;
    assert!(events.iter().any(|e| e.kind() == "agent_end"));
}
/// A tool result with `terminate` set ends the loop after the tool batch.
///
/// The model would keep calling tools, yet only one request is made and the
/// history ends with the tool result.
#[tokio::test]
async fn test_terminate_stops_loop_after_tool_batch() {
    let mut ctx = empty_context();
    ctx.tools
        .push(std::sync::Arc::new(EchoTool::new().with_terminate()));
    let llm_calls = std::sync::Arc::new(AtomicUsize::new(0));
    let tally = llm_calls.clone();
    // Every request answers with the same echo tool call.
    let factory: StreamFn = std::sync::Arc::new(move |_ctx, _opts| {
        tally.fetch_add(1, Ordering::SeqCst);
        Box::pin(futures::stream::iter(vec![StreamEvent::Done {
            reason: StopReason::ToolUse,
            message: tool_use_response("call-1", "echo", serde_json::json!({"v": 1})),
            usage: None,
        }]))
    });
    let (tx, _rx) = mpsc::channel::<LoopEvent>(64);
    let history = run_agent_loop(
        vec![user("echo")],
        ctx,
        build_config(),
        AbortSignal::new(),
        &tx,
        &factory,
        None,
        None,
    )
    .await;
    assert_eq!(llm_calls.load(Ordering::SeqCst), 1, "no second LLM call");
    let roles: Vec<_> = history.iter().map(|m| m.role()).collect();
    assert_eq!(roles, vec!["user", "assistant", "toolResult"]);
}
/// An `after_tool_call` hook that sets `terminate` stops the loop after the
/// first tool batch, even though the tool itself does not request it.
#[tokio::test]
async fn test_after_tool_call_terminate_stops_loop() {
    let mut ctx = empty_context();
    ctx.tools.push(std::sync::Arc::new(EchoTool::new()));
    let llm_calls = std::sync::Arc::new(AtomicUsize::new(0));
    let tally = llm_calls.clone();
    // Every request answers with the same echo tool call.
    let factory: StreamFn = std::sync::Arc::new(move |_ctx, _opts| {
        tally.fetch_add(1, Ordering::SeqCst);
        Box::pin(futures::stream::iter(vec![StreamEvent::Done {
            reason: StopReason::ToolUse,
            message: tool_use_response("call-1", "echo", serde_json::json!({"v": 1})),
            usage: None,
        }]))
    });
    // Leaves the result untouched apart from requesting termination.
    let terminate_after: AfterToolCallFn =
        std::sync::Arc::new(|_ctx: AfterToolCallContext| {
            Box::pin(async move {
                Some(AfterToolCallResult {
                    content: None,
                    details: None,
                    is_error: None,
                    terminate: Some(true),
                })
            })
        });
    let mut config = build_config();
    config.after_tool_call = Some(terminate_after);
    let (tx, _rx) = mpsc::channel::<LoopEvent>(64);
    let _ = run_agent_loop(
        vec![user("echo")],
        ctx,
        config,
        AbortSignal::new(),
        &tx,
        &factory,
        None,
        None,
    )
    .await;
    assert_eq!(llm_calls.load(Ordering::SeqCst), 1, "no second LLM call");
}
/// When no tool result requests termination, the loop sends the tool output
/// back to the model, giving two requests in total.
#[tokio::test]
async fn test_continue_when_not_all_terminate() {
    let mut ctx = empty_context();
    ctx.tools.push(std::sync::Arc::new(EchoTool::new()));
    let llm_calls = std::sync::Arc::new(AtomicUsize::new(0));
    let tally = llm_calls.clone();
    // Request 0 calls `echo`; later requests answer with text.
    let factory: StreamFn = std::sync::Arc::new(move |_ctx, _opts| {
        let msg = match tally.fetch_add(1, Ordering::SeqCst) {
            0 => tool_use_response("call-1", "echo", serde_json::json!({"v": 1})),
            _ => text_response("done"),
        };
        let reason = msg.stop_reason;
        Box::pin(futures::stream::iter(vec![StreamEvent::Done {
            reason,
            message: msg,
            usage: None,
        }]))
    });
    let (tx, _rx) = mpsc::channel::<LoopEvent>(64);
    let _ = run_agent_loop(
        vec![user("echo")],
        ctx,
        build_config(),
        AbortSignal::new(),
        &tx,
        &factory,
        None,
        None,
    )
    .await;
    assert_eq!(
        llm_calls.load(Ordering::SeqCst),
        2,
        "two LLM calls expected"
    );
}
/// A steering message returned mid-run is injected into the conversation.
///
/// The test checks three things:
/// - the second model request sees the injected message;
/// - the injected message is recorded in the history;
/// - its `message_start` event comes after the last tool-result `message_end`.
#[tokio::test]
async fn test_steering_messages_injected_after_tool_calls() {
    let echo = std::sync::Arc::new(EchoTool::new());
    let mut ctx = empty_context();
    ctx.tools.push(echo);
    // Steering yields the interrupt only on its second poll (n == 1); every
    // other poll returns no messages.
    let poll_count = std::sync::Arc::new(AtomicUsize::new(0));
    let poll_clone = poll_count.clone();
    let steering: GetSteeringMessagesFn = std::sync::Arc::new(move || {
        let poll = poll_clone.clone();
        Box::pin(async move {
            let n = poll.fetch_add(1, Ordering::SeqCst);
            if n == 1 {
                vec![user("interrupt")]
            } else {
                Vec::new()
            }
        })
    });
    // Written by the second model request: did it see a user message
    // containing "interrupt"?
    let saw_interrupt_on_second = std::sync::Arc::new(std::sync::Mutex::new(false));
    let saw_clone = saw_interrupt_on_second.clone();
    let call_counter = std::sync::Arc::new(AtomicUsize::new(0));
    let factory: StreamFn = std::sync::Arc::new(move |llm_ctx, _opts| {
        let n = call_counter.fetch_add(1, Ordering::SeqCst);
        if n == 1 {
            let found = llm_ctx.messages.iter().any(|m| {
                m.get("role").and_then(|r| r.as_str()) == Some("user")
                    && m
                        .get("content")
                        .and_then(|c| c.as_str())
                        .is_some_and(|s| s.contains("interrupt"))
            });
            *saw_clone.lock().unwrap() = found;
        }
        let msg = if n == 0 {
            tool_use_response("call-1", "echo", serde_json::json!({"v": 1}))
        } else {
            text_response("done")
        };
        let reason = msg.stop_reason;
        Box::pin(futures::stream::iter(vec![StreamEvent::Done {
            reason,
            message: msg,
            usage: None,
        }]))
    });
    let mut config = build_config();
    config.get_steering_messages = Some(steering);
    let (tx, mut rx) = mpsc::channel::<LoopEvent>(128);
    let messages = run_agent_loop(
        vec![user("start")],
        ctx,
        config,
        AbortSignal::new(),
        &tx,
        &factory,
        None,
        None,
    )
    .await;
    drop(tx);
    assert!(
        *saw_interrupt_on_second.lock().unwrap(),
        "second LLM call should see the injected interrupt"
    );
    let user_contents: Vec<String> = messages
        .iter()
        .filter_map(|m| match m {
            LoopMessage::User(u) => Some(u.content.clone()),
            _ => None,
        })
        .collect();
    assert_eq!(user_contents, vec!["start", "interrupt"]);
    // A missing event fails with a named message rather than a bare
    // `Option::unwrap` panic.
    let events = drain(&mut rx).await;
    let interrupt_idx = events
        .iter()
        .position(|e| match e {
            LoopEvent::MessageStart {
                message: LoopMessage::User(u),
            } => u.content == "interrupt",
            _ => false,
        })
        .expect("no message_start event was emitted for the injected interrupt");
    let last_tool_result_end_idx = events
        .iter()
        .rposition(|e| {
            matches!(
                e,
                LoopEvent::MessageEnd {
                    message: LoopMessage::ToolResult(_)
                }
            )
        })
        .expect("no message_end event was emitted for a tool result");
    assert!(
        interrupt_idx > last_tool_result_end_idx,
        "interrupt should appear AFTER the tool result message_end \
         (interrupt at {interrupt_idx}, last tool result end at {last_tool_result_end_idx})"
    );
}
use crate::agent::agent_loop::result::LoopToolResult as PhaseSixToolResult;
use std::sync::Arc as PhaseSixArc;
#[tokio::test]
async fn loop_preserves_history_across_turns() {
use crate::agent::agent_loop::stream::{LlmContext, StreamFn};
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
let observed_lens: PhaseSixArc<Mutex<Vec<usize>>> = PhaseSixArc::new(Mutex::new(Vec::new()));
let observed_clone = observed_lens.clone();
let counter = std::sync::Arc::new(AtomicUsize::new(0));
#[derive(Debug)]
struct LocalEcho;
impl LoopTool for LocalEcho {
fn name(&self) -> &str {
"echo"
}
fn description(&self) -> &str {
"Echo"
}
fn label(&self) -> &str {
"Echo"
}
fn parameters(&self) -> &Value {
static EMPTY: std::sync::OnceLock<Value> = std::sync::OnceLock::new();
EMPTY.get_or_init(|| serde_json::json!({"type": "object"}))
}
fn execute<'a>(
&'a self,
_id: &'a str,
_args: Value,
_signal: AbortSignal,
_on_update: super::super::tool::LoopToolUpdate,
) -> Pin<Box<dyn Future<Output = Result<PhaseSixToolResult, String>> + Send + 'a>> {
Box::pin(async move {
Ok(PhaseSixToolResult {
content: vec![serde_json::json!({
"type": "text",
"text": "ok",
})],
details: Value::Null,
terminate: None,
})
})
}
}
let factory: StreamFn = std::sync::Arc::new(move |ctx: LlmContext, _opts| {
observed_clone.lock().unwrap().push(ctx.messages.len());
let n = counter.fetch_add(1, Ordering::SeqCst);
let msg = if n == 0 {
tool_use_response("call-1", "echo", serde_json::json!({}))
} else {
text_response("done")
};
let reason = msg.stop_reason;
Box::pin(futures::stream::iter(vec![
crate::agent::agent_loop::message::StreamEvent::Done {
reason,
message: msg,
usage: None,
},
]))
});
let mut ctx = empty_context();
ctx.tools.push(PhaseSixArc::new(LocalEcho));
let mut cfg = build_config();
cfg.tool_execution = ToolExecutionMode::Sequential;
let (tx, _rx) = mpsc::channel::<LoopEvent>(64);
let _ = run_agent_loop(
vec![user("start")],
ctx,
cfg,
AbortSignal::new(),
&tx,
&factory,
None,
None, )
.await;
let lens = observed_lens.lock().unwrap().clone();
assert_eq!(lens.len(), 2, "expected two LLM calls");
assert_eq!(lens[0], 1);
assert_eq!(
lens[1], 3,
"second LLM call should see prior turn's history; got {} messages",
lens[1],
);
}
/// Cancelling the shared `AbortSignal` must make the agent loop return
/// promptly even though the only tool sleeps for 30 seconds.
///
/// The loop runs on a spawned task. After a few scheduler yields the signal is
/// cancelled, and the task must both finish within 2 seconds and not panic.
#[tokio::test]
async fn full_signal_chain_exits_cleanly() {
    use crate::agent::agent_loop::stream::{LlmContext, StreamFn};
    use std::sync::atomic::{AtomicUsize, Ordering};
    #[derive(Debug)]
    struct CancellableTool;
    impl LoopTool for CancellableTool {
        fn name(&self) -> &str {
            "noop"
        }
        fn description(&self) -> &str {
            "Cancellable"
        }
        fn label(&self) -> &str {
            "Noop"
        }
        fn parameters(&self) -> &Value {
            static EMPTY: std::sync::OnceLock<Value> = std::sync::OnceLock::new();
            EMPTY.get_or_init(|| serde_json::json!({"type": "object"}))
        }
        fn execute<'a>(
            &'a self,
            _id: &'a str,
            _args: Value,
            _signal: AbortSignal,
            _on_update: super::super::tool::LoopToolUpdate,
        ) -> Pin<Box<dyn Future<Output = Result<PhaseSixToolResult, String>> + Send + 'a>> {
            Box::pin(async move {
                // Deliberately ignores `_signal`: only the loop's own
                // cancellation handling can end this early.
                tokio::time::sleep(std::time::Duration::from_secs(30)).await;
                Ok(PhaseSixToolResult {
                    content: Vec::new(),
                    details: Value::Null,
                    terminate: None,
                })
            })
        }
    }
    let counter = std::sync::Arc::new(AtomicUsize::new(0));
    let factory: StreamFn = std::sync::Arc::new(move |_ctx: LlmContext, _opts| {
        let n = counter.fetch_add(1, Ordering::SeqCst);
        let msg = if n == 0 {
            tool_use_response("call-1", "noop", serde_json::json!({}))
        } else {
            text_response("should-not-reach")
        };
        let reason = msg.stop_reason;
        Box::pin(futures::stream::iter(vec![
            crate::agent::agent_loop::message::StreamEvent::Done {
                reason,
                message: msg,
                usage: None,
            },
        ]))
    });
    let mut ctx = empty_context();
    ctx.tools.push(PhaseSixArc::new(CancellableTool));
    let mut cfg = build_config();
    cfg.tool_execution = ToolExecutionMode::Sequential;
    let (tx, _rx) = mpsc::channel::<LoopEvent>(64);
    let signal = AbortSignal::new();
    let signal_clone = signal.clone();
    let task = tokio::spawn(async move {
        run_agent_loop(
            vec![user("start")],
            ctx,
            cfg,
            signal_clone,
            &tx,
            &factory,
            None,
            None,
        )
        .await
    });
    // Give the spawned loop a chance to start before cancelling.
    for _ in 0..5 {
        tokio::task::yield_now().await;
    }
    signal.cancel();
    let joined = tokio::time::timeout(std::time::Duration::from_secs(2), task)
        .await
        .expect("loop should exit within 2s after signal cancel");
    // A successful `timeout` only proves the task completed. A panic inside
    // the loop surfaces here as a `JoinError` and must also fail the test.
    let _messages = joined.expect("agent loop task must not panic after signal cancel");
}
use crate::extras::memory_provider::MemoryProvider;
use std::sync::Arc;
/// `MemoryProvider` test double that records every transcript handed to
/// `on_pre_compress` and answers each call with a configurable string.
#[derive(Default)]
struct PreCompressRecorder {
    /// Transcripts received by `on_pre_compress`, in call order.
    seen: std::sync::Mutex<Vec<String>>,
    /// Reply returned from every `on_pre_compress` call (empty by default).
    return_value: std::sync::Mutex<String>,
}
impl MemoryProvider for PreCompressRecorder {
fn name(&self) -> &str {
"pre-compress-recorder"
}
fn view(&self, _: &str) -> serde_json::Value {
serde_json::Value::Null
}
fn add(&self, _: &str, _: &str, _kind: Option<&str>) -> Result<serde_json::Value, String> {
Ok(serde_json::Value::Null)
}
fn replace(
&self,
_: &str,
_: &str,
_: &str,
_kind: Option<&str>,
) -> Result<serde_json::Value, String> {
Ok(serde_json::Value::Null)
}
fn remove(&self, _: &str, _: &str) -> Result<serde_json::Value, String> {
Ok(serde_json::Value::Null)
}
fn on_pre_compress(&self, transcript: &str) -> String {
self.seen.lock().unwrap().push(transcript.to_string());
self.return_value.lock().unwrap().clone()
}
}
/// Two-message user/assistant exchange used as the "middle" slice that the
/// `build_augmented_focus` tests summarize.
fn make_middle() -> Vec<serde_json::Value> {
    [("user", "what is rust?"), ("assistant", "a systems language")]
        .into_iter()
        .map(|(role, content)| serde_json::json!({"role": role, "content": content}))
        .collect()
}
/// Neither a user focus nor a memory provider means no extra instructions.
#[test]
fn build_augmented_focus_returns_none_with_no_inputs() {
    let middle = make_middle();
    let focus = super::build_augmented_focus(None, None, &middle);
    assert!(
        focus.is_none(),
        "no focus + no provider must yield None instructions"
    );
}
/// Without a provider the user's focus string passes through unchanged.
#[test]
fn build_augmented_focus_preserves_focus_when_no_provider() {
    let middle = make_middle();
    let focus = super::build_augmented_focus(Some("error handling"), None, &middle);
    assert_eq!(focus.as_deref(), Some("error handling"));
}
/// Provider insights are appended under a "Provider insights:" label next to
/// the user's focus. The provider hook sees a role-prefixed transcript of the
/// middle messages exactly once.
#[test]
fn build_augmented_focus_folds_provider_insights_into_focus() {
    let recorder = Arc::new(PreCompressRecorder::default());
    *recorder.return_value.lock().unwrap() =
        String::from("user prefers async/await over threads");
    let provider: Arc<dyn MemoryProvider> = recorder.clone();
    let middle = make_middle();

    let out = super::build_augmented_focus(Some("retry logic"), Some(&provider), &middle)
        .expect("focus + insights produces Some");
    assert!(out.contains("retry logic"), "user focus must survive");
    assert!(
        out.contains("user prefers async/await over threads"),
        "provider insight must be folded in: {out}"
    );
    assert!(
        out.contains("Provider insights:"),
        "insights must be labelled so the summarizer can attribute them"
    );

    let seen = recorder.seen.lock().unwrap();
    assert_eq!(seen.len(), 1, "hook fires exactly once");
    let transcript = &seen[0];
    assert!(
        transcript.contains("user: what is rust?")
            && transcript.contains("assistant: a systems language"),
        "transcript must contain both messages: {:?}",
        transcript
    );
}
/// With no user focus the result is just the labelled provider insights.
#[test]
fn build_augmented_focus_yields_insights_alone_when_no_focus() {
    let recorder = Arc::new(PreCompressRecorder::default());
    *recorder.return_value.lock().unwrap() = String::from("remember the build flags");
    let provider: Arc<dyn MemoryProvider> = recorder.clone();
    let middle = make_middle();

    let out = super::build_augmented_focus(None, Some(&provider), &middle)
        .expect("insights alone produce Some");
    assert!(out.starts_with("Provider insights:"));
    assert!(out.contains("remember the build flags"));
}
/// An empty provider reply counts as "no insights", but the hook still runs.
#[test]
fn build_augmented_focus_treats_empty_provider_output_as_none() {
    let recorder = Arc::new(PreCompressRecorder::default());
    *recorder.return_value.lock().unwrap() = String::new();
    let provider: Arc<dyn MemoryProvider> = recorder.clone();
    let middle = make_middle();

    let focus = super::build_augmented_focus(None, Some(&provider), &middle);
    assert!(
        focus.is_none(),
        "empty provider output + no focus must yield None"
    );
    let hook_calls = recorder.seen.lock().unwrap().len();
    assert_eq!(hook_calls, 1);
}
/// Each message renders as `role: content`; messages with empty content are
/// omitted.
#[test]
fn transcript_from_value_slice_renders_role_prefixes() {
    // The system entry has empty content and must not appear in the output.
    let messages: Vec<serde_json::Value> = [
        ("user", "hello"),
        ("assistant", "hi"),
        ("system", ""),
    ]
    .into_iter()
    .map(|(role, content)| serde_json::json!({"role": role, "content": content}))
    .collect();
    let transcript = super::transcript_from_value_slice(&messages);
    assert!(transcript.contains("user: hello"));
    assert!(transcript.contains("assistant: hi"));
    assert!(
        !transcript.contains("system: "),
        "empty content must be skipped: {transcript:?}"
    );
}
/// Pins the exact line format the critic receives for user, assistant text,
/// assistant tool calls, and tool results.
#[test]
fn build_critic_transcript_pins_the_exact_critic_facing_format() {
    use crate::agent::agent_loop::message::ToolResultMessage;

    let assistant_turn = AssistantMessage::new(
        vec![
            ContentBlock::Text {
                text: " on it ".to_string(),
            },
            ContentBlock::ToolCall {
                id: "c1".to_string(),
                name: "read".to_string(),
                arguments: serde_json::json!({"path": "/x"}),
            },
        ],
        StopReason::Stop,
    );
    let tool_turn = ToolResultMessage {
        tool_call_id: "c1".to_string(),
        tool_name: "read".to_string(),
        content: vec![ContentBlock::Text {
            text: "file contents".to_string(),
        }],
        details: serde_json::json!({}),
        is_error: false,
    };
    let msgs = vec![
        user("do the thing"),
        LoopMessage::Assistant(assistant_turn),
        LoopMessage::ToolResult(tool_turn),
    ];

    // Note the assistant text is trimmed (" on it " -> "on it").
    let expected = concat!(
        "USER: do the thing\n",
        "ASSISTANT: on it\n",
        "ASSISTANT called read({\"path\":\"/x\"})\n",
        "TOOL read [result]: file contents\n",
    );
    assert_eq!(super::build_critic_transcript(&msgs), expected);
}
/// When the transcript is too long, the head (original request) and tail
/// (latest work and verification) survive, and the dropped middle is marked
/// as elided.
#[test]
fn build_critic_transcript_keeps_request_and_recent_work_when_over_budget() {
    use crate::agent::agent_loop::message::ToolResultMessage;

    /// Assistant turn consisting of a single text block.
    fn assistant_text(text: String) -> LoopMessage {
        LoopMessage::Assistant(AssistantMessage::new(
            vec![ContentBlock::Text { text }],
            StopReason::Stop,
        ))
    }

    // Head: the request.
    let mut msgs = vec![user("REQUEST: build an animated water canvas")];
    // Middle: 120 bulky planning turns to exceed the transcript budget.
    msgs.extend(
        (0..120).map(|i| assistant_text(format!("planning step {i}: {}", "x".repeat(200)))),
    );
    // Tail: final summary plus the verification output.
    msgs.push(assistant_text(
        "DONE: created water.js + flowfield.js; tests 12/12 pass".to_string(),
    ));
    msgs.push(LoopMessage::ToolResult(ToolResultMessage {
        tool_call_id: "v".to_string(),
        tool_name: "bash".to_string(),
        content: vec![ContentBlock::Text {
            text: "VERIFIED: WATER RENDERED (cyan/blue flow field)".to_string(),
        }],
        details: serde_json::json!({}),
        is_error: false,
    }));

    let transcript = super::build_critic_transcript(&msgs);
    assert!(
        transcript.contains("REQUEST: build an animated water canvas"),
        "original request (head) must survive truncation"
    );
    assert!(
        transcript.contains("WATER RENDERED"),
        "recent verification (tail) must survive — this is what the critic judges"
    );
    assert!(
        transcript.contains("tests 12/12 pass"),
        "recent work (tail) must survive"
    );
    assert!(
        transcript.contains("elided"),
        "an elision marker should mark the dropped middle"
    );
}
/// A DSML `invoke` that only appears inside a Text block must reach the
/// scavenge source and be recovered as one `read_file` call.
#[test]
fn scavenge_source_recovers_dsml_invoke_from_text_only() {
    use crate::agent::agent_loop::scavenge::scavenge_tool_calls;
    use std::collections::HashSet;

    let dsml = "<|DSML|invoke name=\"read_file\"><|DSML|parameter name=\"path\" string=\"true\">/tmp/x</|DSML|parameter></|DSML|invoke>";
    let blocks = vec![ContentBlock::Text {
        text: String::from(dsml),
    }];
    let source = super::build_scavenge_source(&blocks);
    assert!(
        source.contains("DSML"),
        "scavenge source must include Text block content: {source:?}",
    );

    let allowed: HashSet<String> = HashSet::from([String::from("read_file")]);
    let result = scavenge_tool_calls(Some(&source), &allowed, 4);
    assert_eq!(
        result.calls.len(),
        1,
        "orphan DSML in Text must be recovered: calls={:?}",
        result.calls
    );
    assert_eq!(result.calls[0].name, "read_file");
}
/// Thinking and Text blocks are joined with a newline, in block order.
#[test]
fn scavenge_source_concatenates_thinking_and_text() {
    let thinking = ContentBlock::Thinking {
        text: String::from("Plan: call list_dir."),
    };
    let visible = ContentBlock::Text {
        text: String::from("Acting now."),
    };
    let blocks = vec![thinking, visible];
    assert_eq!(
        super::build_scavenge_source(&blocks),
        "Plan: call list_dir.\nActing now."
    );
}
/// Structured ToolCall blocks contribute nothing to the scavenge source.
#[test]
fn scavenge_source_skips_non_text_blocks() {
    let visible = ContentBlock::Text {
        text: String::from("visible"),
    };
    let structured_call = ContentBlock::ToolCall {
        id: String::from("call_1"),
        name: String::from("noop"),
        arguments: serde_json::json!({}),
    };
    let blocks = vec![visible, structured_call];
    assert_eq!(super::build_scavenge_source(&blocks), "visible");
}
/// Two `read_file` calls for the same path must end up with identical parsed
/// arguments after `apply_truncation_repair`, even though one raw argument
/// string lacks its closing brace. This presumably lets later
/// duplicate-call ("storm") detection treat them as the same call.
#[test]
fn truncation_repair_canonicalizes_divergent_streams_before_storm() {
    use crate::agent::agent_loop::tool_input_repair::RepairStats;
    use crate::agent::agent_loop::tools::ToolCall;

    // Truncated (missing `}`) vs. well-formed: the raw payloads differ.
    let call_a_raw = r#"{"path": "/tmp/x""#;
    let call_b_raw = r#"{"path": "/tmp/x"}"#;
    assert_ne!(call_a_raw, call_b_raw);

    let mut tool_calls = vec![
        ToolCall {
            id: "call_a".to_string(),
            name: "read_file".to_string(),
            arguments: serde_json::Value::String(call_a_raw.to_string()),
        },
        ToolCall {
            id: "call_b".to_string(),
            name: "read_file".to_string(),
            arguments: serde_json::Value::String(call_b_raw.to_string()),
        },
    ];
    let stats = RepairStats::new();
    // Per-call repair notes, keyed by tool-call id.
    let notes = std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashMap::<
        String,
        Vec<String>,
    >::new()));
    super::apply_truncation_repair(&mut tool_calls, &stats, &notes);

    assert_eq!(tool_calls[0].arguments, tool_calls[1].arguments);
    assert_eq!(tool_calls[0].arguments["path"], "/tmp/x");
    assert!(
        stats.snapshot().truncation_fixed >= 1,
        "at least the truncated call must record TruncationFixed",
    );
}
/// An unsalvageable argument string must not be rewritten. Specifically:
/// - it is never replaced by `{}`;
/// - it still bumps `truncation_fixed` for telemetry;
/// - it gets a `TRUNCATION UNRECOVERABLE` note tagged with the tool name.
#[test]
fn truncation_repair_preserves_raw_on_hard_fallback() {
    use crate::agent::agent_loop::tool_input_repair::RepairStats;
    use crate::agent::agent_loop::tools::ToolCall;

    let unsalvageable = "}}}garbage no opening".to_string();
    let mut tool_calls = vec![ToolCall {
        id: "call_garbage".to_string(),
        name: "read_file".to_string(),
        arguments: serde_json::Value::String(unsalvageable.clone()),
    }];
    let stats = RepairStats::new();
    // Per-call repair notes, keyed by tool-call id.
    let notes = std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashMap::<
        String,
        Vec<String>,
    >::new()));
    super::apply_truncation_repair(&mut tool_calls, &stats, &notes);

    // If the arguments are still a raw string, it must be byte-identical.
    if let serde_json::Value::String(after) = &tool_calls[0].arguments {
        assert_eq!(
            after, &unsalvageable,
            "hard fallback must not mutate the raw string",
        );
    }
    assert_ne!(
        tool_calls[0].arguments,
        serde_json::json!({}),
        "hard fallback must not silently fabricate an empty object",
    );
    assert_eq!(
        stats.snapshot().truncation_fixed,
        1,
        "fallback must still bump truncation_fixed for operator telemetry",
    );

    let sink = notes.lock().unwrap();
    let entry = sink
        .get("call_garbage")
        .expect("notes must be recorded for the fallback call");
    assert!(
        entry.iter().any(|n| n.contains("TRUNCATION UNRECOVERABLE")),
        "expected ⚠️ TRUNCATION UNRECOVERABLE prefix in notes: {entry:?}",
    );
    assert!(
        entry.iter().any(|n| n.contains("[read_file]")),
        "expected [tool_name] prefix in notes: {entry:?}",
    );
}
/// End to end: three truncated `echo` calls whose raw arguments differ only
/// in whitespace are repaired first. The storm guard then catches the third,
/// identical-after-repair call, so only two executions happen.
#[tokio::test]
async fn dirge_7bwx_end_to_end_storm_dedupes_after_truncation_repair() {
    let echo = std::sync::Arc::new(EchoTool::new());
    let mut ctx = empty_context();
    ctx.tools.push(echo.clone());

    // Unterminated JSON payloads, emitted as raw strings.
    let truncated_args = [r#"{"v":1"#, r#"{"v": 1"#, r#"{"v": 1"#];
    let tool_calls = truncated_args
        .iter()
        .enumerate()
        .map(|(i, raw)| ContentBlock::ToolCall {
            id: format!("tool-{}", i + 1),
            name: "echo".to_string(),
            arguments: serde_json::Value::String(raw.to_string()),
        })
        .collect::<Vec<_>>();
    let response = AssistantMessage::new(tool_calls, StopReason::ToolUse);
    let factory = canned_factory(vec![response, text_response("done")]);

    let (tx, mut rx) = mpsc::channel::<LoopEvent>(128);
    let config = build_config();
    let repair_stats = config.repair_stats.clone();
    let _messages = run_agent_loop(
        vec![user("echo")],
        ctx,
        config,
        AbortSignal::new(),
        &tx,
        &factory,
        None,
        None,
    )
    .await;
    drop(tx);

    let executed_count = echo.executed.lock().unwrap().len();
    assert_eq!(
        executed_count, 2,
        "storm must catch the 3rd identical-post-repair call; got {executed_count} executions",
    );

    let snap = repair_stats.snapshot();
    assert_eq!(
        snap.truncation_fixed, 3,
        "truncation_fixed must be incremented per truncated call; got {snap:?}",
    );

    let events = drain(&mut rx).await;
    let execution_ends = events
        .iter()
        .filter(|event| event.kind() == "tool_execution_end")
        .count();
    assert_eq!(
        execution_ends,
        2,
        "expected 2 tool_execution_end events; got events={:?}",
        events.iter().map(|e| e.kind()).collect::<Vec<_>>(),
    );
}
#[tokio::test]
async fn storm_terminal_emits_failure_narrative() {
let echo = std::sync::Arc::new(EchoTool::new());
let mut ctx = empty_context();
ctx.tools.push(echo.clone());
let make = |i: usize| {
AssistantMessage::new(
vec![ContentBlock::ToolCall {
id: format!("call-{i}"),
name: "echo".to_string(),
arguments: serde_json::json!({"v": 1}),
}],
StopReason::ToolUse,
)
};
let factory = canned_factory((0..5).map(make).collect());
let (tx, _rx) = mpsc::channel::<LoopEvent>(128);
let config = build_config();
let messages = run_agent_loop(
vec![user("echo")],
ctx,
config,
AbortSignal::new(),
&tx,
&factory,
None,
None,
)
.await;
drop(tx);
let has_narrative = messages.iter().any(|m| match m {
LoopMessage::Assistant(a) => a.content.iter().any(|b| match b {
ContentBlock::Text { text } => text.contains("stopped here to avoid spinning"),
_ => false,
}),
_ => false,
});
assert!(
has_narrative,
"expected a storm failure-narrative assistant message; got {} messages",
messages.len()
);
}
/// End to end: a tool call that only appears as raw DSML markup inside a Text
/// block must still be recovered and executed exactly once.
#[tokio::test]
async fn dirge_ngic_end_to_end_orphan_dsml_in_text_dispatches() {
    let echo = std::sync::Arc::new(EchoTool::new());
    let mut ctx = empty_context();
    ctx.tools.push(echo.clone());

    let dsml = r#"<|DSML|invoke name="echo"><|DSML|parameter name="v" string="false">1</|DSML|parameter></|DSML|invoke>"#;
    let text_only = vec![ContentBlock::Text {
        text: String::from(dsml),
    }];
    let response = AssistantMessage::new(text_only, StopReason::ToolUse);
    let factory = canned_factory(vec![response, text_response("done")]);

    let (tx, _rx) = mpsc::channel::<LoopEvent>(128);
    let _messages = run_agent_loop(
        vec![user("echo")],
        ctx,
        build_config(),
        AbortSignal::new(),
        &tx,
        &factory,
        None,
        None,
    )
    .await;
    drop(tx);

    let executed = echo.executed.lock().unwrap();
    let runs = executed.len();
    assert_eq!(
        runs,
        1,
        "orphan DSML in Text must be recovered and dispatched (post-dirge-ngic); got {} executions",
        runs,
    );
}
/// A repairable truncated argument string is parsed into an object and
/// counted once in `truncation_fixed`. Its note carries the `[read_file]`
/// tag but not the unrecoverable marker.
#[test]
fn truncation_repair_forwards_notes_on_successful_repair() {
    use crate::agent::agent_loop::tool_input_repair::RepairStats;
    use crate::agent::agent_loop::tools::ToolCall;

    // Unterminated string and object: salvageable by closing both.
    let truncated = r#"{"path": "/tmp/x"#;
    let mut tool_calls = vec![ToolCall {
        id: "call_ok".to_string(),
        name: "read_file".to_string(),
        arguments: serde_json::Value::String(truncated.to_string()),
    }];
    let stats = RepairStats::new();
    // Per-call repair notes, keyed by tool-call id.
    let notes = std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashMap::<
        String,
        Vec<String>,
    >::new()));
    super::apply_truncation_repair(&mut tool_calls, &stats, &notes);

    assert_eq!(tool_calls[0].arguments["path"], "/tmp/x");
    assert_eq!(stats.snapshot().truncation_fixed, 1);

    let sink = notes.lock().unwrap();
    let entry = sink
        .get("call_ok")
        .expect("notes must be recorded for the successful repair");
    assert!(entry.iter().any(|n| n.contains("[read_file]")));
    assert!(
        entry
            .iter()
            .all(|n| !n.contains("TRUNCATION UNRECOVERABLE")),
        "successful repair must not carry the unrecoverable prefix: {entry:?}",
    );
}
/// Arguments that are already a parsed JSON object are left untouched and
/// no repair is recorded.
#[test]
fn truncation_repair_leaves_already_parsed_args_alone() {
    use crate::agent::agent_loop::tool_input_repair::RepairStats;
    use crate::agent::agent_loop::tools::ToolCall;

    let already_parsed = serde_json::json!({ "path": "/tmp/y" });
    let mut tool_calls = vec![ToolCall {
        id: "call_ok".to_string(),
        name: "read_file".to_string(),
        arguments: already_parsed.clone(),
    }];
    let stats = RepairStats::new();
    // Per-call repair notes, keyed by tool-call id.
    let notes = std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashMap::<
        String,
        Vec<String>,
    >::new()));
    super::apply_truncation_repair(&mut tool_calls, &stats, &notes);

    assert_eq!(tool_calls[0].arguments, already_parsed);
    assert_eq!(
        stats.snapshot().truncation_fixed,
        0,
        "no repair should be recorded for already-parsed args",
    );
}
#[tokio::test]
async fn dirge_k6be_oversized_tool_result_capped_before_next_model_call() {
use crate::agent::agent_loop::stream::{LlmContext, StreamFn};
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Debug)]
struct BigOutputTool;
impl LoopTool for BigOutputTool {
fn name(&self) -> &str {
"big_read"
}
fn description(&self) -> &str {
"Big tool"
}
fn label(&self) -> &str {
"BigRead"
}
fn parameters(&self) -> &Value {
static EMPTY: std::sync::OnceLock<Value> = std::sync::OnceLock::new();
EMPTY.get_or_init(|| serde_json::json!({"type": "object"}))
}
fn execute<'a>(
&'a self,
_id: &'a str,
_args: Value,
_signal: AbortSignal,
_on_update: super::super::tool::LoopToolUpdate,
) -> Pin<Box<dyn Future<Output = Result<super::super::LoopToolResult, String>> + Send + 'a>>
{
let huge = "x".repeat(60_000);
Box::pin(async move {
Ok(super::super::LoopToolResult {
content: vec![serde_json::json!({
"type": "text",
"text": huge,
})],
details: Value::Null,
terminate: None,
})
})
}
}
let observed_second_call_payload: std::sync::Arc<Mutex<Option<Vec<Value>>>> =
std::sync::Arc::new(Mutex::new(None));
let observed_clone = observed_second_call_payload.clone();
let counter = std::sync::Arc::new(AtomicUsize::new(0));
let factory: StreamFn = std::sync::Arc::new(move |ctx: LlmContext, _opts| {
let n = counter.fetch_add(1, Ordering::SeqCst);
if n == 1 {
*observed_clone.lock().unwrap() = Some(ctx.messages.clone());
}
let msg = if n == 0 {
tool_use_response("call-1", "big_read", serde_json::json!({}))
} else {
text_response("done")
};
let reason = msg.stop_reason;
Box::pin(futures::stream::iter(vec![
crate::agent::agent_loop::message::StreamEvent::Done {
reason,
message: msg,
usage: None,
},
]))
});
let mut ctx = empty_context();
ctx.tools.push(std::sync::Arc::new(BigOutputTool));
let mut cfg = build_config();
cfg.tool_execution = ToolExecutionMode::Sequential;
let (tx, _rx) = mpsc::channel::<LoopEvent>(64);
let _ = run_agent_loop(
vec![user("start")],
ctx,
cfg,
AbortSignal::new(),
&tx,
&factory,
None,
None,
)
.await;
let observed = observed_second_call_payload.lock().unwrap();
let messages = observed
.as_ref()
.expect("second model call must have happened");
let tool_result = messages
.iter()
.find(|m| {
m.get("role").and_then(|v| v.as_str()) == Some("toolResult")
|| m.get("role").and_then(|v| v.as_str()) == Some("tool")
})
.expect("second call must include the tool result");
let blocks = tool_result["content"]
.as_array()
.expect("tool result content should be an array of blocks");
let total_text_len: usize = blocks
.iter()
.filter_map(|b| b.get("text").and_then(|t| t.as_str()))
.map(|t| t.len())
.sum();
assert!(
total_text_len < 60_000,
"tool result must be capped before the second model call; got {total_text_len} chars",
);
assert!(
total_text_len < 14_000,
"capped result must be near the ~12 KB cap; got {total_text_len} chars",
);
let combined: String = blocks
.iter()
.filter_map(|b| b.get("text").and_then(|t| t.as_str()))
.collect();
assert!(
combined.contains("truncated"),
"capped result must carry the truncation marker",
);
}
/// A preloaded ~500 000-character tool result must be folded proactively at
/// turn start, so the very FIRST model call already sees a shrunken
/// transcript (< 100 000 chars of text).
#[tokio::test]
async fn dirge_el3n_proactive_fold_fires_when_threshold_crossed_at_turn_start() {
    use crate::agent::agent_loop::stream::{LlmContext, StreamFn};
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Total text length (`str::len`) across `messages`: string `content`
    /// counts directly, and array `content` counts each block's `text` field.
    fn content_chars(messages: &[serde_json::Value]) -> usize {
        messages
            .iter()
            .map(|m| match m.get("content") {
                Some(serde_json::Value::String(s)) => s.len(),
                Some(serde_json::Value::Array(blocks)) => blocks
                    .iter()
                    .filter_map(|b| b.get("text").and_then(|t| t.as_str()))
                    .map(str::len)
                    .sum::<usize>(),
                _ => 0,
            })
            .sum()
    }

    let huge_text = "x".repeat(500_000);
    let preloaded = vec![serde_json::json!({
        "role": "toolResult",
        "content": [{"type": "text", "text": huge_text}],
        "toolName": "read",
    })];

    // Text size seen by the FIRST model call (`n == 0`). The previous name
    // said "second call", which did not match what was recorded.
    let observed_first_call_total_chars: std::sync::Arc<Mutex<Option<usize>>> =
        std::sync::Arc::new(Mutex::new(None));
    let observed_clone = observed_first_call_total_chars.clone();
    let counter = std::sync::Arc::new(AtomicUsize::new(0));
    let factory: StreamFn = std::sync::Arc::new(move |ctx: LlmContext, _opts| {
        let n = counter.fetch_add(1, Ordering::SeqCst);
        if n == 0 {
            *observed_clone.lock().unwrap() = Some(content_chars(&ctx.messages));
        }
        let msg = text_response("ok");
        let reason = msg.stop_reason;
        Box::pin(futures::stream::iter(vec![
            crate::agent::agent_loop::message::StreamEvent::Done {
                reason,
                message: msg,
                usage: None,
            },
        ]))
    });

    let mut ctx = empty_context();
    ctx.messages = preloaded;
    let mut cfg = build_config();
    cfg.tool_execution = ToolExecutionMode::Sequential;
    let (tx, _rx) = mpsc::channel::<LoopEvent>(64);
    let _ = run_agent_loop(
        vec![user("start")],
        ctx,
        cfg,
        AbortSignal::new(),
        &tx,
        &factory,
        None,
        None,
    )
    .await;

    let observed = observed_first_call_total_chars.lock().unwrap();
    let total_after_fold = observed.expect("first model call must have happened");
    assert!(
        total_after_fold < 100_000,
        "proactive fold should have shrunk the preloaded transcript; saw {total_after_fold} chars",
    );
}
#[tokio::test]
async fn dirge_el3n_proactive_fold_does_not_fire_under_threshold() {
use crate::agent::agent_loop::stream::{LlmContext, StreamFn};
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
let modest = "y".repeat(4_000);
let preloaded = vec![serde_json::json!({
"role": "toolResult",
"content": [{"type": "text", "text": modest}],
"toolName": "read",
})];
let observed_first_call_chars: std::sync::Arc<Mutex<Option<usize>>> =
std::sync::Arc::new(Mutex::new(None));
let observed_clone = observed_first_call_chars.clone();
let counter = std::sync::Arc::new(AtomicUsize::new(0));
let factory: StreamFn = std::sync::Arc::new(move |ctx: LlmContext, _opts| {
let n = counter.fetch_add(1, Ordering::SeqCst);
if n == 0 {
let total: usize = ctx
.messages
.iter()
.map(|m| match m.get("content") {
Some(serde_json::Value::String(s)) => s.len(),
Some(serde_json::Value::Array(blocks)) => blocks
.iter()
.filter_map(|b| b.get("text").and_then(|t| t.as_str()))
.map(|t| t.len())
.sum(),
_ => 0,
})
.sum();
*observed_clone.lock().unwrap() = Some(total);
}
let msg = text_response("ok");
let reason = msg.stop_reason;
Box::pin(futures::stream::iter(vec![
crate::agent::agent_loop::message::StreamEvent::Done {
reason,
message: msg,
usage: None,
},
]))
});
let mut ctx = empty_context();
ctx.messages = preloaded;
let mut cfg = build_config();
cfg.tool_execution = ToolExecutionMode::Sequential;
let (tx, _rx) = mpsc::channel::<LoopEvent>(64);
let _ = run_agent_loop(
vec![user("start")],
ctx,
cfg,
AbortSignal::new(),
&tx,
&factory,
None,
None,
)
.await;
let observed = observed_first_call_chars.lock().unwrap();
let total = observed.expect("first model call must have happened");
assert!(
total >= 4_000,
"under-threshold ratio must not trigger fold; saw {total} chars (input was 4000)",
);
}
/// Failures increment the consecutive-failure counter, skips leave it alone,
/// and any success resets it to zero.
#[test]
fn record_compaction_outcome_drives_counter() {
    use super::{SummaryOutcome, record_compaction_outcome};

    let mut failures = 0u32;
    record_compaction_outcome(&mut failures, SummaryOutcome::Failed);
    assert_eq!(failures, 1);
    record_compaction_outcome(&mut failures, SummaryOutcome::Failed);
    assert_eq!(failures, 2);
    record_compaction_outcome(&mut failures, SummaryOutcome::Skipped);
    assert_eq!(failures, 2, "skip must not change the counter");
    record_compaction_outcome(&mut failures, SummaryOutcome::Succeeded(0));
    assert_eq!(failures, 0, "success resets the counter");
}
/// Below `MAX_CONSECUTIVE_COMPACTION_FAILURES` the failing summarizer runs
/// (and fails) on every pass. At the threshold the breaker opens: the pass
/// is `Skipped`, the summarizer is not called, and context does not grow.
#[tokio::test]
async fn compaction_circuit_breaker_skips_summarizer_after_max_failures() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Summarizer that always errors and counts its invocations.
    let calls = std::sync::Arc::new(AtomicUsize::new(0));
    let calls_inner = std::sync::Arc::clone(&calls);
    let summarize_fn: Option<crate::agent::compression::SummarizeFn> =
        Some(std::sync::Arc::new(move |_prompt: String| {
            let counter = calls_inner.clone();
            Box::pin(async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Err(anyhow::anyhow!("summarizer boom"))
            })
        }));

    // System prompt, task, 20 alternating filler turns, then the latest ask.
    let make_ctx = || {
        let mut ctx = empty_context();
        ctx.messages
            .push(serde_json::json!({"role":"system","content":"agent"}));
        ctx.messages
            .push(serde_json::json!({"role":"user","content":"task"}));
        ctx.messages.extend((0..20).map(|i| {
            let role = if i % 2 == 0 { "assistant" } else { "user" };
            serde_json::json!({
                "role": role, "content": format!("turn {i} with filler content")
            })
        }));
        ctx.messages
            .push(serde_json::json!({"role":"user","content":"latest"}));
        ctx
    };
    let (tx, _rx) = mpsc::channel::<LoopEvent>(64);

    // Breaker closed: each pass invokes the summarizer, which fails.
    for failures in 0..super::MAX_CONSECUTIVE_COMPACTION_FAILURES {
        let mut ctx = make_ctx();
        let outcome = super::run_compaction_pass(
            &mut ctx,
            &summarize_fn,
            5,
            failures,
            &None,
            None,
            &tx,
            &empty_checkpoint_slot(),
            &mut 0,
            u64::MAX,
        )
        .await;
        assert_eq!(
            outcome,
            super::SummaryOutcome::Failed,
            "failures={failures}: summarizer should run and fail"
        );
    }
    let calls_before_open = calls.load(Ordering::SeqCst);
    assert_eq!(
        calls_before_open,
        super::MAX_CONSECUTIVE_COMPACTION_FAILURES as usize,
        "summarizer should run once per sub-threshold attempt"
    );

    // Breaker open: prune-only, the summarizer is left alone.
    let mut ctx = make_ctx();
    let n_before = ctx.messages.len();
    let outcome = super::run_compaction_pass(
        &mut ctx,
        &summarize_fn,
        5,
        super::MAX_CONSECUTIVE_COMPACTION_FAILURES,
        &None,
        None,
        &tx,
        &empty_checkpoint_slot(),
        &mut 0,
        u64::MAX,
    )
    .await;
    assert_eq!(
        outcome,
        super::SummaryOutcome::Skipped,
        "breaker open → summarizer skipped"
    );
    assert_eq!(
        calls.load(Ordering::SeqCst),
        calls_before_open,
        "breaker open: summarizer must NOT be invoked"
    );
    assert!(
        ctx.messages.len() <= n_before,
        "prune-only fallback must not grow context"
    );
}
/// The `ContextCompacted` event reports which compaction path ran:
/// - summary succeeded;
/// - summary failed;
/// - no summarizer configured;
/// - summarizer disabled by the open breaker.
#[tokio::test]
async fn context_compacted_reports_compaction_kind() {
    use crate::agent::compression::SummarizeFn;
    use crate::event::CompactionKind;

    /// Summarizer that always returns a well-formed structured summary.
    fn canned_summarizer() -> Option<SummarizeFn> {
        Some(std::sync::Arc::new(|_p: String| {
            Box::pin(async move {
                Ok("## Active Task\nx\n\n## Goal\ny\n\n## Completed Actions\n1. z\n\n## Remaining Work\nw"
                    .to_string())
            })
        }))
    }

    /// Runs one compaction pass over a fixed 23-message context and returns
    /// the kind carried by the first `ContextCompacted` event.
    async fn kind_for(summarize_fn: Option<SummarizeFn>, failures: u32) -> CompactionKind {
        let mut ctx = empty_context();
        ctx.messages
            .push(serde_json::json!({"role":"system","content":"agent"}));
        ctx.messages
            .push(serde_json::json!({"role":"user","content":"task"}));
        ctx.messages.extend((0..20).map(|i| {
            let role = if i % 2 == 0 { "assistant" } else { "user" };
            serde_json::json!({
                "role": role, "content": format!("turn {i} with filler content")
            })
        }));
        ctx.messages
            .push(serde_json::json!({"role":"user","content":"latest"}));

        let (tx, mut rx) = mpsc::channel::<LoopEvent>(8);
        super::run_compaction_pass(
            &mut ctx,
            &summarize_fn,
            5,
            failures,
            &None,
            None,
            &tx,
            &empty_checkpoint_slot(),
            &mut 0,
            u64::MAX,
        )
        .await;
        drop(tx);

        while let Some(event) = rx.recv().await {
            if let LoopEvent::ContextCompacted {
                compaction_kind, ..
            } = event
            {
                return compaction_kind;
            }
        }
        panic!("no ContextCompacted event emitted");
    }

    assert_eq!(
        kind_for(canned_summarizer(), 0).await,
        CompactionKind::PruneAndSummary
    );
    let failing: Option<SummarizeFn> = Some(std::sync::Arc::new(|_p: String| {
        Box::pin(async move { Err(anyhow::anyhow!("boom")) })
    }));
    assert_eq!(
        kind_for(failing, 0).await,
        CompactionKind::PruneAndFailedSummary
    );
    assert_eq!(kind_for(None, 0).await, CompactionKind::PruneOnly);
    // At the failure threshold even a healthy summarizer is reported as disabled.
    assert_eq!(
        kind_for(canned_summarizer(), super::MAX_CONSECUTIVE_COMPACTION_FAILURES).await,
        CompactionKind::PruneSummarizerDisabled
    );
}
/// The todo nudge says "todo" for one item and "todos" for several.
#[test]
fn todo_nudge_message_pluralizes() {
    // The nudge is expected to be a user message; pull out its text.
    let nudge_text = |count| match todo_nudge_message(count) {
        LoopMessage::User(u) => u.content,
        _ => panic!("expected a user message"),
    };
    let one = nudge_text(1);
    assert!(one.contains("1 unfinished todo "), "singular: {one}");
    let many = nudge_text(3);
    assert!(many.contains("3 unfinished todos "), "plural: {many}");
}
#[tokio::test]
async fn finalization_hook_short_circuits_lower_gates() {
let mut config = build_config();
config.get_followup_messages = Some(std::sync::Arc::new(|| {
Box::pin(async {
vec![LoopMessage::User(
crate::agent::agent_loop::message::UserMessage {
content: "hook follow-up".into(),
},
)]
})
}));
let mut critic_done = false;
let mut goal_reacts = 0u8;
let mut todo_nudges = 0u8;
let (msgs, source) = poll_finalization_follow_up(
&config,
"sys",
&[],
&mut critic_done,
&mut goal_reacts,
&mut todo_nudges,
)
.await;
assert_eq!(source, FollowUpSource::Hook);
assert_eq!(msgs.len(), 1);
assert!(
!critic_done,
"hook must short-circuit before the critic runs"
);
assert_eq!(todo_nudges, 0, "todo gate must not be reached");
}
/// With the default test config and the todo-nudge budget already spent,
/// finalization produces no follow-up.
#[tokio::test]
async fn finalization_all_gates_silent_yields_none() {
    let config = build_config();
    let (mut critic_done, mut goal_reacts, mut todo_nudges) = (false, 0u8, MAX_TODO_NUDGES);

    let (msgs, source) = poll_finalization_follow_up(
        &config,
        "sys",
        &[],
        &mut critic_done,
        &mut goal_reacts,
        &mut todo_nudges,
    )
    .await;

    assert!(msgs.is_empty());
    assert_eq!(source, FollowUpSource::None);
}
/// A judge verdict of `GOAL: UNMET` yields one goal follow-up message and
/// increments the goal re-entry counter.
#[tokio::test]
async fn finalization_goal_unmet_reenters_and_counts() {
    use crate::agent::agent_loop::critic::CriticFn;

    let judge: CriticFn =
        Arc::new(|_p| Box::pin(async { Ok("GOAL: UNMET\n- tests still failing".to_string()) }));
    let mut config = build_config();
    config.goal = Some("all tests pass and committed".into());
    config.critic_fn = Some(judge);

    // Critic pass marked done and todo budget spent, presumably leaving the
    // goal gate as the deciding check.
    let (mut critic_done, mut goal_reacts, mut todo_nudges) = (true, 0u8, MAX_TODO_NUDGES);
    let (msgs, source) = poll_finalization_follow_up(
        &config,
        "sys",
        &[],
        &mut critic_done,
        &mut goal_reacts,
        &mut todo_nudges,
    )
    .await;

    assert_eq!(source, FollowUpSource::Goal);
    assert_eq!(goal_reacts, 1, "an unmet goal counts one re-entry");
    assert_eq!(msgs.len(), 1);
}
/// A judge verdict of `GOAL: MET` finalizes without follow-up or re-entry.
#[tokio::test]
async fn finalization_goal_met_finalizes() {
    use crate::agent::agent_loop::critic::CriticFn;

    let judge: CriticFn = Arc::new(|_p| Box::pin(async { Ok("GOAL: MET".to_string()) }));
    let mut config = build_config();
    config.goal = Some("all tests pass".into());
    config.critic_fn = Some(judge);

    let (mut critic_done, mut goal_reacts, mut todo_nudges) = (true, 0u8, MAX_TODO_NUDGES);
    let (msgs, source) = poll_finalization_follow_up(
        &config,
        "sys",
        &[],
        &mut critic_done,
        &mut goal_reacts,
        &mut todo_nudges,
    )
    .await;

    assert!(msgs.is_empty());
    assert_eq!(source, FollowUpSource::None);
    assert_eq!(goal_reacts, 0);
}
#[tokio::test]
async fn finalization_goal_bound_stops_reentry() {
    use crate::agent::agent_loop::critic::CriticFn;
    use crate::agent::agent_loop::goal::MAX_GOAL_REACT;

    // The judge always says UNMET, but the re-entry budget is already spent,
    // so finalization must go ahead without a follow-up.
    let mut cfg = build_config();
    cfg.goal = Some("unsatisfiable".into());
    let always_unmet: CriticFn = Arc::new(|_p| {
        Box::pin(async { Ok("GOAL: UNMET".to_string()) })
    });
    cfg.critic_fn = Some(always_unmet);

    let mut critic_ran = true;
    let mut goal_reentries = MAX_GOAL_REACT;
    let mut nudges_used = MAX_TODO_NUDGES;

    let (follow_ups, origin) = poll_finalization_follow_up(
        &cfg,
        "sys",
        &[],
        &mut critic_ran,
        &mut goal_reentries,
        &mut nudges_used,
    )
    .await;

    assert!(follow_ups.is_empty());
    assert_eq!(origin, FollowUpSource::None, "bound reached → finalize");
}
#[tokio::test]
async fn finalization_goal_without_judge_is_inert() {
    // A goal is configured but no critic/judge is available, so the goal gate
    // has nothing to consult and must stay silent.
    let mut cfg = build_config();
    cfg.goal = Some("all tests pass".into());
    cfg.critic_fn = None;

    let mut critic_ran = true;
    let mut goal_reentries: u8 = 0;
    let mut nudges_used = MAX_TODO_NUDGES;

    let (follow_ups, origin) = poll_finalization_follow_up(
        &cfg,
        "sys",
        &[],
        &mut critic_ran,
        &mut goal_reentries,
        &mut nudges_used,
    )
    .await;

    assert!(follow_ups.is_empty());
    assert_eq!(origin, FollowUpSource::None);
    assert_eq!(goal_reentries, 0);
}
/// Test-only tool named `boom` whose every invocation resolves to the same
/// error string, used to drive the loop's tool-failure tracking.
#[derive(Debug)]
struct FailingTool;

impl LoopTool for FailingTool {
    fn name(&self) -> &str {
        "boom"
    }

    fn description(&self) -> &str {
        "Always fails"
    }

    fn label(&self) -> &str {
        "Boom"
    }

    /// Minimal JSON schema (`{"type": "object"}`), built once on first use.
    fn parameters(&self) -> &Value {
        static SCHEMA: std::sync::OnceLock<Value> = std::sync::OnceLock::new();
        SCHEMA.get_or_init(|| serde_json::json!({"type": "object"}))
    }

    /// Ignores all inputs and fails with a fixed message.
    fn execute<'a>(
        &'a self,
        _id: &'a str,
        _args: Value,
        _signal: AbortSignal,
        _on_update: LoopToolUpdate,
    ) -> Pin<Box<dyn Future<Output = Result<super::super::LoopToolResult, String>> + Send + 'a>>
    {
        let failure = async { Err(String::from("boom: nothing matched")) };
        Box::pin(failure)
    }
}
#[tokio::test]
async fn consecutive_distinct_failures_inject_recovery_checkpoint() {
    // Three back-to-back failing calls (with distinct arguments) must make the
    // loop inject a recovery checkpoint that quotes the error and asks the
    // model to try something different.
    let mut ctx = empty_context();
    ctx.tools.push(std::sync::Arc::new(FailingTool));
    let script = canned_factory(vec![
        tool_use_response("c1", "boom", serde_json::json!({"n": 1})),
        tool_use_response("c2", "boom", serde_json::json!({"n": 2})),
        tool_use_response("c3", "boom", serde_json::json!({"n": 3})),
        text_response("giving up"),
    ]);

    let (events_tx, _events_rx) = mpsc::channel::<LoopEvent>(256);
    let transcript = run_agent_loop(
        vec![user("do the thing")],
        ctx,
        build_config(),
        AbortSignal::new(),
        &events_tx,
        &script,
        None,
        None,
    )
    .await;
    drop(events_tx);

    let first_checkpoint = transcript
        .iter()
        .filter_map(|msg| match msg {
            LoopMessage::User(u) => Some(&u.content),
            _ => None,
        })
        .find(|content| content.contains("[Recovery checkpoint]"))
        .cloned();

    let body = first_checkpoint
        .expect("a recovery checkpoint must be injected after 3 distinct failures");
    for needle in [
        "3 tool calls in a row have failed",
        "boom: nothing matched",
        "DIFFERENT next step",
    ] {
        assert!(body.contains(needle));
    }
}
#[tokio::test]
async fn failure_then_success_injects_no_checkpoint() {
    // A successful tool call between failures must reset the failure streak.
    //
    // The script makes three failing calls in total, with one success after
    // the second failure. Without a reset, the third failure would bring the
    // count to 3. That is the count that trips the checkpoint in
    // `consecutive_distinct_failures_inject_recovery_checkpoint`. With a
    // reset, the streak is only 1, so no checkpoint may appear.
    //
    // An earlier version of this test only issued two failures in total, so
    // it passed whether or not the streak was ever reset.
    let mut ctx = empty_context();
    ctx.tools.push(std::sync::Arc::new(FailingTool));
    ctx.tools.push(std::sync::Arc::new(EchoTool::new()));
    let factory = canned_factory(vec![
        tool_use_response("c1", "boom", serde_json::json!({"n": 1})),
        tool_use_response("c2", "boom", serde_json::json!({"n": 2})),
        tool_use_response("c3", "echo", serde_json::json!({"v": 1})),
        tool_use_response("c4", "boom", serde_json::json!({"n": 3})),
        text_response("ok"),
    ]);
    let (tx, _rx) = mpsc::channel::<LoopEvent>(256);
    let messages = run_agent_loop(
        vec![user("go")],
        ctx,
        build_config(),
        AbortSignal::new(),
        &tx,
        &factory,
        None,
        None,
    )
    .await;
    drop(tx);
    assert!(
        !messages.iter().any(|m| matches!(
            m,
            LoopMessage::User(u) if u.content.contains("[Recovery checkpoint]")
        )),
        "a success between failures must reset the streak"
    );
}