use serde_json::Value;
use tokio::sync::mpsc;
use super::context_manager::{self, PostUsageDecisionKind};
use super::inflight::InflightSet;
use super::message::{
AssistantMessage, ContentBlock, LoopEvent, LoopMessage, StopReason, ToolResultMessage,
loop_message_to_value, tool_result_to_value,
};
use super::storm::StormBreaker;
use super::stream::{StreamFn, stream_assistant_response};
use super::tool::AbortSignal;
use super::types::{Context, LoopConfig};
/// Collects the messages that should be injected before the next model call.
///
/// Sources are polled in a fixed order: the optional `get_steering_messages`
/// callback, the optional file-touch tracker's reminder, and finally the loop
/// guards' reflection messages.
///
/// Returns the combined list plus a flag that is `true` only when the
/// steering callback itself produced at least one message; tracker reminders
/// and guard reflections do not set it.
async fn poll_steering_and_reminder(
    config: &LoopConfig,
    guards: &super::activity::LoopGuards,
) -> (Vec<LoopMessage>, bool) {
    let mut queued: Vec<LoopMessage> = Vec::new();
    if let Some(fetch_steering) = &config.get_steering_messages {
        queued = fetch_steering().await;
    }
    // Captured before the reminder/reflection messages are appended.
    let had_user_steering = !queued.is_empty();

    if let Some(touch_tracker) = &config.file_touch_tracker {
        queued.extend(touch_tracker.poll_reminder());
    }
    queued.extend(guards.poll_reflection());

    (queued, had_user_steering)
}
/// Joins the text of every `Text` block in `content` with single spaces.
///
/// Non-text blocks are ignored; an input without text blocks yields an empty
/// string.
fn tool_result_excerpt(content: &[super::message::ContentBlock]) -> String {
    let mut excerpt = String::new();
    let mut wrote_any = false;
    for block in content {
        if let super::message::ContentBlock::Text { text } = block {
            if wrote_any {
                excerpt.push(' ');
            }
            excerpt.push_str(text);
            wrote_any = true;
        }
    }
    excerpt
}
/// Builds the `StormBreaker` for a run from the loop configuration.
///
/// When neither `storm_mutating_tools` nor `storm_exempt_tools` is configured
/// the default breaker is returned. Otherwise each configured list extends the
/// matching default predicate (`default_mutating` / `default_exempt`) with a
/// name lookup, and the breaker is built with the fixed parameters `6` and `3`.
#[allow(clippy::type_complexity)]
fn storm_for_config(config: &LoopConfig) -> StormBreaker {
    type CallPredicate = Box<dyn Fn(&super::tools::ToolCall) -> bool + Send + Sync>;

    if config.storm_mutating_tools.is_none() && config.storm_exempt_tools.is_none() {
        return StormBreaker::default();
    }

    let mutating: Option<CallPredicate> = match &config.storm_mutating_tools {
        Some(extras) => {
            let names: std::collections::HashSet<String> = extras.iter().cloned().collect();
            Some(Box::new(move |call: &super::tools::ToolCall| {
                super::storm::default_mutating(call) || names.contains(&call.name)
            }) as CallPredicate)
        }
        None => None,
    };

    let exempt: Option<CallPredicate> = match &config.storm_exempt_tools {
        Some(extras) => {
            let names: std::collections::HashSet<String> = extras.iter().cloned().collect();
            Some(Box::new(move |call: &super::tools::ToolCall| {
                super::storm::default_exempt(call) || names.contains(&call.name)
            }) as CallPredicate)
        }
        None => None,
    };

    StormBreaker::new(6, 3, mutating, exempt)
}
/// Upper bound on how many todo nudges `poll_finalization_follow_up` will
/// issue; its `todo_nudges` counter is compared against this value.
const MAX_TODO_NUDGES: u8 = 3;
/// Threshold handed to `FailureTracker::new` when `run_loop` builds its loop
/// guards.
const FAILURE_REFLECTION_THRESHOLD: usize = 3;
/// Identifies which stage of `poll_finalization_follow_up` produced the
/// follow-up messages it returned.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FollowUpSource {
    /// Messages came from the `get_followup_messages` callback.
    Hook,
    /// Messages came from the verifier's `check_before_finalize`.
    Verifier,
    /// Messages came from the critic pass (`run_critic`).
    Critic,
    /// Messages came from the goal gate (`run_goal_gate`).
    Goal,
    /// A todo nudge was generated because unfinished todos remain.
    Todo,
    /// No stage produced any message.
    None,
}
/// Tag placed at the start of the message built by `todo_nudge_message`.
pub(crate) const TODO_NUDGE_TAG: &str = "[todo]";
/// Leading text of the notice `run_loop` emits when the `max_turns` cap is hit.
pub(crate) const MAX_TURNS_NOTICE_PREFIX: &str = "[dirge] Max agent turns";
/// Builds the user-role nudge sent when the model tries to stop while
/// `unfinished` todos are still open.
///
/// The text starts with `TODO_NUDGE_TAG` and pluralizes "todo" unless the
/// count is exactly one.
fn todo_nudge_message(unfinished: usize) -> LoopMessage {
    let plural = if unfinished == 1 { "" } else { "s" };
    let content = format!(
        "{TODO_NUDGE_TAG} You still have {unfinished} unfinished todo{plural} (pending or in progress). \
         Finish the remaining work, or if it's genuinely done or no longer needed, \
         update the todo list (mark items completed/cancelled) before stopping."
    );
    LoopMessage::User(super::message::UserMessage { content })
}
/// Decides whether the run may finish or must continue with follow-up messages.
///
/// Stages are consulted in order and the first non-empty batch wins:
/// 1. the `get_followup_messages` callback;
/// 2. the verifier's `check_before_finalize`;
/// 3. the critic — at most once per run (`critic_done`), and only if the run
///    produced at least one tool result;
/// 4. the goal gate — while `goal_reacts` is below `MAX_GOAL_REACT` and both a
///    goal and a critic function are configured;
/// 5. a todo nudge — while `todo_nudges` is below `MAX_TODO_NUDGES` and
///    unfinished todos remain.
///
/// The counters/flags are updated in place. `critic_done` is set as soon as the
/// critic stage is entered, even if it returns nothing; `goal_reacts` and
/// `todo_nudges` are incremented only when their stage produces messages.
///
/// Returns the messages together with the stage that produced them, or an
/// empty vector with `FollowUpSource::None`.
async fn poll_finalization_follow_up(
    config: &LoopConfig,
    system_prompt: &str,
    new_messages: &[LoopMessage],
    critic_done: &mut bool,
    goal_reacts: &mut u8,
    todo_nudges: &mut u8,
) -> (Vec<LoopMessage>, FollowUpSource) {
    // Stage 1: host-provided follow-up hook.
    if let Some(fetch_followups) = &config.get_followup_messages {
        let hook_msgs = fetch_followups().await;
        if !hook_msgs.is_empty() {
            return (hook_msgs, FollowUpSource::Hook);
        }
    }

    // Stage 2: verifier.
    if let Some(verifier) = &config.verifier {
        let verifier_msgs = verifier.check_before_finalize();
        if !verifier_msgs.is_empty() {
            return (verifier_msgs, FollowUpSource::Verifier);
        }
    }

    // Stage 3: one-shot critic, only for runs that actually used tools.
    if !*critic_done
        && let Some(critic) = &config.critic_fn
        && run_made_tool_calls(new_messages)
    {
        *critic_done = true;
        let transcript = build_critic_transcript(new_messages);
        let verification = config.verifier.as_ref().map(|v| v.status());
        let critique =
            super::critic::run_critic(critic, system_prompt, &transcript, verification).await;
        if !critique.is_empty() {
            return (critique, FollowUpSource::Critic);
        }
    }

    // Stage 4: goal gate, judged by the same function the critic uses.
    if *goal_reacts < super::goal::MAX_GOAL_REACT
        && let Some(goal) = &config.goal
        && let Some(judge) = &config.critic_fn
    {
        let transcript = build_critic_transcript(new_messages);
        let verification = config.verifier.as_ref().map(|v| v.status());
        let verdict =
            super::goal::run_goal_gate(judge, goal, system_prompt, &transcript, verification).await;
        if !verdict.is_empty() {
            *goal_reacts += 1;
            return (verdict, FollowUpSource::Goal);
        }
    }

    // Stage 5: todo nudge.
    if *todo_nudges < MAX_TODO_NUDGES {
        let open_todos = crate::agent::tools::todo::unfinished_count();
        if open_todos > 0 {
            *todo_nudges += 1;
            return (vec![todo_nudge_message(open_todos)], FollowUpSource::Todo);
        }
    }

    (Vec::new(), FollowUpSource::None)
}
/// Combines an optional focus topic with optional memory-provider insights
/// into the focus text used for a compaction summary.
///
/// When a provider is given, `middle` is flattened to a transcript and passed
/// to `fire_pre_compress` to obtain the insights. Both inputs are trimmed and
/// treated as absent when empty:
/// - both present: `"{focus}\n\nProvider insights:\n{insights}"`;
/// - focus only: the trimmed focus;
/// - insights only: `"Provider insights:\n{insights}"`;
/// - neither: `None`.
fn build_augmented_focus(
    focus_topic: Option<&str>,
    provider: Option<&std::sync::Arc<dyn crate::extras::memory_provider::MemoryProvider>>,
    middle: &[serde_json::Value],
) -> Option<String> {
    let insights = provider.map(|p| {
        let transcript = transcript_from_value_slice(middle);
        crate::agent::review::fire_pre_compress(p.as_ref(), &transcript)
    });

    // Normalize: blank-after-trim counts as "not provided".
    let focus = focus_topic.map(str::trim).filter(|f| !f.is_empty());
    let ins = insights
        .as_deref()
        .map(str::trim)
        .filter(|i| !i.is_empty());

    match (focus, ins) {
        (Some(focus), Some(ins)) => Some(format!("{focus}\n\nProvider insights:\n{ins}")),
        (Some(focus), None) => Some(focus.to_string()),
        (None, Some(ins)) => Some(format!("Provider insights:\n{ins}")),
        (None, None) => None,
    }
}
/// Flattens JSON messages into a plain-text transcript.
///
/// For every message whose `content` is a non-empty string, emits
/// `"{role}: {content}"` followed by a blank line. A missing or non-string
/// `role` is rendered as `?`; messages without string content are skipped.
fn transcript_from_value_slice(messages: &[serde_json::Value]) -> String {
    use std::fmt::Write as _;

    messages.iter().fold(String::new(), |mut transcript, msg| {
        let body = msg
            .get("content")
            .and_then(|v| v.as_str())
            .unwrap_or_default();
        if !body.is_empty() {
            let speaker = msg.get("role").and_then(|v| v.as_str()).unwrap_or("?");
            // One newline from the literal plus one from `writeln!` gives the
            // blank separator line between entries.
            let _ = writeln!(transcript, "{speaker}: {body}\n");
        }
        transcript
    })
}
/// Number of consecutive summarizer failures at which
/// `run_compaction_pass_with_focus` stops attempting LLM summarization
/// (circuit breaker) and only prunes.
const MAX_CONSECUTIVE_COMPACTION_FAILURES: u32 = 3;
/// Second argument passed to `exemplars::block_for_task` by `run_agent_loop`.
// NOTE(review): presumably the number of exemplars to retrieve — confirm against `block_for_task`.
const EXEMPLAR_TOP_K: usize = 3;
/// Result of the summarization step of a compaction pass.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SummaryOutcome {
    /// A summary was applied. The payload is the index reported as
    /// `first_kept_index` in the compaction event; `run_loop` passes it to
    /// `restore_working_files` as `summary_idx`.
    Succeeded(usize),
    /// Summarization was attempted but errored, timed out, or failed validation.
    Failed,
    /// No summarization was attempted or applied during the pass.
    Skipped,
}
/// Updates the consecutive-failure counter from a compaction outcome.
///
/// A success resets the counter to zero, a failure increments it (saturating),
/// and a skipped pass leaves it untouched.
fn record_compaction_outcome(failures: &mut u32, outcome: SummaryOutcome) {
    *failures = match outcome {
        SummaryOutcome::Succeeded(_) => 0,
        SummaryOutcome::Failed => failures.saturating_add(1),
        SummaryOutcome::Skipped => *failures,
    };
}
/// A background-produced summary cached for possible reuse by a later
/// compaction pass.
#[derive(Clone)]
struct CachedCheckpoint {
    /// The validated summary text.
    summary: String,
    /// Number of messages that were in the snapshot when the summary was
    /// requested; passed to `apply_checkpoint_summary` on reuse.
    boundary: usize,
    /// Generation counter value at spawn time; the checkpoint is only reused
    /// while the loop's generation still equals this value.
    generation: u64,
}
/// Shared, mutex-protected slot holding at most one cached checkpoint.
type CheckpointSlot = std::sync::Arc<std::sync::Mutex<Option<CachedCheckpoint>>>;
/// Timeout applied to the inline summarizer call during a compaction pass.
const COMPACTION_SUMMARY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120);
/// Timeout applied to the summarizer call made by `spawn_incremental_checkpoint`.
const CHECKPOINT_SUMMARY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(120);
/// Spawns a detached task that summarizes `messages` in the background and
/// caches the result for a later compaction pass.
///
/// The task does nothing for an empty snapshot. Otherwise it builds a summary
/// prompt, calls `sfn` under `CHECKPOINT_SUMMARY_TIMEOUT`, and — only if the
/// call succeeds and the summary validates — stores a `CachedCheckpoint`
/// (tagged with `generation` and the snapshot length) in `slot`, then emits
/// `LoopEvent::CheckpointRefresh`. Timeouts, summarizer errors, invalid
/// summaries, a poisoned slot lock and send failures are all ignored silently.
fn spawn_incremental_checkpoint(
    sfn: crate::agent::compression::SummarizeFn,
    messages: Vec<serde_json::Value>,
    emit: mpsc::Sender<LoopEvent>,
    slot: CheckpointSlot,
    generation: u64,
) {
    tokio::spawn(async move {
        use crate::agent::compression;

        let boundary = messages.len();
        if boundary == 0 {
            return;
        }

        let estimated = compression::estimate_messages_tokens(&messages);
        let budget = compression::summary_budget(estimated);
        let prompt = compression::build_summary_prompt(&messages, budget, None, None);

        let summary = match tokio::time::timeout(CHECKPOINT_SUMMARY_TIMEOUT, sfn(prompt)).await {
            Ok(Ok(text)) => text,
            _ => return,
        };
        if !compression::validate_summary(&summary) {
            return;
        }

        // The guard is released before the await below.
        if let Ok(mut cached) = slot.lock() {
            *cached = Some(CachedCheckpoint {
                summary: summary.clone(),
                boundary,
                generation,
            });
        }
        let _ = emit.send(LoopEvent::CheckpointRefresh { summary }).await;
    });
}
/// Runs a compaction pass without a focus topic.
///
/// Thin wrapper that forwards every argument unchanged to
/// `run_compaction_pass_with_focus`, passing `None` for `focus_topic`, and
/// returns its outcome.
#[allow(clippy::too_many_arguments)]
async fn run_compaction_pass(
    current_context: &mut Context,
    summarize_fn: &Option<crate::agent::compression::SummarizeFn>,
    protect_tail: usize,
    compaction_failures: u32,
    memory_provider: &Option<std::sync::Arc<dyn crate::extras::memory_provider::MemoryProvider>>,
    compaction_hooks: Option<&crate::agent::agent_loop::types::CompactionHooks>,
    emit: &mpsc::Sender<LoopEvent>,
    checkpoint_slot: &CheckpointSlot,
    generation: &mut u64,
    fold_target: u64,
) -> SummaryOutcome {
    run_compaction_pass_with_focus(
        current_context,
        summarize_fn,
        protect_tail,
        compaction_failures,
        // No focus topic for the plain pass.
        None,
        memory_provider,
        compaction_hooks,
        emit,
        checkpoint_slot,
        generation,
        fold_target,
    )
    .await
}
#[allow(clippy::too_many_arguments)]
async fn run_compaction_pass_with_focus(
current_context: &mut Context,
summarize_fn: &Option<crate::agent::compression::SummarizeFn>,
protect_tail: usize,
compaction_failures: u32,
focus_topic: Option<String>,
memory_provider: &Option<std::sync::Arc<dyn crate::extras::memory_provider::MemoryProvider>>,
compaction_hooks: Option<&crate::agent::agent_loop::types::CompactionHooks>,
emit: &mpsc::Sender<LoopEvent>,
checkpoint_slot: &CheckpointSlot,
generation: &mut u64,
fold_target: u64,
) -> SummaryOutcome {
use crate::agent::compression;
let before = compression::estimate_messages_tokens(¤t_context.messages);
if let Some(hooks) = compaction_hooks {
(hooks.on_before)(current_context.messages.len(), before).await;
}
let pruned = compression::prune_tool_outputs(¤t_context.messages, protect_tail);
current_context.messages = pruned;
let after_prune = compression::estimate_messages_tokens(¤t_context.messages);
let mut after_summary = after_prune;
let mut applied_summary = String::new();
let mut applied_first_kept = current_context.messages.len();
let mut outcome = SummaryOutcome::Skipped;
let mut breaker_open = false;
if compaction_failures >= MAX_CONSECUTIVE_COMPACTION_FAILURES {
breaker_open = true;
tracing::warn!(
target: "dirge::agent_loop",
failures = compaction_failures,
"compaction summarizer failed {compaction_failures} consecutive times — circuit breaker open, skipping LLM summarization",
);
} else if let Some(sfn) = summarize_fn {
let reusable = checkpoint_slot
.lock()
.ok()
.and_then(|g| g.clone())
.filter(|cp| cp.generation == *generation);
let mut reused = false;
if let Some(cp) = reusable
&& let Some((new_msgs, first_kept)) = compression::apply_checkpoint_summary(
¤t_context.messages,
&cp.summary,
cp.boundary,
)
{
let projected = compression::estimate_messages_tokens(&new_msgs);
if projected <= fold_target {
current_context.messages = new_msgs;
after_summary = projected;
applied_summary = cp.summary;
applied_first_kept = first_kept;
outcome = SummaryOutcome::Succeeded(first_kept);
reused = true;
tracing::info!(
target: "dirge::agent_loop",
boundary = cp.boundary,
tokens_after = projected,
"fast compaction: reused background checkpoint summary (no inline LLM call)",
);
}
}
let (start, end) = compression::compute_compress_window(
¤t_context.messages,
compression::PROTECT_HEAD_DEFAULT,
protect_tail.max(compression::PROTECT_TAIL_DEFAULT),
);
if !reused && start < end {
let _ = emit
.send(LoopEvent::CompactionStarted {
tokens_before: before,
})
.await;
let middle: Vec<serde_json::Value> = current_context.messages[start..end].to_vec();
let prev =
compression::find_previous_summary(¤t_context.messages).map(|(_, body)| body);
let budget =
compression::summary_budget(compression::estimate_messages_tokens(&middle));
let augmented_focus =
build_augmented_focus(focus_topic.as_deref(), memory_provider.as_ref(), &middle);
let plugin_summary: Option<String> = match compaction_hooks {
Some(hooks) => match (hooks.on_compact)(middle.clone()).await {
Some(s) if compression::validate_summary(&s) => Some(s),
_ => None,
},
None => None,
};
let summary_result: Result<String, _> = match plugin_summary {
Some(s) => Ok(s),
None => {
let prompt = compression::build_summary_prompt(
&middle,
budget,
prev.as_deref(),
augmented_focus.as_deref(),
);
match tokio::time::timeout(COMPACTION_SUMMARY_TIMEOUT, sfn(prompt)).await {
Ok(r) => r,
Err(_) => Err(anyhow::anyhow!(
"compaction summarizer timed out after {}s",
COMPACTION_SUMMARY_TIMEOUT.as_secs()
)),
}
}
};
match summary_result {
Ok(summary) if compression::validate_summary(&summary) => {
let new_msgs =
compression::apply_summary(¤t_context.messages, &summary, start, end);
current_context.messages = new_msgs;
after_summary =
compression::estimate_messages_tokens(¤t_context.messages);
applied_summary = summary;
applied_first_kept = start;
outcome = SummaryOutcome::Succeeded(start);
}
Ok(_) => {
tracing::warn!(
target: "dirge::agent_loop",
"compaction summarizer returned an unvalidated summary — keeping pruned context",
);
outcome = SummaryOutcome::Failed;
}
Err(e) => {
tracing::warn!(
target: "dirge::agent_loop",
error = %e,
"compaction summarizer failed — keeping pruned context",
);
outcome = SummaryOutcome::Failed;
}
}
}
}
if matches!(outcome, SummaryOutcome::Succeeded(_)) {
*generation = generation.wrapping_add(1);
if let Ok(mut guard) = checkpoint_slot.lock() {
*guard = None;
}
}
let compaction_kind = if breaker_open {
crate::event::CompactionKind::PruneSummarizerDisabled
} else {
match outcome {
SummaryOutcome::Succeeded(_) => crate::event::CompactionKind::PruneAndSummary,
SummaryOutcome::Failed => crate::event::CompactionKind::PruneAndFailedSummary,
SummaryOutcome::Skipped => crate::event::CompactionKind::PruneOnly,
}
};
let new_id = compression::rotate_session_id();
let _ = emit
.send(LoopEvent::ContextCompacted {
new_session_id: new_id,
tokens_before: before,
tokens_after: after_summary,
summary: applied_summary,
first_kept_index: applied_first_kept,
compaction_kind,
summary_model: None,
})
.await;
outcome
}
/// Files larger than this many bytes (2 MiB) are skipped by
/// `restore_working_files`.
const POST_COMPACT_MAX_READ_BYTES: u64 = 2 * 1024 * 1024;
/// Fraction of the context window above which `restore_working_files` skips
/// re-injecting file snapshots after a fold.
const POST_COMPACT_RESTORE_CEILING: f64 = 0.50;
/// Re-injects snapshots of the tracker's working files right after the
/// summary message produced by a compaction.
///
/// Does nothing when no file-touch tracker is configured, the tracker reports
/// no working files, the post-fold context already exceeds
/// `POST_COMPACT_RESTORE_CEILING` of `ctx_max`, or no file could be read.
/// At most `POST_COMPACT_MAX_FILES` paths are considered; paths whose metadata
/// cannot be read, that exceed `POST_COMPACT_MAX_READ_BYTES`, or that fail to
/// read as UTF-8 text are skipped. The snapshots are inserted, in order,
/// immediately after `summary_idx` (clamped to the end of the message list).
async fn restore_working_files(
    config: &LoopConfig,
    ctx: &mut Context,
    summary_idx: usize,
    ctx_max: u64,
) {
    let Some(tracker) = &config.file_touch_tracker else {
        return;
    };
    let files = tracker.working_files();
    if files.is_empty() {
        return;
    }

    // Headroom check: restoring files into an already-full context is pointless.
    let post_fold = crate::agent::compression::estimate_messages_tokens(&ctx.messages);
    let ceiling = POST_COMPACT_RESTORE_CEILING * ctx_max.max(1) as f64;
    if post_fold as f64 > ceiling {
        tracing::debug!(
            target: "dirge::agent_loop",
            post_fold,
            ctx_max,
            "skipping post-compaction file restore — insufficient headroom",
        );
        return;
    }

    let mut contents: Vec<(std::path::PathBuf, String)> = Vec::new();
    let candidates = files
        .into_iter()
        .take(crate::agent::compression::POST_COMPACT_MAX_FILES);
    for path in candidates {
        let Ok(meta) = tokio::fs::metadata(&path).await else {
            continue;
        };
        if meta.len() > POST_COMPACT_MAX_READ_BYTES {
            continue;
        }
        if let Ok(body) = tokio::fs::read_to_string(&path).await {
            contents.push((path, body));
        }
    }
    if contents.is_empty() {
        return;
    }

    let snapshots = crate::agent::compression::build_post_compact_snapshots(&contents);
    let at = (summary_idx + 1).min(ctx.messages.len());
    // Empty-range splice: inserts every snapshot at `at`, preserving order.
    let _ = ctx.messages.splice(at..at, snapshots);
}
#[allow(clippy::too_many_arguments)]
pub async fn run_agent_loop(
prompts: Vec<LoopMessage>,
mut context: Context,
config: LoopConfig,
signal: AbortSignal,
emit: &mpsc::Sender<LoopEvent>,
stream_fn: &StreamFn,
summarize_fn: Option<crate::agent::compression::SummarizeFn>,
memory_provider: Option<std::sync::Arc<dyn crate::extras::memory_provider::MemoryProvider>>,
) -> Vec<LoopMessage> {
let new_messages = prompts.clone();
let task_query: String = prompts
.iter()
.filter_map(|m| match m {
LoopMessage::User(u) => Some(u.content.as_str()),
_ => None,
})
.collect::<Vec<_>>()
.join(" ");
if let Some(block) = crate::agent::exemplars::block_for_task(&task_query, EXEMPLAR_TOP_K) {
let ex_msg = LoopMessage::User(super::message::UserMessage { content: block });
context.messages.push(loop_message_to_value(&ex_msg));
}
for prompt in &prompts {
context.messages.push(loop_message_to_value(prompt));
if let (Some(tracker), LoopMessage::User(u)) = (&config.file_touch_tracker, prompt) {
tracker.record_user_message(&u.content);
}
}
if super::context_manager::verbatim_pre_recall_enabled()
&& let Some(provider) = &memory_provider
&& super::context_manager::query_worth_pre_recalling(&task_query)
{
let snapshot = provider.format_for_system_prompt();
let q = task_query.clone();
let p = provider.clone();
match tokio::task::spawn_blocking(move || p.search(&q)).await {
Ok(Ok(resp)) => {
if let Some(block) = super::context_manager::pre_recall_block(&resp, &snapshot) {
let msg = LoopMessage::User(super::message::UserMessage { content: block });
context.messages.push(loop_message_to_value(&msg));
}
}
Ok(Err(e)) => {
tracing::debug!(target: "dirge::memory", error = %e, "pre-recall search failed")
}
Err(e) => {
tracing::debug!(target: "dirge::memory", error = %e, "pre-recall task join failed")
}
}
}
let _ = emit.send(LoopEvent::AgentStart).await;
let _ = emit.send(LoopEvent::TurnStart).await;
for prompt in &prompts {
let _ = emit
.send(LoopEvent::MessageStart {
message: prompt.clone(),
})
.await;
let _ = emit
.send(LoopEvent::MessageEnd {
message: prompt.clone(),
})
.await;
}
run_loop(
context,
new_messages,
config,
signal,
emit,
stream_fn,
summarize_fn,
memory_provider,
)
.await
}
/// Drives the agent loop until the model stops, the run is aborted, a hook
/// asks to stop, or the `max_turns` cap is hit.
///
/// Structure:
/// - The outer loop represents one "finalization round": when the inner loop
///   drains, `poll_finalization_follow_up` may supply follow-up messages, in
///   which case the outer loop runs again with those as pending messages.
/// - The inner loop runs one model turn per iteration for as long as the last
///   turn produced tool calls or there are pending messages to deliver.
///
/// Each inner iteration, in order: optional turn-start fold, optional
/// memory-refresh system message, delivery of pending messages, capping of
/// oversized tool results, streaming the assistant response, extracting (and
/// scavenging / repairing) tool calls, storm-guard inspection, tool execution,
/// back-filling missing tool results, `TurnEnd`, post-usage compaction
/// decision, optional background checkpoint, the `prepare_next_turn` and
/// `should_stop_after_turn` hooks, polling for steering, and the turn cap.
///
/// Returns every message produced during the run (including the initial
/// `new_messages`). `AgentEnd` is emitted on every exit path; the
/// `RepairStats` event is emitted only on the normal exit at the bottom.
#[allow(clippy::too_many_arguments)]
pub async fn run_loop(
    mut current_context: Context,
    mut new_messages: Vec<LoopMessage>,
    mut config: LoopConfig,
    signal: AbortSignal,
    emit: &mpsc::Sender<LoopEvent>,
    stream_fn: &StreamFn,
    summarize_fn: Option<crate::agent::compression::SummarizeFn>,
    memory_provider: Option<std::sync::Arc<dyn crate::extras::memory_provider::MemoryProvider>>,
) -> Vec<LoopMessage> {
    // `TurnStart` is not emitted for the first iteration (see below).
    let mut first_turn = true;
    let mut guards = super::activity::LoopGuards::new(
        storm_for_config(&config),
        super::failure_tracker::FailureTracker::new(FAILURE_REFLECTION_THRESHOLD),
    );
    let inflight = InflightSet::new();
    // Set at the top of every outer iteration before it is read.
    let mut folded_this_turn: bool;
    let mut compaction_failures: u32 = 0;
    // Tokens freed by capping oversized tool results since the last usage decision.
    let mut snip_tokens_freed: u64 = 0;
    let (mut pending_messages, _initial_user_steering): (Vec<LoopMessage>, bool) =
        poll_steering_and_reminder(&config, &guards).await;
    let mut turns_taken: usize = 0;
    let mut reflections = super::reflexion::ReflectionLog::new();
    let mut critic_done = false;
    let mut goal_reacts: u8 = 0;
    let mut checkpoint_schedule: Option<context_manager::CheckpointSchedule> = None;
    let checkpoint_slot: CheckpointSlot = std::sync::Arc::new(std::sync::Mutex::new(None));
    let mut checkpoint_generation: u64 = 0;
    let mut todo_nudges: u8 = 0;

    'outer: loop {
        guards.reset_turn();
        // Allows exactly one "repeat-loop guard" self-correction per outer round.
        let mut turn_self_corrected = false;
        folded_this_turn = false;
        let mut has_more_tool_calls = true;

        while has_more_tool_calls || !pending_messages.is_empty() {
            // Ensures the failure counter is updated at most once per iteration.
            let mut compaction_recorded_this_iter = false;
            let model_window = context_manager::context_window_override().unwrap_or_else(|| {
                config
                    .model_name
                    .as_deref()
                    .and_then(crate::config::context_window_for_model)
                    .unwrap_or(128_000)
            });
            let ctx_max = context_manager::effective_ctx_max(model_window);

            // The very first iteration does not emit `TurnStart`; `run_agent_loop`
            // emits one itself before calling into this function.
            if !first_turn {
                let _ = emit.send(LoopEvent::TurnStart).await;
            } else {
                first_turn = false;
            }

            // Turn-start fold: compact proactively when the estimated context
            // size is above the threshold and no fold has happened this round.
            if !folded_this_turn {
                let rough_estimate =
                    crate::agent::compression::estimate_messages_tokens(&current_context.messages);
                let estimate = context_manager::estimate_turn_start(rough_estimate, ctx_max);
                if estimate.ratio > context_manager::TURN_START_FOLD_THRESHOLD {
                    tracing::info!(
                        target: "dirge::agent_loop",
                        estimate_tokens = %estimate.estimate_tokens,
                        ctx_max = %estimate.ctx_max,
                        ratio = %estimate.ratio,
                        "context-manager: turn-start fold firing ({}% of context)",
                        (estimate.ratio * 100.0) as u32,
                    );
                    let outcome = run_compaction_pass(
                        &mut current_context,
                        &summarize_fn,
                        5,
                        compaction_failures,
                        &memory_provider,
                        config.compaction_hooks.as_ref(),
                        emit,
                        &checkpoint_slot,
                        &mut checkpoint_generation,
                        (ctx_max as f64 * context_manager::HISTORY_FOLD_THRESHOLD) as u64,
                    )
                    .await;
                    if let SummaryOutcome::Succeeded(idx) = outcome {
                        restore_working_files(&config, &mut current_context, idx, ctx_max).await;
                    }
                    if !compaction_recorded_this_iter {
                        record_compaction_outcome(&mut compaction_failures, outcome);
                        compaction_recorded_this_iter = true;
                    }
                    folded_this_turn = true;
                }
            }

            // If memories were consolidated mid-session, surface the refreshed
            // block to the model as a system message.
            if let Some(provider) = &memory_provider
                && context_manager::take_memories_dirty()
            {
                let block = provider.format_for_system_prompt();
                if !block.trim().is_empty() {
                    current_context.messages.push(serde_json::json!({
                        "role": "system",
                        "content": format!(
                            "## Updated memory (consolidated mid-session)\n{block}"
                        ),
                    }));
                }
            }

            // Deliver pending messages (steering, reminders, follow-ups).
            if !pending_messages.is_empty() {
                for msg in &pending_messages {
                    let _ = emit
                        .send(LoopEvent::MessageStart {
                            message: msg.clone(),
                        })
                        .await;
                    let _ = emit
                        .send(LoopEvent::MessageEnd {
                            message: msg.clone(),
                        })
                        .await;
                    current_context.messages.push(loop_message_to_value(msg));
                    new_messages.push(msg.clone());
                    // Context-depth reminders are not recorded with the tracker.
                    if let (Some(tracker), LoopMessage::User(u)) = (&config.file_touch_tracker, msg)
                        && !u.content.contains("[Context-depth reminder]")
                    {
                        tracker.record_user_message(&u.content);
                    }
                }
                pending_messages.clear();
            }

            // Cap oversized tool results; the cap depends on how full the context is.
            let cap_estimate =
                crate::agent::compression::estimate_messages_tokens(&current_context.messages);
            let result_cap = crate::agent::compression::tiered_result_cap(cap_estimate, ctx_max);
            let (capped, freed) = crate::agent::compression::cap_oversized_tool_results_counted(
                &current_context.messages,
                result_cap,
            );
            current_context.messages = capped;
            snip_tokens_freed = snip_tokens_freed.saturating_add(freed);

            let (assistant_msg, token_usage) = stream_assistant_response(
                &mut current_context,
                &config,
                signal.clone(),
                emit,
                stream_fn,
            )
            .await;
            new_messages.push(LoopMessage::Assistant(assistant_msg.clone()));

            // Error/abort ends the whole run immediately.
            if matches!(
                assistant_msg.stop_reason,
                StopReason::Error | StopReason::Aborted
            ) {
                let _ = emit
                    .send(LoopEvent::TurnEnd {
                        message: assistant_msg.clone(),
                        tool_results: Vec::new(),
                    })
                    .await;
                let _ = emit
                    .send(LoopEvent::AgentEnd {
                        messages: new_messages.clone(),
                    })
                    .await;
                return new_messages;
            }

            let mut tool_calls = extract_tool_calls_from(&assistant_msg);
            let allowed_names: std::collections::HashSet<String> = current_context
                .tools
                .iter()
                .map(|t| t.name().to_string())
                .collect();

            // Scavenge tool calls embedded in the assistant's text/thinking,
            // skipping any whose name+arguments match an already-extracted call.
            let scavenge_source = build_scavenge_source(&assistant_msg.content);
            if !scavenge_source.is_empty() {
                let scavenge_result =
                    super::scavenge::scavenge_tool_calls(Some(&scavenge_source), &allowed_names, 4);
                if !scavenge_result.calls.is_empty() {
                    use super::message::canonical_json;
                    let seen_signatures: std::collections::HashSet<String> = tool_calls
                        .iter()
                        .map(|tc| format!("{}::{}", tc.name, canonical_json(&tc.arguments)))
                        .collect();
                    for sc in &scavenge_result.calls {
                        let sig = format!("{}::{}", sc.name, canonical_json(&sc.arguments));
                        if !seen_signatures.contains(&sig) {
                            tool_calls.push(sc.clone());
                        }
                    }
                }
            }
            apply_truncation_repair(
                &mut tool_calls,
                &config.repair_stats,
                &config.truncation_notes,
            );

            let mut tool_results: Vec<ToolResultMessage> = Vec::new();
            has_more_tool_calls = false;
            let mut storm_give_up_tools: Option<Vec<String>> = None;
            if !tool_calls.is_empty() {
                let original_count = tool_calls.len();
                let (surviving_calls, storm_report) = guards.inspect_calls(&tool_calls);
                let all_suppressed = storm_report.all_suppressed(original_count);
                if all_suppressed && !turn_self_corrected {
                    // First time every call was suppressed this round: answer
                    // each call with the guard text and give the model one more
                    // turn to change approach.
                    turn_self_corrected = true;
                    const REPEAT_LOOP_GUARD: &str = "[repeat-loop guard] You've made this exact call more than once and gotten the same result — you're stuck in a loop. Do NOT repeat it. Before doing anything else, work through these steps:\n\
                        1. State what you were trying to achieve with this call and why it isn't getting you there.\n\
                        2. Look at the earlier results for it above. What assumption of yours might be wrong, and what do those results actually tell you?\n\
                        3. Propose 2-3 FUNDAMENTALLY different approaches — a different tool, a different entry point, or a different interpretation of the problem — and pick the most promising one.\n\
                        4. Proceed with that approach.\n\
                        If none of them can work with the tools available, say so plainly and report what you found instead of trying again.";
                    for call in &tool_calls {
                        let args = super::message::canonical_json(&call.arguments);
                        let sig = super::reflexion::approach_signature(&call.name, &args);
                        reflections.record(sig);
                    }
                    let guard_text = format!(
                        "{REPEAT_LOOP_GUARD}{}",
                        reflections.block().unwrap_or_default()
                    );
                    let guard_blocks = vec![ContentBlock::Text {
                        text: guard_text.clone(),
                    }];
                    for call in &tool_calls {
                        let tr = ToolResultMessage {
                            tool_call_id: call.id.clone(),
                            tool_name: call.name.clone(),
                            content: guard_blocks.clone(),
                            details: Value::Null,
                            is_error: false,
                        };
                        current_context.messages.push(tool_result_to_value(&tr));
                        new_messages.push(LoopMessage::ToolResult(tr.clone()));
                        tool_results.push(tr);
                    }
                    has_more_tool_calls = true;
                } else if storm_report.storms_broken > 0 && surviving_calls.is_empty() {
                    // Nothing survived and the self-correction was already
                    // used: give up and narrate the failure below.
                    has_more_tool_calls = false;
                    storm_give_up_tools = Some(tool_calls.iter().map(|c| c.name.clone()).collect());
                }

                if !surviving_calls.is_empty() {
                    let batch = super::tools::execute_tool_calls(
                        &current_context,
                        &assistant_msg,
                        &surviving_calls,
                        &config,
                        &signal,
                        emit,
                        &inflight,
                    )
                    .await;
                    tool_results.extend(batch.messages.clone());
                    has_more_tool_calls = !batch.terminate;
                    for result in &batch.messages {
                        let excerpt = tool_result_excerpt(&result.content);
                        // Fall back to a synthetic call when the result's id
                        // does not match any surviving call.
                        let originating = surviving_calls
                            .iter()
                            .find(|c| c.id == result.tool_call_id)
                            .cloned()
                            .unwrap_or_else(|| super::tools::ToolCall {
                                id: result.tool_call_id.clone(),
                                name: result.tool_name.clone(),
                                arguments: serde_json::Value::Null,
                            });
                        guards.record_result(&originating, result.is_error, &excerpt);
                        current_context.messages.push(tool_result_to_value(result));
                        new_messages.push(LoopMessage::ToolResult(result.clone()));
                    }
                }

                // Every tool call must end up with a result in the context.
                for tr in super::tools::backfill_missing_tool_results(&tool_calls, &tool_results) {
                    current_context.messages.push(tool_result_to_value(&tr));
                    new_messages.push(LoopMessage::ToolResult(tr.clone()));
                    tool_results.push(tr);
                }

                if let Some(tools) = storm_give_up_tools.take() {
                    let text = super::storm::failure_narrative(&tools);
                    let msg =
                        AssistantMessage::new(vec![ContentBlock::Text { text }], StopReason::Stop);
                    let _ = emit
                        .send(LoopEvent::MessageStart {
                            message: LoopMessage::Assistant(msg.clone()),
                        })
                        .await;
                    let _ = emit
                        .send(LoopEvent::MessageUpdate {
                            message: msg.clone(),
                            phase: super::message::DeltaPhase::TextStart,
                        })
                        .await;
                    let _ = emit
                        .send(LoopEvent::MessageEnd {
                            message: LoopMessage::Assistant(msg.clone()),
                        })
                        .await;
                    current_context
                        .messages
                        .push(loop_message_to_value(&LoopMessage::Assistant(msg.clone())));
                    new_messages.push(LoopMessage::Assistant(msg));
                }
            }

            let _ = emit
                .send(LoopEvent::TurnEnd {
                    message: assistant_msg.clone(),
                    tool_results: tool_results.clone(),
                })
                .await;

            // Post-usage context management, driven by the reported input tokens.
            {
                let decision = context_manager::decide_after_usage(
                    token_usage.map(|u| u.input_tokens),
                    ctx_max,
                    folded_this_turn,
                );
                match decision.kind {
                    PostUsageDecisionKind::Fold if !folded_this_turn => {
                        folded_this_turn = true;
                        if crate::agent::compression::snip_bought_enough(
                            snip_tokens_freed,
                            ctx_max,
                            decision.aggressive,
                        ) {
                            tracing::info!(
                                target: "dirge::agent_loop",
                                freed = snip_tokens_freed,
                                ratio = %decision.ratio,
                                "snip freed {snip_tokens_freed} tokens — sufficient, skipping fold",
                            );
                        } else {
                            tracing::info!(
                                target: "dirge::agent_loop",
                                ratio = %decision.ratio,
                                aggressive = decision.aggressive,
                                tail_budget = ?decision.tail_budget,
                                "context-manager: fold recommended ({})",
                                if decision.aggressive { "aggressive" } else { "normal" },
                            );
                            if let Some(prompt_tokens) = token_usage.map(|u| u.input_tokens)
                                && crate::agent::compression::should_compress(
                                    prompt_tokens,
                                    ctx_max,
                                )
                            {
                                let outcome = run_compaction_pass(
                                    &mut current_context,
                                    &summarize_fn,
                                    5,
                                    compaction_failures,
                                    &memory_provider,
                                    config.compaction_hooks.as_ref(),
                                    emit,
                                    &checkpoint_slot,
                                    &mut checkpoint_generation,
                                    (ctx_max as f64 * context_manager::HISTORY_FOLD_THRESHOLD)
                                        as u64,
                                )
                                .await;
                                if let SummaryOutcome::Succeeded(idx) = outcome {
                                    restore_working_files(
                                        &config,
                                        &mut current_context,
                                        idx,
                                        ctx_max,
                                    )
                                    .await;
                                }
                                if !compaction_recorded_this_iter {
                                    record_compaction_outcome(&mut compaction_failures, outcome);
                                }
                            }
                        }
                    }
                    PostUsageDecisionKind::ExitWithSummary => {
                        tracing::warn!(
                            target: "dirge::agent_loop",
                            ratio = %decision.ratio,
                            "context-manager: forcing summary and ending turn",
                        );
                        let outcome = run_compaction_pass(
                            &mut current_context,
                            &summarize_fn,
                            3,
                            compaction_failures,
                            &memory_provider,
                            config.compaction_hooks.as_ref(),
                            emit,
                            &checkpoint_slot,
                            &mut checkpoint_generation,
                            (ctx_max as f64 * context_manager::HISTORY_FOLD_THRESHOLD) as u64,
                        )
                        .await;
                        if let SummaryOutcome::Succeeded(idx) = outcome {
                            restore_working_files(&config, &mut current_context, idx, ctx_max)
                                .await;
                        }
                        if !compaction_recorded_this_iter {
                            record_compaction_outcome(&mut compaction_failures, outcome);
                        }
                    }
                    _ => {}
                }

                // Background checkpointing: reset the schedule whenever a fold
                // was decided, otherwise let the schedule decide when to spawn.
                if context_manager::incremental_checkpoint_enabled()
                    && let Some(sfn) = &summarize_fn
                {
                    let sched = checkpoint_schedule
                        .get_or_insert_with(|| context_manager::CheckpointSchedule::new(ctx_max));
                    match decision.kind {
                        PostUsageDecisionKind::Fold | PostUsageDecisionKind::ExitWithSummary => {
                            sched.reset()
                        }
                        PostUsageDecisionKind::None => {
                            if sched.is_enabled() && sched.note_usage(decision.ratio) {
                                spawn_incremental_checkpoint(
                                    sfn.clone(),
                                    current_context.messages.clone(),
                                    emit.clone(),
                                    checkpoint_slot.clone(),
                                    checkpoint_generation,
                                );
                            }
                        }
                    }
                }
                snip_tokens_freed = 0;
            }

            // Hook: may replace the context and/or change the reasoning level.
            if let Some(hook) = &config.prepare_next_turn {
                let hook_ctx = super::hooks::TurnHookContext {
                    message: assistant_msg.clone(),
                    tool_results: tool_results.clone(),
                    context: current_context.clone(),
                    new_messages: new_messages.clone(),
                };
                if let Some(update) = hook(hook_ctx).await {
                    if let Some(new_ctx) = update.context {
                        current_context = new_ctx;
                    }
                    if let Some(level) = update.thinking_level {
                        config.reasoning = Some(level);
                        tracing::debug!(
                            target: "dirge::agent_loop",
                            thinking = ?level,
                            "prepareNextTurn applied a new thinking_level for the next turn",
                        );
                    }
                    if let Some(model) = &update.model {
                        tracing::warn!(
                            target: "dirge::agent_loop",
                            requested_model = %model,
                            "prepareNextTurn returned a new model but mid-run model swap is not yet wired — ignoring",
                        );
                    }
                }
            }

            // Hook: may end the run right after this turn.
            if let Some(hook) = &config.should_stop_after_turn {
                let hook_ctx = super::hooks::TurnHookContext {
                    message: assistant_msg.clone(),
                    tool_results: tool_results.clone(),
                    context: current_context.clone(),
                    new_messages: new_messages.clone(),
                };
                if hook(hook_ctx).await {
                    let _ = emit
                        .send(LoopEvent::AgentEnd {
                            messages: new_messages.clone(),
                        })
                        .await;
                    return new_messages;
                }
            }

            let (next_pending, had_user_steering) =
                poll_steering_and_reminder(&config, &guards).await;
            pending_messages = next_pending;
            // Fresh user steering restarts the turn budget.
            if had_user_steering {
                turns_taken = 0;
            }
            turns_taken += 1;
            if let Some(cap) = config.max_turns
                && turns_taken >= cap
            {
                tracing::warn!(
                    target: "dirge::agent_loop",
                    turns = turns_taken,
                    cap = cap,
                    "max_turns reached — terminating run"
                );
                let notice = format!(
                    "{MAX_TURNS_NOTICE_PREFIX} ({cap}) reached. Stopping the run. Increase --max-agent-turns or `max_agent_turns` in config.json to allow more."
                );
                let _ = emit
                    .send(LoopEvent::SystemNotice {
                        content: notice.clone(),
                    })
                    .await;
                new_messages.push(LoopMessage::User(super::message::UserMessage {
                    content: notice,
                }));
                break 'outer;
            }
        }

        // An interjection skips the finalization follow-ups entirely.
        if signal.is_interjected() {
            break;
        }

        let (follow_up, source) = poll_finalization_follow_up(
            &config,
            &current_context.system_prompt,
            &new_messages,
            &mut critic_done,
            &mut goal_reacts,
            &mut todo_nudges,
        )
        .await;
        if !follow_up.is_empty() {
            tracing::trace!(target: "dirge::loop", ?source, "finalization follow-up interjected");
            pending_messages = follow_up;
            continue 'outer;
        }
        break;
    }

    // Report argument-repair statistics, if any were collected.
    {
        let snapshot = config.repair_stats.snapshot();
        if !snapshot.is_empty() {
            let _ = emit.send(LoopEvent::RepairStats { snapshot }).await;
        }
    }
    let _ = emit
        .send(LoopEvent::AgentEnd {
            messages: new_messages.clone(),
        })
        .await;
    new_messages
}
/// Returns the tool calls contained in an assistant message.
///
/// Thin wrapper that delegates to `super::tools::extract_tool_calls`.
fn extract_tool_calls_from(msg: &AssistantMessage) -> Vec<super::tools::ToolCall> {
    super::tools::extract_tool_calls(msg)
}
/// Reports whether the run has produced at least one tool result so far.
fn run_made_tool_calls(new_messages: &[LoopMessage]) -> bool {
    for message in new_messages {
        if let LoopMessage::ToolResult(_) = message {
            return true;
        }
    }
    false
}
/// Renders the run's messages as a bounded plain-text transcript for the
/// critic / goal judge.
///
/// Each message becomes one or more blocks:
/// - user messages: `USER: …`;
/// - assistant text: `ASSISTANT: …` (blank text is skipped);
/// - assistant tool calls: `ASSISTANT called name(args)` with the JSON
///   arguments cut to 200 characters;
/// - tool results: `TOOL name [tag]: …` with the text cut to 400 characters and
///   `tag` being `DENIED` (error recognized as a permission denial), `ERROR`,
///   or `result`.
///
/// If everything fits in `MAX_CHARS` characters the blocks are simply
/// concatenated. Otherwise whole blocks are kept from the start (about
/// `HEAD_CHARS`, always at least one block) and from the end (the remaining
/// budget, always at least one block if any is left), joined by an elision
/// marker. If that is still too long, only the last `MAX_CHARS` characters are
/// returned. All lengths are counted in characters, not bytes.
fn build_critic_transcript(new_messages: &[LoopMessage]) -> String {
    const MAX_CHARS: usize = 12_000;
    const HEAD_CHARS: usize = 2_000;
    const PER_RESULT_CHARS: usize = 400;
    const ELISION: &str =
        "\n…(earlier run steps elided; showing the start and the most recent activity)…\n";

    // Phase 1: render every message into transcript blocks.
    let mut blocks: Vec<String> = Vec::new();
    for message in new_messages {
        match message {
            LoopMessage::User(user) => {
                blocks.push(format!("USER: {}\n", user.content.trim()));
            }
            LoopMessage::Assistant(assistant) => {
                for part in &assistant.content {
                    match part {
                        ContentBlock::Text { text } if !text.trim().is_empty() => {
                            blocks.push(format!("ASSISTANT: {}\n", text.trim()));
                        }
                        ContentBlock::ToolCall {
                            name, arguments, ..
                        } => {
                            let args: String = serde_json::to_string(arguments)
                                .unwrap_or_default()
                                .chars()
                                .take(200)
                                .collect();
                            blocks.push(format!("ASSISTANT called {name}({args})\n"));
                        }
                        _ => {}
                    }
                }
            }
            LoopMessage::ToolResult(result) => {
                let joined: String = result
                    .content
                    .iter()
                    .filter_map(|c| match c {
                        ContentBlock::Text { text } => Some(text.as_str()),
                        _ => None,
                    })
                    .collect::<Vec<_>>()
                    .join(" ");
                // The denial check looks at the full text, before clipping.
                let tag = if !result.is_error {
                    "result"
                } else if crate::agent::tools::is_permission_denial(&joined) {
                    "DENIED"
                } else {
                    "ERROR"
                };
                let clipped: String = joined.chars().take(PER_RESULT_CHARS).collect();
                blocks.push(format!(
                    "TOOL {} [{}]: {}\n",
                    result.tool_name,
                    tag,
                    clipped.trim()
                ));
            }
            _ => {}
        }
    }

    // Phase 2: fits as-is?
    let lens: Vec<usize> = blocks.iter().map(|b| b.chars().count()).collect();
    if lens.iter().sum::<usize>() <= MAX_CHARS {
        return blocks.concat();
    }

    // Phase 3: pick the head — whole blocks up to roughly HEAD_CHARS, but
    // always at least the first block.
    let mut head_end = 0;
    let mut head_len = 0;
    for &n in &lens {
        if head_end > 0 && head_len + n > HEAD_CHARS {
            break;
        }
        head_len += n;
        head_end += 1;
        if head_len >= HEAD_CHARS {
            break;
        }
    }

    // Phase 4: pick the tail — whole blocks from the end within the remaining
    // budget, never overlapping the head, always at least one if available.
    let tail_budget = MAX_CHARS.saturating_sub(head_len + ELISION.chars().count());
    let mut tail_start = blocks.len();
    let mut tail_len = 0;
    for idx in (head_end..blocks.len()).rev() {
        let n = lens[idx];
        if tail_start < blocks.len() && tail_len + n > tail_budget {
            break;
        }
        tail_len += n;
        tail_start = idx;
        if tail_len >= tail_budget {
            break;
        }
    }

    // Phase 5: assemble and hard-clip to the last MAX_CHARS characters.
    let mut out = String::new();
    for block in &blocks[..head_end] {
        out.push_str(block);
    }
    out.push_str(ELISION);
    for block in &blocks[tail_start..] {
        out.push_str(block);
    }
    let len = out.chars().count();
    if len > MAX_CHARS {
        return out.chars().skip(len - MAX_CHARS).collect();
    }
    out
}
/// Concatenates the text of all `Thinking` and `Text` blocks, in order,
/// separated by newlines. Other block kinds are ignored.
pub(crate) fn build_scavenge_source(blocks: &[ContentBlock]) -> String {
    let mut pieces: Vec<&str> = Vec::new();
    for block in blocks {
        match block {
            ContentBlock::Thinking { text } | ContentBlock::Text { text } => {
                pieces.push(text.as_str());
            }
            _ => {}
        }
    }
    pieces.join("\n")
}
pub(crate) fn apply_truncation_repair(
tool_calls: &mut [crate::agent::agent_loop::ToolCall],
repair_stats: &crate::agent::agent_loop::tool_input_repair::RepairStats,
truncation_notes: &std::sync::Arc<
std::sync::Mutex<std::collections::HashMap<String, Vec<String>>>,
>,
) {
use crate::agent::agent_loop::tool_input_repair::{RepairKind, repair_truncated_json};
for tc in tool_calls.iter_mut() {
if let serde_json::Value::String(raw) = &tc.arguments {
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(raw) {
tc.arguments = parsed;
continue;
}
let r = repair_truncated_json(raw);
if !r.changed {
continue;
}
repair_stats.record(RepairKind::TruncationFixed);
let prefix = if r.fallback {
format!("[{}] ⚠️ TRUNCATION UNRECOVERABLE", tc.name)
} else {
format!("[{}]", tc.name)
};
let mut sink = truncation_notes.lock().expect("truncation_notes poisoned");
let entry = sink.entry(tc.id.clone()).or_default();
for n in &r.notes {
entry.push(format!("{prefix} {n}"));
}
drop(sink);
if !r.fallback {
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&r.repaired) {
tc.arguments = parsed;
}
}
}
}
}
#[cfg(test)]
#[path = "run_tests.rs"]
mod tests;