use std::path::PathBuf;
use std::sync::Arc;
use agent_client_protocol_schema::{ContentBlock, SessionId, StopReason as AcpStopReason};
use tokio_util::sync::CancellationToken;
use crate::event::AgentEvent;
use crate::fs::FsBackend;
use crate::hooks::{HookCtx, HookEngine};
use crate::http::HttpClient;
use crate::llm::{
CompletionRequest, HostedCapabilities, LlmProvider, Message, MessageContent, Role,
SamplingParams, StopReason as LlmStopReason, ToolChoice, Usage,
};
use crate::policy::SandboxPolicy;
use crate::session::events::EventEmitter;
use crate::session::permissions::PermissionGate;
use crate::session::{History, ToolRegistry, TurnError};
use crate::shell::ShellBackend;
/// File name that `PromptConfig::default()` stores in its `file` field.
const DEFAULT_PROMPT_FILE: &str = "AGENTS.md";
mod request_audit;
mod compact;
mod microcompact;
mod compaction_slot;
pub use compaction_slot::CompactionSlot;
mod sanitize;
mod content;
mod llm_drive;
mod tools;
mod hooks;
use content::content_block_to_message_content;
use hooks::UserPromptHookFlow;
use llm_drive::{assistant_message, real_input_tokens};
use tools::{
Approved, DecisionFlow, approved_tool_name, reject_oversized_results, tool_results_message,
};
pub(crate) use request_audit::RequestAuditTracker;
pub(crate) use compact::{CompactionCtx, run_sync as run_sync_compaction};
/// Budget for the number of LLM requests a single turn may issue.
#[derive(Debug, Clone, Copy)]
pub enum TurnRequestLimit {
    /// No cap at all.
    Unbounded,
    /// A cap that stays at the given value.
    Fixed(u32),
    /// A cap that starts at `initial` and may be raised while the turn runs.
    Adaptive {
        /// Cap in effect when the turn starts.
        initial: u32,
        /// Whether the cap is allowed to grow when progress is noted.
        expand_on_progress: bool,
    },
}

impl TurnRequestLimit {
    /// Returns the cap a turn starts with, or `None` when unbounded.
    fn initial_cap(&self) -> Option<u32> {
        match self {
            Self::Fixed(cap) | Self::Adaptive { initial: cap, .. } => Some(*cap),
            Self::Unbounded => None,
        }
    }

    /// Returns `true` only for an adaptive limit that opted into expansion.
    fn expand_on_progress(&self) -> bool {
        match *self {
            Self::Adaptive {
                expand_on_progress, ..
            } => expand_on_progress,
            Self::Unbounded | Self::Fixed(_) => false,
        }
    }
}
/// Per-turn configuration read by `TurnRunner`.
///
/// Only some fields are consumed by the code in this file; the others are
/// described conservatively and should be checked against their users.
#[derive(Debug, Clone)]
pub struct TurnConfig {
// NOTE(review): presumably the provider identifier; not read in this file.
pub provider: String,
/// Model name placed into every completion request and compaction context,
/// and used to look up the model's context window.
pub model: String,
// NOTE(review): presumably an allow-list of selectable models; not read in this file.
pub allowed_models: Option<Vec<String>>,
// NOTE(review): base prompt source; not read in this file.
pub base_prompt: BasePromptConfig,
// NOTE(review): not read in this file — requests use `TurnRunner::system_prompt` instead.
pub system_prompt: Option<String>,
// NOTE(review): project prompt settings; not read in this file.
pub prompt: PromptConfig,
/// Sampling parameters cloned into completion requests and compaction contexts.
pub sampling: SamplingParams,
/// Request budget used to seed the turn's request cap.
pub request_limit: TurnRequestLimit,
/// Explicit hard compaction threshold in tokens. When set it takes precedence
/// over the `compact_ratio`-derived threshold and suppresses the fallback
/// context window for models that report none.
pub compact_threshold_tokens: Option<u64>,
/// Fraction of the context window at which hard (synchronous) compaction
/// triggers when `compact_threshold_tokens` is unset.
pub compact_ratio: Option<f64>,
/// Enables background compaction between the soft and hard thresholds.
pub background_compact_enabled: bool,
/// Fraction of the context window that forms the soft threshold.
pub compact_soft_ratio: Option<f64>,
/// Enables microcompaction once the micro threshold is reached.
pub microcompact_enabled: bool,
/// Fraction of the context window that forms the micro threshold.
pub microcompact_ratio: Option<f64>,
// NOTE(review): presumably the retry budget for LLM calls; not read in this file.
pub max_llm_retries: u32,
// NOTE(review): presumably a tool concurrency limit (default 0); semantics not visible here.
pub max_concurrent_tools: usize,
/// Passed to `TurnState::new` as the limit on stop-hook continues.
pub max_hook_continues: u32,
// NOTE(review): presumably the sub-agent nesting limit; not read in this file.
pub subagent_max_depth: u32,
}
impl Default for TurnConfig {
    /// Builds the stock configuration: empty provider/model names, an adaptive
    /// request limit starting at 32, and all three compaction tiers enabled.
    fn default() -> Self {
        // Start with 32 requests and let the cap grow while the turn progresses.
        let request_limit = TurnRequestLimit::Adaptive {
            initial: 32,
            expand_on_progress: true,
        };

        Self {
            // Identity and prompt sources.
            provider: String::default(),
            model: String::default(),
            allowed_models: None,
            base_prompt: BasePromptConfig::default(),
            system_prompt: None,
            prompt: PromptConfig::default(),
            sampling: SamplingParams::default(),
            request_limit,

            // Compaction tiers, expressed as fractions of the context window.
            compact_threshold_tokens: None,
            compact_ratio: Some(0.85),
            background_compact_enabled: true,
            compact_soft_ratio: Some(0.7),
            microcompact_enabled: true,
            microcompact_ratio: Some(0.6),

            // Retry, concurrency, and nesting limits.
            max_llm_retries: 3,
            max_concurrent_tools: 0,
            max_hook_continues: DEFAULT_MAX_HOOK_CONTINUES,
            subagent_max_depth: DEFAULT_SUBAGENT_MAX_DEPTH,
        }
    }
}
/// Source of the base prompt: an optional file path and/or optional inline text.
///
/// Both fields default to `None`.
// NOTE(review): how `file` and `text` are combined or prioritised is decided
// outside this file — confirm against the prompt loader.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct BasePromptConfig {
/// Optional path to a file holding the base prompt.
pub file: Option<PathBuf>,
/// Optional inline base prompt text.
pub text: Option<String>,
}
/// Prompt settings: a prompt file name, optional inline text, and overlay
/// maps keyed by string.
// NOTE(review): overlay keys are presumably provider / model names; the
// lookup happens outside this file — confirm against the prompt builder.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PromptConfig {
    /// Name of the prompt file; defaults to `DEFAULT_PROMPT_FILE`.
    pub file: String,
    /// Optional inline prompt text.
    pub text: Option<String>,
    /// Overlay text keyed by string (presumably provider name).
    pub provider_overlays: std::collections::BTreeMap<String, String>,
    /// Overlay text keyed by string (presumably model name).
    pub model_overlays: std::collections::BTreeMap<String, String>,
}

impl Default for PromptConfig {
    /// Uses the default prompt file name, no inline text, and empty overlays.
    fn default() -> Self {
        Self {
            file: String::from(DEFAULT_PROMPT_FILE),
            text: None,
            provider_overlays: Default::default(),
            model_overlays: Default::default(),
        }
    }
}
/// Everything needed to drive a single turn: history, tools, the LLM
/// provider, hooks, and the surrounding session services.
///
/// Several collaborators appear twice — once as a borrow and once as an
/// optional `Arc`. In this file the `Arc` forms are what compaction uses:
/// background compaction needs `compaction_slot`, `history_arc` and
/// `provider_arc`, and synchronous compaction needs `provider_arc`.
pub struct TurnRunner<'a> {
/// Conversation history the turn reads from and appends to.
pub history: &'a dyn History,
/// Tool registry whose schemas are advertised in each request.
pub tools: &'a dyn ToolRegistry,
// NOTE(review): not read in this file — presumably session-scoped tools.
pub session_tools: Option<Arc<dyn ToolRegistry>>,
/// LLM provider; used in this file to look up model info (context window).
pub provider: &'a dyn LlmProvider,
// NOTE(review): not read in this file — presumably enforced by tool execution.
pub policy: Arc<dyn SandboxPolicy>,
/// Sink for `AgentEvent`s emitted during the turn.
pub events: Arc<EventEmitter>,
// NOTE(review): not read in this file — presumably consulted when deciding tool permissions.
pub permissions: &'a PermissionGate,
/// Turn-level cancellation token: checked at the top of each loop
/// iteration and cloned into hook and synchronous compaction contexts.
pub cancel: CancellationToken,
/// Turn configuration.
pub config: &'a TurnConfig,
// NOTE(review): not read in this file.
pub config_arc: Option<Arc<TurnConfig>>,
/// System prompt placed into each completion request.
pub system_prompt: Option<Arc<str>>,
/// Working directory handed to hook contexts.
pub cwd: &'a std::path::Path,
// NOTE(review): `fs`, `shell` and `http` are not read in this file — presumably used by tools.
pub fs: Arc<dyn FsBackend>,
pub shell: Arc<dyn ShellBackend>,
pub http: Arc<dyn HttpClient>,
/// Hosted capabilities copied into each completion request.
pub hosted_capabilities: HostedCapabilities,
/// Hook engine that every lifecycle step is dispatched to.
pub hooks: &'a dyn HookEngine,
/// Session identifier handed to hook contexts.
pub session_id: &'a SessionId,
// NOTE(review): `background` and `goal` are not read in this file.
pub background: Option<crate::session::BackgroundTasks>,
pub goal: Option<Arc<crate::session::GoalState>>,
/// Slot for background compaction; when it reports a compaction in
/// flight, hard compaction waits for that one instead of starting another.
pub compaction_slot: Option<crate::session::CompactionSlot>,
/// Shared handle to the history, handed to background compaction.
pub history_arc: Option<Arc<dyn History>>,
/// Shared handle to the provider; synchronous compaction panics without it.
pub provider_arc: Option<Arc<dyn LlmProvider>>,
/// Parent for the background compaction cancel token; `cancel` is used when absent.
pub session_cancel: Option<CancellationToken>,
// NOTE(review): not read in this file.
pub ingest_source: crate::hooks::step::IngestSource,
/// Tracker that records every request built by `build_request`.
pub(crate) request_audit: &'a RequestAuditTracker,
}
impl<'a> TurnRunner<'a> {
/// Runs one complete user turn and reports why it stopped.
///
/// The prompt first passes through the user-prompt-submit hook, is then
/// appended to history as a user message, and the request loop in
/// `run_inner` drives the turn from there.
///
/// # Errors
///
/// Returns a `TurnError` when a prompt content block cannot be converted
/// into message content, or when `run_inner` fails. In the latter case the
/// history is truncated back to its length from before the prompt was
/// appended and `AgentEvent::TurnAborted` is emitted.
pub async fn run(&self, prompt: Vec<ContentBlock>) -> Result<AcpStopReason, TurnError> {
// The submit hook either hands back a prompt to continue with or refuses the turn.
let prompt = match self.fire_user_prompt_submit(prompt).await {
UserPromptHookFlow::Continue(p) => p,
UserPromptHookFlow::Refused => {
return Ok(AcpStopReason::Refusal);
}
};
// Remember where history stood so a failed turn can be rolled back.
let rollback_len = self.history.len();
// NOTE(review): this event fires before the content conversion below, so a
// conversion error returns after `UserPromptCommitted` was already emitted.
self.events
.emit(AgentEvent::UserPromptCommitted {
content: prompt.clone(),
})
.await;
// Each content block converts to a collection of message contents; the
// first conversion error aborts via `?`, otherwise the results are flattened.
self.history.append(Message {
role: Role::User,
content: prompt
.into_iter()
.map(content_block_to_message_content)
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.flatten()
.collect(),
});
{
// One message (the user prompt) was just appended.
let mut step = crate::hooks::step::AfterIngest {
committed_len: 1,
additional_context: Vec::new(),
};
// The hook's control result is deliberately ignored; only its extra context is used.
let _ = self.hooks.dispatch(&mut step, self.hook_ctx()).await;
if !step.additional_context.is_empty() {
self.append_user_feedback(step.additional_context);
}
}
self.events.emit(AgentEvent::TurnStarted).await;
{
let mut step = crate::hooks::step::AfterTurnEnter {
is_subagent: false,
agent_type: None,
additional_context: Vec::new(),
};
let control = self.hooks.dispatch(&mut step, self.hook_ctx()).await;
if !step.additional_context.is_empty() {
self.append_user_feedback(step.additional_context);
}
// NOTE(review): a hook `Break` ends the turn here without emitting
// `TurnEnded`, although `TurnStarted` was already sent — confirm intended.
if let crate::hooks::step::HookControl::Break { .. } = control {
return Ok(AcpStopReason::EndTurn);
}
}
let result = self.run_inner().await;
match &result {
Ok(outcome) => {
self.events
.emit(AgentEvent::TurnEnded {
reason: outcome.reason,
usage: outcome.usage,
})
.await;
}
Err(_) => {
// Drop everything this turn appended, including the user prompt.
self.history.truncate(rollback_len);
self.events.emit(AgentEvent::TurnAborted).await;
}
}
result.map(|outcome| outcome.reason)
}
/// The turn's request loop: manage context, call the LLM, run the tools it
/// asked for, and repeat until something ends the turn.
///
/// Each iteration:
/// 1. stops with `Cancelled` if the turn's cancel token fired;
/// 2. runs context management (microcompaction / compaction);
/// 3. builds the request and dispatches the `BeforeGenerate` hook, which may
///    change the model, supply the assistant text itself, or break the turn;
/// 4. calls the LLM and drains the response stream;
/// 5. ends the turn on a non-tool stop reason (subject to `decide_turn_end`);
/// 6. otherwise decides permissions, runs the tools, appends their results,
///    and checks the request cap.
///
/// # Errors
///
/// Propagates errors from context management, the LLM call, draining the
/// stream, and the permission decision.
async fn run_inner(&self) -> Result<TurnOutcome, TurnError> {
let mut state = TurnState::new(self.config.request_limit, self.config.max_hook_continues);
loop {
if self.cancel.is_cancelled() {
return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
}
self.manage_context().await?;
let mut req = self.build_request();
let mut before_gen = crate::hooks::step::BeforeGenerate {
model: req.model.clone(),
message_count: req.messages.len(),
attempt: state.request_count.saturating_add(1),
assistant_text: None,
};
let bg_control = self.hooks.dispatch(&mut before_gen, self.hook_ctx()).await;
// The hook may have swapped the model for this request.
req.model = before_gen.model;
// If the hook supplied assistant text, record it as the assistant's reply
// and skip the LLM call for this iteration.
if let Some(text) = before_gen.assistant_text {
self.history.append(Message {
role: Role::Assistant,
content: vec![MessageContent::Text { text }].into(),
});
// `decide_turn_end` returning true means the loop should keep going.
if self
.decide_turn_end(&mut state, AcpStopReason::EndTurn, true)
.await
{
continue;
}
return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
}
if let crate::hooks::step::HookControl::Break { reason } = bg_control {
return Ok(turn_outcome(&state, reason));
}
let (mut stream, attempt) = self.call_llm_with_retry(&req, &mut state).await?;
let outcome = self.drain_provider_stream(&mut stream, &mut state).await?;
if outcome.cancelled {
return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
}
self.events
.emit(AgentEvent::LlmCallFinished {
model: req.model.clone(),
attempt,
usage: outcome.usage,
error: None,
})
.await;
// Translate the provider's stop reason into the ACP vocabulary for the
// hook; a tool-use stop is reported to the hook as `EndTurn`.
let stop_reason_for_hook = match outcome.stop {
LlmStopReason::EndTurn | LlmStopReason::StopSequence => AcpStopReason::EndTurn,
LlmStopReason::Refusal => AcpStopReason::Refusal,
LlmStopReason::MaxTokens => AcpStopReason::MaxTokens,
LlmStopReason::ToolUse => AcpStopReason::EndTurn,
};
let mut after_gen = crate::hooks::step::AfterGenerate {
model: req.model.clone(),
usage: outcome.usage,
stop: stop_reason_for_hook,
error: None,
};
// The hook's control result is ignored here.
let _ = self.hooks.dispatch(&mut after_gen, self.hook_ctx()).await;
// Feed the provider-reported input token count back into history when available.
if let Some(real_input) = real_input_tokens(&outcome.usage) {
self.history.record_input_tokens(real_input);
}
let assistant = assistant_message(&outcome);
// Skip appending an assistant message that has no content.
if !assistant.content.is_empty() {
self.history.append(assistant);
}
match outcome.stop {
LlmStopReason::EndTurn | LlmStopReason::StopSequence => {
if self
.decide_turn_end(&mut state, AcpStopReason::EndTurn, true)
.await
{
continue;
}
return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
}
LlmStopReason::Refusal => {
return Ok(turn_outcome(&state, AcpStopReason::Refusal));
}
LlmStopReason::MaxTokens => {
return Ok(turn_outcome(&state, AcpStopReason::MaxTokens));
}
// Tool use falls through to the permission / execution phase below.
LlmStopReason::ToolUse => {}
}
// A tool-use stop that carries no tool calls is treated like an ordinary end of turn.
if outcome.tool_uses.is_empty() {
if self
.decide_turn_end(&mut state, AcpStopReason::EndTurn, true)
.await
{
continue;
}
return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
}
// Notify hooks about each requested tool before permissions are decided.
// NOTE(review): the step's `decision` / `resolved` fields are not read back
// here after dispatch — confirm whether hook output is consumed elsewhere.
for tu in &outcome.tool_uses {
let mut bp = crate::hooks::step::BeforePermission {
tool: tu.name.clone(),
decision: "ask".to_string(),
resolved: None,
};
let _ = self.hooks.dispatch(&mut bp, self.hook_ctx()).await;
}
let approved = match self.decide_permissions(&outcome.tool_uses).await? {
DecisionFlow::Continue(list) => list,
DecisionFlow::Cancelled => {
return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
}
};
// Report each permission outcome: only `Approved::Run` counts as granted.
for a in &approved {
let (tool, granted) = match a {
Approved::Run { .. } => (approved_tool_name(a), true),
Approved::Denied { .. } | Approved::FailedArgs { .. } => {
(approved_tool_name(a), false)
}
};
let mut ap = crate::hooks::step::AfterPermission { tool, granted };
let _ = self.hooks.dispatch(&mut ap, self.hook_ctx()).await;
}
// At least one tool that will actually run counts as progress, which may
// raise an adaptive request cap.
let progressed = approved.iter().any(|a| matches!(a, Approved::Run { .. }));
if progressed {
state.note_progress();
}
let mut results = self.run_tools_concurrently(approved).await;
let rejected = reject_oversized_results(&mut results, self.effective_context_window());
if rejected > 0 {
tracing::warn!(
rejected,
"rejected oversized tool result(s) exceeding the context window"
);
}
let mut batch = crate::hooks::step::AfterToolBatch {
results: results
.iter()
.map(|r| crate::hooks::step::ToolBatchEntry {
tool_name: r.name.clone(),
is_error: r.is_error,
})
.collect(),
additional_context: Vec::new(),
};
let batch_control = self.hooks.dispatch(&mut batch, self.hook_ctx()).await;
// Tool results are appended before any hook-provided context, and both
// land in history even when the hook asked to break the turn.
self.history.append(tool_results_message(results));
if !batch.additional_context.is_empty() {
self.append_user_feedback(batch.additional_context);
}
if let crate::hooks::step::HookControl::Break { reason } = batch_control {
return Ok(turn_outcome(&state, reason));
}
if state.exceeded_request_cap() {
if self
.decide_turn_end(&mut state, AcpStopReason::MaxTurnRequests, false)
.await
{
continue;
}
return Ok(turn_outcome(&state, AcpStopReason::MaxTurnRequests));
}
}
}
/// Assembles the completion request for the next LLM call.
///
/// Messages come from a history snapshot passed through
/// `sanitize::sanitize_tool_pairing`; tool choice is always `Auto`. The
/// finished request is recorded with the request audit tracker before it is
/// returned.
fn build_request(&self) -> CompletionRequest {
    let snapshot = self.history.snapshot();
    let request = CompletionRequest {
        model: self.config.model.clone(),
        system: self.system_prompt.clone(),
        messages: sanitize::sanitize_tool_pairing(snapshot),
        tools: self.tools.schemas(),
        tool_choice: ToolChoice::Auto,
        sampling: self.config.sampling.clone(),
        hosted_capabilities: self.hosted_capabilities,
    };
    self.request_audit.record(&request);
    request
}
/// Keeps the history within its token budget before the next request.
///
/// Three tiers are applied in order:
/// - microcompaction, when enabled and the micro threshold is reached;
/// - background compaction, when enabled and the (possibly refreshed)
///   estimate sits in `[soft, hard)` — the method returns right after the
///   spawn attempt;
/// - synchronous hard compaction, when the estimate reaches `hard`.
///
/// Does nothing when no threshold is configured or the history has no token
/// estimate.
///
/// # Errors
///
/// Propagates any error from `compact_hard`.
async fn manage_context(&self) -> Result<(), TurnError> {
    let limits = self.compact_thresholds();
    if limits.is_empty() {
        return Ok(());
    }
    let mut tokens = match self.history.token_estimate() {
        Some(tokens) => tokens,
        None => return Ok(()),
    };

    let micro_due =
        self.config.microcompact_enabled && limits.micro.is_some_and(|limit| tokens >= limit);
    if micro_due {
        self.run_microcompact().await;
        // Keep the old estimate if history cannot provide a fresh one.
        if let Some(refreshed) = self.history.token_estimate() {
            tokens = refreshed;
        }
    }

    if self.config.background_compact_enabled
        && let (Some(soft), Some(hard)) = (limits.soft, limits.hard)
        && (soft..hard).contains(&tokens)
    {
        self.spawn_background_compaction(hard).await;
        return Ok(());
    }

    match limits.hard {
        Some(hard) if tokens >= hard => self.compact_hard(tokens, hard).await,
        _ => Ok(()),
    }
}
/// Runs microcompaction over a history snapshot.
///
/// When `microcompact::run` produces a rebuilt message list, it replaces the
/// history, and the outcome is both logged and emitted as
/// `AgentEvent::ContextMicrocompacted`. When it produces nothing, the
/// history is left untouched.
async fn run_microcompact(&self) {
    let snapshot = self.history.snapshot();
    if let Some((rebuilt, report)) = microcompact::run(&snapshot) {
        self.history.replace(rebuilt);
        tracing::info!(
            cleared = report.cleared,
            tokens_before = report.tokens_before,
            tokens_after = report.tokens_after,
            "context microcompacted"
        );
        let event = AgentEvent::ContextMicrocompacted {
            tokens_before: report.tokens_before,
            tokens_after: report.tokens_after,
            cleared: report.cleared,
        };
        self.events.emit(event).await;
    }
}
async fn spawn_background_compaction(&self, hard_threshold: u64) {
let (Some(slot), Some(history_arc), Some(provider_arc)) = (
self.compaction_slot.as_ref(),
self.history_arc.as_ref(),
self.provider_arc.as_ref(),
) else {
return;
};
if slot.is_in_flight() {
return; }
let cancel = self
.session_cancel
.clone()
.unwrap_or_else(|| self.cancel.clone())
.child_token();
let ctx = compact::CompactionCtx {
provider: provider_arc.clone(),
model: self.config.model.clone(),
sampling: self.config.sampling.clone(),
tools: self.tools.schemas(),
cancel,
};
let events = self.events.clone();
let on_done: Arc<
dyn Fn(crate::session::CompactionReport) -> futures::future::BoxFuture<'static, ()>
+ Send
+ Sync,
> = Arc::new(move |report| {
let events = events.clone();
Box::pin(async move {
events
.emit(AgentEvent::ContextCompressed {
tokens_before: report.tokens_before,
tokens_after: report.tokens_after,
})
.await;
})
});
let started = slot.try_spawn(history_arc.clone(), ctx, hard_threshold, on_done);
if started {
tracing::info!(hard_threshold, "background compaction started");
}
}
/// Performs synchronous ("hard") compaction once `estimate` reached `hard`.
///
/// If a background compaction is already in flight, this waits for it and
/// returns without starting another. Otherwise the `BeforeCompact` hook may
/// veto the run by returning `Skip`. After a successful compaction,
/// `AgentEvent::ContextCompressed` is emitted and the `AfterCompact` hook's
/// additional context, if any, is appended as user feedback.
///
/// # Panics
///
/// Panics via `sync_compaction_ctx` when `provider_arc` is `None`.
async fn compact_hard(&self, estimate: u64, hard: u64) -> Result<(), TurnError> {
    let in_flight = self
        .compaction_slot
        .as_ref()
        .filter(|slot| slot.is_in_flight());
    if let Some(slot) = in_flight {
        slot.await_in_flight().await;
        return Ok(());
    }

    let mut before = crate::hooks::step::BeforeCompact {
        token_estimate: estimate,
        threshold: hard,
    };
    let vetoed = matches!(
        self.hooks.dispatch(&mut before, self.hook_ctx()).await,
        crate::hooks::step::HookControl::Skip
    );
    if vetoed {
        tracing::info!("compaction vetoed by before-compact hook");
        return Ok(());
    }

    let ctx = self.sync_compaction_ctx();
    let report = match compact::run_sync(self.history, &ctx, hard).await {
        Some(report) => report,
        None => return Ok(()),
    };
    let compressed = AgentEvent::ContextCompressed {
        tokens_before: report.tokens_before,
        tokens_after: report.tokens_after,
    };
    self.events.emit(compressed).await;

    let mut after = crate::hooks::step::AfterCompact {
        tokens_before: report.tokens_before,
        tokens_after: report.tokens_after,
        additional_context: Vec::new(),
    };
    // The hook's control result is ignored; only its extra context matters.
    let _ = self.hooks.dispatch(&mut after, self.hook_ctx()).await;
    if !after.additional_context.is_empty() {
        self.append_user_feedback(after.additional_context);
    }
    Ok(())
}
fn sync_compaction_ctx(&self) -> compact::CompactionCtx {
compact::CompactionCtx {
provider: self
.provider_arc
.clone()
.expect("sync compaction requires provider_arc"),
model: self.config.model.clone(),
sampling: self.config.sampling.clone(),
tools: self.tools.schemas(),
cancel: self.cancel.clone(),
}
}
/// Returns the context window the provider reports for the configured
/// model, or `None` when the model is unknown or declares no window.
fn context_window(&self) -> Option<u64> {
    let info = self.provider.model_info(&self.config.model)?;
    info.context_window
}
/// Returns the context window to plan compaction against.
///
/// The provider-reported window wins. Without one, an explicit
/// `compact_threshold_tokens` yields `None` (no fallback is invented);
/// otherwise a warning is logged and `FALLBACK_CONTEXT_WINDOW` is assumed.
fn effective_context_window(&self) -> Option<u64> {
    self.context_window().or_else(|| {
        if self.config.compact_threshold_tokens.is_some() {
            None
        } else {
            warn_missing_context_window(&self.config.model);
            Some(FALLBACK_CONTEXT_WINDOW)
        }
    })
}
/// Computes the micro / soft / hard compaction thresholds in tokens.
///
/// The micro and soft thresholds are always ratio × effective context
/// window. The hard threshold is the explicit `compact_threshold_tokens`
/// when set, and otherwise `compact_ratio` × window. Any threshold whose
/// ratio or window is missing is `None`.
fn compact_thresholds(&self) -> CompactThresholds {
    let config = self.config;
    let window = self.effective_context_window();
    let scaled = |ratio: Option<f64>| -> Option<u64> {
        let (window, ratio) = window.zip(ratio)?;
        ratio_threshold(window, ratio)
    };

    let hard = match config.compact_threshold_tokens {
        Some(explicit) => Some(explicit),
        None => scaled(config.compact_ratio),
    };
    CompactThresholds {
        micro: scaled(config.microcompact_ratio),
        soft: scaled(config.compact_soft_ratio),
        hard,
    }
}
/// Builds the context passed to every hook dispatch: the session id, the
/// working directory, and a clone of the turn's cancel token.
pub(super) fn hook_ctx(&self) -> HookCtx<'_> {
    let cancel = self.cancel.clone();
    HookCtx::new(self.session_id, self.cwd, cancel)
}
}
/// Result of a finished request loop: why the turn stopped, plus the usage
/// held in the turn state at that point.
#[derive(Clone, Copy)]
struct TurnOutcome {
/// Stop reason reported to the caller and in `AgentEvent::TurnEnded`.
reason: AcpStopReason,
/// Usage copied from the turn state when the outcome was built.
usage: Usage,
}
/// Token thresholds for the three context-management tiers; `None` means
/// the tier has no threshold.
#[derive(Clone, Copy)]
struct CompactThresholds {
    /// Threshold at which microcompaction is attempted.
    micro: Option<u64>,
    /// Lower bound of the background-compaction band.
    soft: Option<u64>,
    /// Threshold at which synchronous compaction runs.
    hard: Option<u64>,
}

impl CompactThresholds {
    /// Returns `true` when no tier has a threshold.
    fn is_empty(&self) -> bool {
        matches!((self.micro, self.soft, self.hard), (None, None, None))
    }
}
/// Scales `context_window` by `ratio`, rounding down to whole tokens.
///
/// Returns `None` when the result is zero. Because the float-to-integer
/// cast saturates, a negative or NaN product also yields `None`.
fn ratio_threshold(context_window: u64, ratio: f64) -> Option<u64> {
    let scaled = (context_window as f64 * ratio).floor();
    match scaled as u64 {
        0 => None,
        tokens => Some(tokens),
    }
}
/// Context window, in tokens, assumed for models that report none.
const FALLBACK_CONTEXT_WINDOW: u64 = 128_000;

/// Logs a warning the first time `model` is seen without a context window.
///
/// Later calls for the same model are silent. The set of already-warned
/// models lives in a process-wide static.
///
/// This is a diagnostics helper on the per-request path, so it must stay
/// cheap and must never take the turn down:
/// - a poisoned lock is recovered instead of panicking (the set is only ever
///   inserted into, so it cannot be observed in an inconsistent state);
/// - the model name is only copied the first time it is seen;
/// - the lock is released before the log line is written.
fn warn_missing_context_window(model: &str) {
    use std::collections::HashSet;
    use std::sync::{Mutex, OnceLock, PoisonError};

    static WARNED: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();

    let first_sighting = {
        let mut warned = WARNED
            .get_or_init(|| Mutex::new(HashSet::new()))
            .lock()
            .unwrap_or_else(PoisonError::into_inner);
        // Look up by `&str` first so repeat calls do not allocate.
        let is_new = !warned.contains(model) && warned.insert(model.to_owned());
        is_new
    };

    if first_sighting {
        tracing::warn!(
            model,
            fallback = FALLBACK_CONTEXT_WINDOW,
            "model exposes no context_window; assuming a conservative fallback for compaction. \
             Declare the real value via the model's `context_window` in config or set \
             `[turn].compact_threshold_tokens` to silence this."
        );
    }
}
/// Default for `TurnConfig::max_hook_continues`.
pub(crate) const DEFAULT_MAX_HOOK_CONTINUES: u32 = 3;
/// Default for `TurnConfig::subagent_max_depth`.
pub(crate) const DEFAULT_SUBAGENT_MAX_DEPTH: u32 = 1;
/// Mutable bookkeeping for one run of the request loop.
struct TurnState {
/// Requests counted so far; compared against `cap`.
// NOTE(review): incremented outside this file's visible code — presumably by the LLM call path.
request_count: u32,
/// Usage reported in the turn outcome.
// NOTE(review): presumably accumulated while draining provider streams — confirm.
usage: Usage,
/// Current request cap; `None` means unbounded.
cap: Option<u32>,
/// Whether `note_progress` is allowed to raise `cap`.
expand_on_progress: bool,
/// How many times a stop hook has already continued the turn.
stop_hook_continues: u32,
/// Upper limit for `stop_hook_continues`.
max_stop_hook_continues: u32,
}
impl TurnState {
    /// Creates fresh state: zeroed counters, default usage, and the cap and
    /// expansion policy taken from `limit`.
    fn new(limit: TurnRequestLimit, max_hook_continues: u32) -> Self {
        let cap = limit.initial_cap();
        let expand_on_progress = limit.expand_on_progress();
        Self {
            request_count: 0,
            usage: Usage::default(),
            cap,
            expand_on_progress,
            stop_hook_continues: 0,
            max_stop_hook_continues: max_hook_continues,
        }
    }

    /// Raises a bounded cap by one (saturating) when expansion is enabled.
    fn note_progress(&mut self) {
        if !self.expand_on_progress {
            return;
        }
        if let Some(cap) = &mut self.cap {
            *cap = cap.saturating_add(1);
        }
    }

    /// Zeroes the request counter and restores the cap to `limit`'s initial value.
    fn reset_request_budget(&mut self, limit: TurnRequestLimit) {
        self.cap = limit.initial_cap();
        self.request_count = 0;
    }

    /// Returns `true` once the request count has reached a bounded cap.
    fn exceeded_request_cap(&self) -> bool {
        self.cap.is_some_and(|cap| self.request_count >= cap)
    }

    /// Returns `true` while stop hooks may still continue the turn.
    fn may_stop_hook_continue(&self) -> bool {
        self.max_stop_hook_continues > self.stop_hook_continues
    }

    /// Records one more stop-hook continue (saturating).
    fn note_stop_hook_continue(&mut self) {
        let bumped = self.stop_hook_continues.saturating_add(1);
        self.stop_hook_continues = bumped;
    }
}
/// Pairs `reason` with the usage currently held in `state`.
fn turn_outcome(state: &TurnState, reason: AcpStopReason) -> TurnOutcome {
    let usage = state.usage;
    TurnOutcome { reason, usage }
}
#[cfg(test)]
mod tests;