use crate::compact::compact_history;
use crate::llm::{Completion, ConversationUsageWindow, LLMClientDyn};
use crate::prompts::subagent_system_prompt;
use crate::session::{SessionWriter, ToolCallRecord, now_unix_ms};
use crate::tools::{Tool, floor_char_boundary, tool_definitions};
use eyre::Result;
use rig::OneOrMany;
use rig::completion::Message;
use rig::completion::message::{ToolResult, ToolResultContent, UserContent};
use serde_json::{Value, json};
use std::collections::{HashMap, VecDeque};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use tracing::{info, warn};
/// Maximum size, in bytes, of a single tool result fed back to the model; longer output is truncated.
const MAX_TOOL_RESULT_BYTES: usize = 50_000;
/// Number of identical back-to-back tool calls (a period-1 cycle) at which the latest call is blocked.
const MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS: usize = 3;
/// Number of repetitions of a period-2 or period-3 tool-call pattern at which the latest call is blocked.
const MAX_CYCLE_REPETITIONS: usize = 2;
/// How many of the most recent `name:args` tool-call keys are kept for cycle detection.
const TOOL_CALL_HISTORY_WINDOW: usize = 8;
/// Agents whose depth level is below this value may spawn subagents (top level is level 0).
const MAX_SUBAGENT_DEPTH: usize = 2;
/// Consecutive blocked tool calls after which the loop forces a compaction and injects a cycle-break note.
const MAX_CONSECUTIVE_BLOCKED_TOOL_CALLS: usize = 3;
/// User message appended on the last allowed turn to make the model produce its final answer.
const FINAL_TURN_WRAP_UP_PROMPT: &str = "Budget is nearly exhausted. Stop investigating and wrap up now with your best final answer based on the evidence you already gathered. Do not call more investigation tools.";
/// Final outcome of a successful `run_agent` call.
pub struct AgentResult {
/// Final answer text; empty when the run ended because a terminal tool was called.
pub text: String,
/// Number of turns (model completions) consumed.
pub turns: usize,
/// Tool calls made, including those made by nested subagents that completed successfully.
pub tool_calls: usize,
/// Subagents spawned during the run, measured on the shared counter (so nested subagents count too).
pub subagents_spawned: usize,
/// Cumulative input tokens from completions, compactions, and successful subagents.
pub total_input_tokens: u64,
/// Cumulative output tokens from completions, compactions, and successful subagents.
pub total_output_tokens: u64,
/// Cumulative total tokens as reported by the client, from the same sources.
pub total_tokens: u64,
}
/// Snapshot handed to the optional `AgentConfig::progress` callback while an agent runs.
pub struct AgentProgress {
/// Turn count so far (the current turn, 1-based).
pub turns: usize,
/// Tool calls so far, including nested subagent calls once the subagent returns.
pub tool_calls: usize,
/// Subagents spawned so far by this agent and its descendants.
pub subagents_spawned: usize,
/// First line of the task of a subagent being started, if this update announces one.
pub last_subagent: Option<String>,
}
/// Position of an agent in the subagent tree.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AgentDepth {
/// The root agent (depth level 0).
TopLevel,
/// A spawned subagent; `level` is the parent's level plus one (1 for direct children of the top level).
Subagent { level: usize },
}
impl AgentDepth {
    /// Numeric nesting level: `0` for the top-level agent, otherwise the stored subagent level.
    fn level(self) -> usize {
        if let AgentDepth::Subagent { level } = self {
            level
        } else {
            0
        }
    }

    /// Returns `true` for any spawned subagent, `false` for the top-level agent.
    fn is_subagent(self) -> bool {
        self != AgentDepth::TopLevel
    }

    /// Returns `true` while an agent at this depth is still allowed to spawn a deeper subagent.
    fn can_spawn_subagent(self) -> bool {
        MAX_SUBAGENT_DEPTH > self.level()
    }
}
/// Everything `run_agent` needs to drive one agent.
pub struct AgentConfig {
/// Display name used in tracing output; subagents get `"{parent}/subagent-{id}"`.
pub name: String,
/// Agent identifier written into session tool-call records.
pub session_agent: String,
/// Model identifier sent with every completion request.
pub model: String,
/// Maximum number of turns; the last one is the wrap-up turn.
pub max_turns: usize,
/// Passed to `ConversationUsageWindow::new` to decide when history is compacted
/// (NOTE(review): presumably `None` disables automatic compaction — confirm).
pub compact_threshold: Option<u64>,
/// Base system prompt; non-empty `project_context` is appended to it.
pub system_prompt: String,
/// LLM client used for completions and compaction.
pub client: Arc<dyn LLMClientDyn>,
/// Whether this is the top-level agent or a subagent, and at which level.
pub depth: AgentDepth,
/// Tool names whose invocation ends the run; they stay available on the final turn.
pub terminal_tools: Vec<String>,
/// User message sent when the model replies with neither text nor tool calls;
/// `None` makes such a reply an immediate error.
pub empty_response_nudge: Option<String>,
/// How many nudges are allowed (outside the final turn) before an empty reply becomes an error.
pub max_empty_responses: usize,
/// Counter shared across the whole agent tree; used to number subagents and count spawns.
pub subagent_counter: Arc<AtomicUsize>,
/// Optional callback receiving `AgentProgress` snapshots.
pub progress: Option<Arc<dyn Fn(AgentProgress) + Send + Sync>>,
/// Extra context appended to the system prompt when present and non-empty.
pub project_context: Option<String>,
/// Optional writer for the session's tool-call trajectory log.
pub session_writer: Option<SessionWriter>,
}
/// Tool injected into subagents so they can hand their result back to the parent agent.
struct FinishTool {
/// Slot the tool writes its `result` argument into; `run_agent` takes it after each tool batch.
result: Arc<Mutex<Option<String>>>,
}
/// Borrowed per-call state that `execute_tool_call` needs from the agent loop.
struct ToolCallContext<'a> {
/// Configuration of the agent making the call.
config: &'a AgentConfig,
/// Tools offered to the model on this turn (already filtered for depth and final turn).
runtime_tools: &'a HashMap<String, Arc<dyn Tool>>,
/// Original, unfiltered tool map; handed down to spawned subagents.
tools_map: &'a HashMap<String, Arc<dyn Tool>>,
/// Working directory passed to tools.
work_dir: &'a Path,
/// Zero-based index of the current turn.
turn: usize,
/// Number of turns including the current one (`turn + 1`); used for progress reports.
current_turns: usize,
/// Running tool-call total, used for progress reports.
total_tool_calls: usize,
/// Value of the shared subagent counter when the agent started.
initial_subagent_count: usize,
}
/// Result of one tool call as seen by the agent loop.
struct ToolCallOutcome {
/// Text returned to the model as the tool result.
output: String,
/// Tool calls performed by a subagent spawned by this call (0 otherwise).
nested_tool_calls: usize,
/// `true` when cycle detection blocked the call instead of running it.
repeated_tool_call_blocked: bool,
/// Status recorded in the session log (e.g. "ok", "error", "blocked_cycle").
status: &'static str,
/// Identifier of the subagent that ran, if any.
spawned_agent: Option<String>,
/// Input tokens consumed by a spawned subagent.
subagent_input_tokens: u64,
/// Output tokens consumed by a spawned subagent.
subagent_output_tokens: u64,
/// Total tokens consumed by a spawned subagent.
subagent_total_tokens: u64,
}
/// A validated subagent request, ready to be run.
struct PreparedSubagent {
/// Task text sent to the subagent as its initial message.
task: String,
/// Session identifier of the subagent (`"subagent-{id}"`).
spawned_agent: String,
/// Configuration the subagent runs with.
config: AgentConfig,
}
/// What a finished (or failed) subagent reports back to its parent.
struct SubagentOutcome {
/// Subagent's final result, or `"Error: ..."` when the subagent failed.
output: String,
/// Tool calls the subagent made (0 on failure).
tool_calls: usize,
/// Session identifier of the subagent.
spawned_agent: Option<String>,
/// Input tokens used by the subagent (0 on failure).
input_tokens: u64,
/// Output tokens used by the subagent (0 on failure).
output_tokens: u64,
/// Total tokens used by the subagent (0 on failure).
total_tokens: u64,
}
impl Tool for FinishTool {
fn name(&self) -> String {
"finish".to_string()
}
fn definition(&self) -> rig::completion::ToolDefinition {
rig::completion::ToolDefinition {
name: "finish".to_string(),
description:
"Finish the assigned subtask and return the final result to the parent agent."
.to_string(),
parameters: json!({
"type": "object",
"properties": {
"result": {
"type": "string",
"description": "Concise final result for the parent agent"
}
},
"required": ["result"],
"additionalProperties": false
}),
}
}
fn call(
&self,
args: Value,
_work_dir: PathBuf,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send>> {
let result_store = Arc::clone(&self.result);
Box::pin(async move {
let result = args
.get("result")
.and_then(|value| value.as_str())
.ok_or_else(|| eyre::eyre!("missing result"))?
.to_string();
*result_store.lock().unwrap_or_else(|e| e.into_inner()) = Some(result);
Ok("ok".to_string())
})
}
}
/// Drives one agent's completion / tool-call loop until it produces a final answer.
///
/// Each turn sends the conversation history plus the current `prompt` to `config.client`,
/// executes any tool calls in the reply, and feeds their results back as the next prompt.
/// The loop returns successfully when:
/// - a top-level agent replies with non-empty text (returned as `AgentResult::text`);
/// - a subagent calls the injected `finish` tool (its `result` becomes the text);
/// - any tool listed in `config.terminal_tools` is called (the text is empty).
///
/// On the final turn `spawn_subagent` is withheld, only terminal tools (plus `finish` for
/// subagents) remain available, and `FINAL_TURN_WRAP_UP_PROMPT` is appended. Token usage
/// from completions, history compaction, and nested subagents is accumulated into the result.
///
/// # Errors
/// Propagates client and compaction errors. Also fails when the model returns an empty
/// reply (after any configured nudges), when a subagent replies with text instead of
/// calling `finish`, or when `max_turns` is exhausted.
pub async fn run_agent(
config: AgentConfig,
initial_message: &str,
tools_map: &HashMap<String, Arc<dyn Tool>>,
work_dir: &Path,
) -> Result<AgentResult> {
// Subagents get a private `finish` tool; its result lands in `finish_store` and is
// checked after each batch of tool calls.
let finish_store = Arc::new(Mutex::new(None));
let mut runtime_tools = tools_map.clone();
if config.depth.is_subagent() {
let finish_tool = Arc::new(FinishTool {
result: Arc::clone(&finish_store),
});
runtime_tools.insert("finish".to_string(), finish_tool as Arc<dyn Tool>);
}
// Non-empty project context is appended to the system prompt.
let mut effective_system_prompt = config.system_prompt.clone();
if let Some(ref ctx) = config.project_context {
if !ctx.is_empty() {
effective_system_prompt.push_str("\n\n");
effective_system_prompt.push_str(ctx);
}
}
// `history` mirrors the conversation. Its last entry is normally `prompt`, which is sent
// separately from the preceding history on each request (see `Completion` below).
let mut history = Vec::new();
let mut prompt = Message::user(initial_message.to_string());
history.push(prompt.clone());
let mut total_input_tokens = 0u64;
let mut total_output_tokens = 0u64;
let mut total_tokens = 0u64;
let mut conversation_usage = ConversationUsageWindow::new(config.compact_threshold);
let mut total_tool_calls = 0usize;
let mut empty_response_count = 0usize;
let mut tool_call_history: VecDeque<String> = VecDeque::new();
// The subagent counter is shared across the agent tree, so spawn counts are reported
// relative to its value at start.
let initial_subagent_count = config.subagent_counter.load(Ordering::Relaxed);
let mut consecutive_blocked_count = 0usize;
let mut last_subagent: Option<String> = None;
for turn in 0..config.max_turns {
let is_final_turn = turn + 1 == config.max_turns;
// Tools offered this turn: `spawn_subagent` is withheld at the depth limit and on the final turn.
let mut available_tools = runtime_tools.clone();
if !config.depth.can_spawn_subagent() || is_final_turn {
available_tools.remove("spawn_subagent");
}
// Compact when the usage window asks for it, but never on the final turn.
if !is_final_turn && conversation_usage.should_compact() {
let usage_before_compaction = conversation_usage.usage();
if let Some(compaction_usage) = compact_history(
Arc::clone(&config.client),
&config.model,
&effective_system_prompt,
&mut history,
&mut prompt,
turn + 1,
usage_before_compaction,
)
.await?
{
total_input_tokens =
total_input_tokens.saturating_add(compaction_usage.input_tokens);
total_output_tokens =
total_output_tokens.saturating_add(compaction_usage.output_tokens);
total_tokens = total_tokens.saturating_add(compaction_usage.total_tokens);
}
conversation_usage.reset();
}
// Final turn: keep only terminal tools (plus `finish` for subagents) and ask for a wrap-up.
if is_final_turn {
available_tools.retain(|name, _| {
config.terminal_tools.iter().any(|terminal| terminal == name)
|| (config.depth.is_subagent() && name == "finish")
});
let wrap_up_prompt = Message::user(FINAL_TURN_WRAP_UP_PROMPT.to_string());
history.push(wrap_up_prompt.clone());
prompt = wrap_up_prompt;
}
let completion = Completion {
model: config.model.clone(),
prompt: prompt.clone(),
preamble: Some(effective_system_prompt.clone()),
// All history except the trailing entry, which is expected to be `prompt` itself.
history: history[..history.len().saturating_sub(1)].to_vec(),
tools: tool_definitions(&available_tools),
tool_choice: None,
max_tokens: Some(8192),
additional_params: None,
};
let response = config.client.completion(completion).await?;
total_input_tokens = total_input_tokens.saturating_add(response.usage.input_tokens);
total_output_tokens = total_output_tokens.saturating_add(response.usage.output_tokens);
total_tokens = total_tokens.saturating_add(response.usage.total_tokens);
conversation_usage.record(response.usage);
let assistant_message = response.message();
history.push(assistant_message.clone());
if let Some(tool_calls) = response.tool_calls() {
empty_response_count = 0;
let mut results = Vec::new();
total_tool_calls += tool_calls.len();
report_progress(&config, turn + 1, total_tool_calls, initial_subagent_count, last_subagent.clone());
let mut should_terminate = false;
for call in tool_calls {
let tool_name = call.function.name.clone();
let args = call.function.arguments.clone();
// Cycle detection works on `name:args` keys over a sliding window of recent calls.
let tool_call_key = format!("{tool_name}:{args}");
tool_call_history.push_back(tool_call_key);
if tool_call_history.len() > TOOL_CALL_HISTORY_WINDOW {
tool_call_history.pop_front();
}
let cycle_len = detect_tool_call_cycle(&tool_call_history);
// A terminal tool ends the run after the whole batch, even if this call is blocked or fails.
should_terminate |= config.terminal_tools.iter().any(|name| name == &tool_name);
// NOTE(review): this value is always reset to `None` below before the next progress
// report; the "subagent starting" progress update comes from `prepare_subagent`.
if tool_name == "spawn_subagent" {
if let Some(task) = args.get("task").and_then(|v| v.as_str()) {
last_subagent = Some(first_line(task));
}
}
info!(agent = %config.name, tool = %tool_name, args = %args, turn, "tool call");
let outcome = execute_tool_call(
ToolCallContext {
config: &config,
runtime_tools: &available_tools,
tools_map,
work_dir,
turn,
current_turns: turn + 1,
total_tool_calls,
initial_subagent_count,
},
&tool_name,
args,
cycle_len,
)
.await?;
if outcome.repeated_tool_call_blocked {
consecutive_blocked_count += 1;
} else {
consecutive_blocked_count = 0;
}
let ToolCallOutcome {
output,
nested_tool_calls,
repeated_tool_call_blocked: _,
status: _,
spawned_agent: _,
subagent_input_tokens,
subagent_output_tokens,
subagent_total_tokens,
} = outcome;
// Roll a subagent's tool calls and token usage into this agent's totals.
total_tool_calls += nested_tool_calls;
total_input_tokens = total_input_tokens.saturating_add(subagent_input_tokens);
total_output_tokens = total_output_tokens.saturating_add(subagent_output_tokens);
total_tokens = total_tokens.saturating_add(subagent_total_tokens);
if tool_name == "spawn_subagent" {
last_subagent = None;
}
report_progress(&config, turn + 1, total_tool_calls, initial_subagent_count, last_subagent.clone());
// Cap each tool result at MAX_TOOL_RESULT_BYTES, cutting at the index `floor_char_boundary` returns.
let mut output = output;
if output.len() > MAX_TOOL_RESULT_BYTES {
let boundary = floor_char_boundary(&output, MAX_TOOL_RESULT_BYTES);
output.truncate(boundary);
output.push_str("\n... truncated (>50k bytes)");
}
results.push(ToolResult {
id: call.id.clone(),
call_id: call.call_id.clone(),
content: OneOrMany::one(ToolResultContent::text(output)),
});
}
// A subagent that called `finish` anywhere in this batch returns its result now.
if config.depth.is_subagent() {
if let Some(result) = finish_store
.lock()
.unwrap_or_else(|e| e.into_inner())
.take()
{
info!(
agent = %config.name,
turn,
input_tokens = response.usage.input_tokens,
output_tokens = response.usage.output_tokens,
total_tokens = response.usage.total_tokens,
total_input_tokens,
total_output_tokens,
total_tokens_so_far = total_tokens,
response_len = result.len(),
"subagent finished"
);
return Ok(AgentResult {
text: result,
turns: turn + 1,
tool_calls: total_tool_calls,
subagents_spawned: config.subagent_counter.load(Ordering::Relaxed)
- initial_subagent_count,
total_input_tokens,
total_output_tokens,
total_tokens,
});
}
}
if should_terminate {
info!(
agent = %config.name,
turn,
total_tool_calls,
total_input_tokens,
total_output_tokens,
total_tokens,
"terminal tool called"
);
return Ok(AgentResult {
text: String::new(),
turns: turn + 1,
tool_calls: total_tool_calls,
subagents_spawned: config.subagent_counter.load(Ordering::Relaxed)
- initial_subagent_count,
total_input_tokens,
total_output_tokens,
total_tokens,
});
}
// `results` is non-empty here because `tool_calls` produced at least one entry.
let tool_message = Message::User {
content: OneOrMany::many(results.into_iter().map(UserContent::ToolResult))
.expect("tool results must not be empty"),
};
history.push(tool_message.clone());
// After repeated blocked calls, force a compaction and swap in a cycle-break note.
// NOTE(review): unlike the regular compaction above, `prompt` here still holds this
// turn's prompt rather than `tool_message`, and replacing `history.last_mut()` may drop
// either the tool results or a compaction summary depending on how `compact_history`
// rewrites `history` — confirm against compact_history.
if consecutive_blocked_count >= MAX_CONSECUTIVE_BLOCKED_TOOL_CALLS && !is_final_turn {
warn!(
agent = %config.name,
turn,
consecutive_blocked_count,
"forcing compaction to break tool-call cycle"
);
let usage = conversation_usage.usage();
let compaction_usage = compact_history(
Arc::clone(&config.client),
&config.model,
&effective_system_prompt,
&mut history,
&mut prompt,
turn + 1,
usage,
)
.await?;
if let Some(cu) = compaction_usage {
total_input_tokens = total_input_tokens.saturating_add(cu.input_tokens);
total_output_tokens = total_output_tokens.saturating_add(cu.output_tokens);
total_tokens = total_tokens.saturating_add(cu.total_tokens);
}
conversation_usage.reset();
let cycle_break_msg = Message::user(
"Note: you were stuck in a repetitive tool-call loop. \
Avoid repeating the same tool calls. Try a different approach."
.to_string(),
);
if let Some(last) = history.last_mut() {
*last = cycle_break_msg.clone();
}
prompt = cycle_break_msg;
tool_call_history.clear();
consecutive_blocked_count = 0;
continue;
}
prompt = tool_message;
} else {
// No tool calls: a text reply ends a top-level run; empty replies may be nudged.
tool_call_history.clear();
let text = response.text();
report_progress(&config, turn + 1, total_tool_calls, initial_subagent_count, last_subagent.clone());
if text.is_empty() {
if let Some(nudge) = &config.empty_response_nudge {
empty_response_count += 1;
if empty_response_count <= config.max_empty_responses && !is_final_turn {
let nudge = Message::user(nudge.clone());
history.push(nudge.clone());
prompt = nudge;
continue;
}
}
eyre::bail!("empty response from model (no text, no tool calls)");
}
// Subagents must deliver their answer through `finish`, never as plain text.
if config.depth.is_subagent() {
eyre::bail!("subagent returned text without calling finish")
}
info!(
agent = %config.name,
turn,
input_tokens = response.usage.input_tokens,
output_tokens = response.usage.output_tokens,
total_tokens = response.usage.total_tokens,
total_input_tokens,
total_output_tokens,
total_tokens_so_far = total_tokens,
response_len = text.len(),
"finished"
);
return Ok(AgentResult {
text,
turns: turn + 1,
tool_calls: total_tool_calls,
subagents_spawned: config.subagent_counter.load(Ordering::Relaxed)
- initial_subagent_count,
total_input_tokens,
total_output_tokens,
total_tokens,
});
}
}
// Turn budget exhausted without a final answer.
if config.depth.is_subagent() {
eyre::bail!("subagent exhausted {} turns without calling finish", config.max_turns);
}
eyre::bail!(
"agent exhausted {} turns without producing a final answer after wrap-up prompt",
config.max_turns
)
}
/// Invokes the configured progress callback, if any, with a fresh `AgentProgress` snapshot.
///
/// `subagents_spawned` is computed relative to `initial_subagent_count` because the
/// counter is shared with every agent in the tree.
fn report_progress(
    config: &AgentConfig,
    turns: usize,
    tool_calls: usize,
    initial_subagent_count: usize,
    last_subagent: Option<String>,
) {
    let Some(callback) = config.progress.as_ref() else {
        return;
    };
    let spawned_now = config.subagent_counter.load(Ordering::Relaxed);
    callback(AgentProgress {
        turns,
        tool_calls,
        subagents_spawned: spawned_now - initial_subagent_count,
        last_subagent,
    });
}
/// Registers the `spawn_subagent` tool in `tools_map`, replacing any existing entry of that name.
///
/// The agent loop intercepts calls to this tool by name; registering it is what makes
/// it visible to the model.
pub fn add_spawn_subagent_tool(tools_map: &mut HashMap<String, Arc<dyn Tool>>) {
    let tool: Arc<dyn Tool> = Arc::new(SpawnSubagentTool);
    tools_map.insert(String::from("spawn_subagent"), tool);
}
/// Placeholder tool that advertises `spawn_subagent` to the model; the call itself is handled by the agent loop.
struct SpawnSubagentTool;
impl Tool for SpawnSubagentTool {
fn name(&self) -> String {
"spawn_subagent".to_string()
}
fn definition(&self) -> rig::completion::ToolDefinition {
rig::completion::ToolDefinition {
name: "spawn_subagent".to_string(),
description:
"Delegate a complex multi-step investigation to a subagent. Only use when the task requires several tool calls (e.g. tracing logic across multiple files). Do NOT use for single file reads or simple lookups — call those tools directly instead."
.to_string(),
parameters: json!({
"type": "object",
"properties": {
"task": {
"type": "string",
"description": "A compact self-contained task for the subagent"
}
},
"required": ["task"],
"additionalProperties": false
}),
}
}
fn call(
&self,
_args: Value,
_work_dir: PathBuf,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<String>> + Send>> {
Box::pin(async { unreachable!("spawn_subagent is handled internally") })
}
}
fn prepare_subagent(
parent_config: &AgentConfig,
args: &Value,
parent_turns: usize,
parent_tool_calls: usize,
initial_subagent_count: usize,
) -> Result<PreparedSubagent> {
let task = match args.get("task").and_then(|value| value.as_str()) {
Some(task) if !task.trim().is_empty() => task.to_string(),
_ => eyre::bail!("missing task"),
};
let subagent_id = parent_config
.subagent_counter
.fetch_add(1, Ordering::Relaxed)
+ 1;
let spawned_agent = format!("subagent-{subagent_id}");
report_progress(
parent_config,
parent_turns,
parent_tool_calls,
initial_subagent_count,
Some(first_line(&task)),
);
let subagent_level = parent_config.depth.level() + 1;
let subagent_config = AgentConfig {
name: format!("{}/subagent-{subagent_id}", parent_config.name),
session_agent: spawned_agent.clone(),
model: parent_config.model.clone(),
max_turns: parent_config.max_turns,
compact_threshold: parent_config.compact_threshold,
system_prompt: subagent_system_prompt().to_string(),
client: Arc::clone(&parent_config.client),
depth: AgentDepth::Subagent {
level: subagent_level,
},
terminal_tools: Vec::new(),
empty_response_nudge: None,
max_empty_responses: 0,
subagent_counter: Arc::clone(&parent_config.subagent_counter),
progress: None,
project_context: parent_config.project_context.clone(),
session_writer: parent_config.session_writer.clone(),
};
Ok(PreparedSubagent {
task,
spawned_agent,
config: subagent_config,
})
}
/// Runs a prepared subagent to completion and converts its result for the parent.
///
/// Failures are not propagated. The error text becomes the output (prefixed with
/// `Error: `) and the tool-call and token counts are reported as zero. The recursive
/// `run_agent` future is boxed because async recursion needs an indirection.
async fn run_subagent(
    prepared: PreparedSubagent,
    tools_map: &HashMap<String, Arc<dyn Tool>>,
    work_dir: &Path,
) -> SubagentOutcome {
    let PreparedSubagent {
        task,
        spawned_agent,
        config,
    } = prepared;
    let finished = Box::pin(run_agent(config, &task, tools_map, work_dir)).await;
    let (output, tool_calls, input_tokens, output_tokens, total_tokens) = match finished {
        Ok(done) => (
            done.text,
            done.tool_calls,
            done.total_input_tokens,
            done.total_output_tokens,
            done.total_tokens,
        ),
        Err(err) => (format!("Error: {err}"), 0, 0, 0, 0),
    };
    SubagentOutcome {
        output,
        tool_calls,
        spawned_agent: Some(spawned_agent),
        input_tokens,
        output_tokens,
        total_tokens,
    }
}
/// Returns a one-line label for `s`: its first non-blank line, trimmed.
///
/// Used for progress labels. A task that starts with a blank line (accepted by
/// `prepare_subagent`, which only rejects all-blank tasks) previously produced an empty
/// label. It now yields the first line with visible text. Returns an empty string when
/// `s` contains no visible text.
fn first_line(s: &str) -> String {
    s.lines()
        .map(str::trim)
        .find(|line| !line.is_empty())
        .unwrap_or("")
        .to_string()
}
/// Detects whether the most recent tool calls form a repeating cycle.
///
/// Periods 1, 2 and 3 are checked in that order. A period-1 cycle needs
/// `MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS` identical trailing entries. Longer periods need
/// `MAX_CYCLE_REPETITIONS` back-to-back repetitions of the same pattern at the tail of
/// `history`. Returns the first matching period, or `0` when there is no cycle.
fn detect_tool_call_cycle(history: &VecDeque<String>) -> usize {
    (1..=3usize)
        .find(|&period| {
            let repetitions = match period {
                1 => MAX_CONSECUTIVE_IDENTICAL_TOOL_CALLS,
                _ => MAX_CYCLE_REPETITIONS,
            };
            let window = period * repetitions;
            match history.len().checked_sub(window) {
                None => false,
                // Every entry in the trailing window must equal the entry at the same
                // offset within the window's first `period` entries.
                Some(start) => (0..window)
                    .all(|offset| history[start + offset] == history[start + offset % period]),
            }
        })
        .unwrap_or(0)
}
/// Builds a `ToolCallOutcome` for a call that did not run a subagent.
fn simple_tool_outcome(output: String, status: &'static str, blocked: bool) -> ToolCallOutcome {
    ToolCallOutcome {
        output,
        nested_tool_calls: 0,
        repeated_tool_call_blocked: blocked,
        status,
        spawned_agent: None,
        subagent_input_tokens: 0,
        subagent_output_tokens: 0,
        subagent_total_tokens: 0,
    }
}

/// Executes, or refuses, a single tool call requested by the model.
///
/// Dispatch order:
/// 1. When `cycle_len > 0`, the call is blocked with a warning and flagged as
///    `repeated_tool_call_blocked`.
/// 2. `spawn_subagent` is handled here.
///    - It is refused if the agent's depth forbids spawning.
///    - It is refused with a distinct message if the tool is not offered this turn, e.g.
///      on the final wrap-up turn or when it was never registered.
///    - Otherwise a subagent is prepared and run to completion.
/// 3. Any other name is looked up in `ctx.runtime_tools` and invoked.
///
/// Tool failures are reported to the model as `Error: ...` output rather than propagated,
/// so every path currently returns `Ok`. Each call is appended to the session log. For a
/// subagent, only the `started` record is written here.
async fn execute_tool_call(
    ctx: ToolCallContext<'_>,
    tool_name: &str,
    args: Value,
    cycle_len: usize,
) -> Result<ToolCallOutcome> {
    // Session records use 1-based turn numbers.
    let logged_turn = ctx.turn + 1;

    if cycle_len > 0 {
        warn!(
            agent = %ctx.config.name,
            tool = %tool_name,
            args = %args,
            turn = ctx.turn,
            cycle_len,
            "blocking cyclic tool call"
        );
        let msg = if cycle_len == 1 {
            format!(
                "Warning: repeated identical tool call blocked for {tool_name}. Think twice; try changing the arguments or using a different tool."
            )
        } else {
            format!(
                "Warning: tool call cycle of period {cycle_len} detected (e.g. A→B→A→B). You are looping without making progress. Try a different approach or tool."
            )
        };
        let outcome = simple_tool_outcome(msg, "blocked_cycle", true);
        log_tool_call(ctx.config, logged_turn, tool_name, &args, outcome.status, None).await;
        return Ok(outcome);
    }

    if tool_name == "spawn_subagent" {
        let refusal = if !ctx.config.depth.can_spawn_subagent() {
            Some("Error: subagent depth limit reached; cannot spawn another subagent")
        } else if !ctx.runtime_tools.contains_key("spawn_subagent") {
            // The tool was withheld for this turn or never registered; the depth-limit
            // message would misdirect the model here.
            Some(
                "Error: spawn_subagent is not available on this turn; continue with the tools you have or give your final answer",
            )
        } else {
            None
        };
        if let Some(reason) = refusal {
            let outcome = simple_tool_outcome(reason.to_string(), "error", false);
            log_tool_call(ctx.config, logged_turn, tool_name, &args, outcome.status, None).await;
            return Ok(outcome);
        }

        info!(agent = %ctx.config.name, turn = ctx.turn, "spawning subagent");
        let prepared = match prepare_subagent(
            ctx.config,
            &args,
            ctx.current_turns,
            ctx.total_tool_calls,
            ctx.initial_subagent_count,
        ) {
            Ok(prepared) => prepared,
            Err(err) => {
                let outcome = simple_tool_outcome(format!("Error: {err}"), "error", false);
                log_tool_call(ctx.config, logged_turn, tool_name, &args, outcome.status, None)
                    .await;
                return Ok(outcome);
            }
        };
        log_tool_call(
            ctx.config,
            logged_turn,
            tool_name,
            &args,
            "started",
            Some(prepared.spawned_agent.as_str()),
        )
        .await;
        let sub = run_subagent(prepared, ctx.tools_map, ctx.work_dir).await;
        // `run_subagent` encodes failures as an `Error:`-prefixed output.
        let status = if sub.output.starts_with("Error:") { "error" } else { "ok" };
        return Ok(ToolCallOutcome {
            output: sub.output,
            nested_tool_calls: sub.tool_calls,
            repeated_tool_call_blocked: false,
            status,
            spawned_agent: sub.spawned_agent,
            subagent_input_tokens: sub.input_tokens,
            subagent_output_tokens: sub.output_tokens,
            subagent_total_tokens: sub.total_tokens,
        });
    }

    let outcome = match ctx.runtime_tools.get(tool_name) {
        Some(tool) => match tool.call(args.clone(), ctx.work_dir.to_path_buf()).await {
            Ok(output) => simple_tool_outcome(output, "ok", false),
            Err(err) => {
                warn!(agent = %ctx.config.name, tool = %tool_name, error = %err, "tool error");
                simple_tool_outcome(format!("Error: {err}"), "error", false)
            }
        },
        None => {
            warn!(agent = %ctx.config.name, tool = %tool_name, "unknown tool");
            simple_tool_outcome(format!("Error: unknown tool '{tool_name}'"), "error", false)
        }
    };
    log_tool_call(ctx.config, logged_turn, tool_name, &args, outcome.status, None).await;
    Ok(outcome)
}
/// Appends a `ToolCallRecord` describing one tool invocation to the session trajectory log.
///
/// Does nothing when `config.session_writer` is `None`. A failed write is reported with
/// `warn!` and otherwise ignored, so logging never interrupts the agent loop.
async fn log_tool_call(
    config: &AgentConfig,
    turn: usize,
    tool_name: &str,
    args: &Value,
    status: &'static str,
    spawned_agent: Option<&str>,
) {
    if let Some(writer) = &config.session_writer {
        let record = ToolCallRecord {
            ts_unix_ms: now_unix_ms(),
            agent: config.session_agent.clone(),
            depth: config.depth.level(),
            turn,
            tool: tool_name.to_owned(),
            args: args.clone(),
            status: status.to_owned(),
            spawned_agent: spawned_agent.map(|name| name.to_owned()),
        };
        if let Err(err) = writer.append_tool_call(&record).await {
            warn!(tool = %tool_name, error = %err, "failed to write trajectory log");
        }
    }
}