defect_agent/session/turn.rs
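//! Turn main loop.
//!
//! The "heart" of the agent: this file implements the per-turn state machine that
//! alternates LLM requests, permission decisions, tool execution, hook dispatch, and
//! context compaction.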
4//!
5//! Key dependencies:
6//! - [`History`]: read/write message history
7//! - [`ToolRegistry`]: tool lookup
8//! - [`LlmProvider`]: LLM invocation
9//! - [`EventEmitter`]: event emission (shared via `Arc` so tool tasks can also emit)
10//! - [`PermissionGate`]: wait for permission requests
11
12use std::path::PathBuf;
13use std::sync::Arc;
14
15use agent_client_protocol_schema::{ContentBlock, SessionId, StopReason as AcpStopReason};
16use tokio_util::sync::CancellationToken;
17
18use crate::event::AgentEvent;
19use crate::fs::FsBackend;
20use crate::hooks::{HookCtx, HookEngine};
21use crate::http::HttpClient;
22use crate::llm::{
23 CompletionRequest, HostedCapabilities, LlmProvider, Message, MessageContent, Role,
24 SamplingParams, StopReason as LlmStopReason, ToolChoice, Usage,
25};
26use crate::policy::SandboxPolicy;
27use crate::session::events::EventEmitter;
28use crate::session::permissions::PermissionGate;
29use crate::session::{History, ToolRegistry, TurnError};
30use crate::shell::ShellBackend;
31
/// Default prompt file name, used by `PromptConfig::default` for `PromptConfig::file`.
const DEFAULT_PROMPT_FILE: &str = "AGENTS.md";
33
34mod request_audit;
35
36mod compact;
37
38mod microcompact;
39
40mod compaction_slot;
41
42pub use compaction_slot::CompactionSlot;
43
44mod sanitize;
45
46mod content;
47
48mod llm_drive;
49
50mod tools;
51
52mod hooks;
53
54use content::content_block_to_message_content;
55use hooks::UserPromptHookFlow;
56use llm_drive::{assistant_message, real_input_tokens};
57use tools::{
58 Approved, DecisionFlow, approved_tool_name, reject_oversized_results, tool_results_message,
59};
60
61pub(crate) use request_audit::RequestAuditTracker;
62// Out-of-band `/compact` slash command reuses the same synchronous compaction primitive
63// as the turn loop's hard-watermark fallback, so the two share boundary selection and
64// summary logic instead of forking a second compaction path.
65pub(crate) use compact::{CompactionCtx, run_sync as run_sync_compaction};
66
/// Strategy for capping the number of LLM requests issued within a single turn.
///
/// When the cap is exceeded after a tool batch, the turn stops with
/// [`AcpStopReason::MaxTurnRequests`] unless a before-turn-end hook decides to keep
/// working.
#[derive(Debug, Clone, Copy)]
pub enum TurnRequestLimit {
    /// No upper limit.
    Unbounded,
    /// Fixed limit: the turn stops with [`AcpStopReason::MaxTurnRequests`] once `N` LLM
    /// requests have been made in this turn.
    Fixed(u32),
    /// Starts with a cap of `initial` requests.
    ///
    /// When `expand_on_progress` is `true`, every main-loop iteration in which at least
    /// one tool use is approved and run counts as progress and raises the cap by one.
    /// When it is `false`, this behaves exactly like `Fixed(initial)`.
    Adaptive {
        /// Request cap at the start of the turn.
        initial: u32,
        /// Whether approved tool executions extend the cap.
        expand_on_progress: bool,
    },
}
82
83impl TurnRequestLimit {
84 fn initial_cap(&self) -> Option<u32> {
85 match *self {
86 Self::Unbounded => None,
87 Self::Fixed(n) => Some(n),
88 Self::Adaptive { initial, .. } => Some(initial),
89 }
90 }
91
92 fn expand_on_progress(&self) -> bool {
93 matches!(
94 self,
95 Self::Adaptive {
96 expand_on_progress: true,
97 ..
98 }
99 )
100 }
101}
102
103/// Configuration for a turn.
104#[derive(Debug, Clone)]
105pub struct TurnConfig {
106 /// The selected provider vendor (the provider half of the selection key). Together
107 /// with [`Self::model`], this is used to resolve the actual provider entry in the
108 /// registry by the `(vendor, model)` pair.
109 pub provider: String,
110 pub model: String,
111 pub allowed_models: Option<Vec<String>>,
112 pub base_prompt: BasePromptConfig,
113 pub system_prompt: Option<String>,
114 pub prompt: PromptConfig,
115 pub sampling: SamplingParams,
116 pub request_limit: TurnRequestLimit,
117 /// Explicit absolute override for the compaction threshold (in tokens). When `Some`,
118 /// takes precedence over the value inferred from [`Self::compact_ratio`]. When
119 /// `None`, the threshold is automatically derived from the ratio.
120 pub compact_threshold_tokens: Option<u64>,
121 /// Compression threshold as a fraction of the model's `context_window` (e.g. `0.85` =
122 /// trigger when usage exceeds 85%). This is the **hard watermark**: when reached, if
123 /// no background compaction is in flight, the turn main loop performs **synchronous**
124 /// compaction as a fallback (blocking the current turn but guaranteeing the context
125 /// is not exceeded). `None` = no automatic ratio-based compression (and if no
126 /// absolute threshold is set either, no compression occurs for this turn). Only
127 /// effective when `compact_threshold_tokens` is `None` and the model exposes a
128 /// `context_window`. See `session/turn/compact.rs` for details.
129 pub compact_ratio: Option<f64>,
130 /// **Background full compaction** toggle. When `true`, once the soft watermark
131 /// derived from [`Self::compact_soft_ratio`] is exceeded, a summarization compaction
132 /// is started **asynchronously** (without blocking the current turn); it quietly
133 /// compresses history before the hard watermark is hit. When disabled, only
134 /// synchronous compaction at the hard watermark remains. See
135 /// `session/turn/compaction_slot.rs`.
136 pub background_compact_enabled: bool,
137 /// Soft watermark for background compaction, as a fraction of `context_window`
138 /// (default `0.7`). Must be less than `compact_ratio` (the hard watermark) to leave a
139 /// window between soft and hard for background summarization to complete. Only
140 /// effective when `background_compact_enabled` is set and a threshold can be derived.
141 pub compact_soft_ratio: Option<f64>,
142 /// Enables **micro‑compaction**. When `true`, each turn first runs a micro‑compaction
143 /// (cleans oversized `tool_result` in older turns, without calling the LLM or
144 /// deleting messages) at the water level above [`Self::microcompact_ratio`],
145 /// deferring expensive full compaction. See `session/turn/microcompact.rs`.
146 pub microcompact_enabled: bool,
147 /// Micro‑compaction watermark as a fraction of `context_window` (default `0.6`).
148 /// Typically below the soft watermark — micro‑compaction is the cheapest first line
149 /// of defense. Only effective when `microcompact_enabled` and a threshold can be
150 /// derived.
151 pub microcompact_ratio: Option<f64>,
152 pub max_llm_retries: u32,
153 /// `0` = unlimited. The default is unlimited.
154 pub max_concurrent_tools: usize,
155 /// Hard upper limit on forced continuations from the `before turn-end` hook —
156 /// prevents infinite loops from repeated hook `Continue` calls. Default: 3.
157 pub max_hook_continues: u32,
158 /// Maximum subagent nesting depth (vertical recursion limit) for this turn.
159 /// `spawn_agent` uses this to pass the "remaining depth" to tools via
160 /// [`crate::tool::ToolContext::subagent_depth`]; a child agent's nested turn receives
161 /// `subagent_max_depth` = parent's remaining depth minus one. `0` means this turn's
162 /// tool set contains no `spawn_agent`, structurally forbidding dispatch — replacing
163 /// the old hardcoded gate of "whitelist never contains `spawn_agent`". Default:
164 /// `DEFAULT_SUBAGENT_MAX_DEPTH`.
165 pub subagent_max_depth: u32,
166}
167
168impl Default for TurnConfig {
169 fn default() -> Self {
170 Self {
171 provider: String::new(),
172 model: String::new(),
173 allowed_models: None,
174 base_prompt: BasePromptConfig::default(),
175 system_prompt: None,
176 prompt: PromptConfig::default(),
177 sampling: SamplingParams::default(),
178 request_limit: TurnRequestLimit::Adaptive {
179 initial: 32,
180 expand_on_progress: true,
181 },
182 compact_threshold_tokens: None,
183 // Trigger compaction at 85% of `context_window` (hard watermark), reserving
184 // ~15% for summary output and headroom — within the reasonable range of codex
185 // (90%), Claude (~93%), and opencode (window-20k).
186 compact_ratio: Some(0.85),
187 // Background compaction enabled by default: starts async summarization at
188 // soft=0.7, aiming to finish before hard=0.85 is reached.
189 background_compact_enabled: true,
190 compact_soft_ratio: Some(0.7),
191 // Micro‑compaction enabled by default: at 0.6 it evicts old large
192 // `tool_result`s — the cheapest first line of defense.
193 microcompact_enabled: true,
194 microcompact_ratio: Some(0.6),
195 max_llm_retries: 3,
196 max_concurrent_tools: 0,
197 max_hook_continues: DEFAULT_MAX_HOOK_CONTINUES,
198 subagent_max_depth: DEFAULT_SUBAGENT_MAX_DEPTH,
199 }
200 }
201}
202
/// Base prompt source, given as an optional file path and/or optional inline text.
///
/// NOTE(review): precedence when both `file` and `text` are set is decided by the prompt
/// resolver outside this module; confirm there.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct BasePromptConfig {
    /// Optional path to a file containing the base prompt.
    pub file: Option<PathBuf>,
    /// Optional inline base-prompt text.
    pub text: Option<String>,
}
208
/// Prompt-file configuration with optional per-provider and per-model overlays.
///
/// The `Default` impl sets `file` to `DEFAULT_PROMPT_FILE` (`"AGENTS.md"`) and leaves
/// everything else empty.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PromptConfig {
    /// Prompt file name.
    ///
    /// NOTE(review): presumably resolved relative to the working directory; confirm in
    /// the resolver.
    pub file: String,
    /// Optional inline prompt text.
    pub text: Option<String>,
    /// Overlay text keyed by provider.
    ///
    /// NOTE(review): presumably keyed by provider vendor (cf. `TurnConfig::provider`);
    /// confirm.
    pub provider_overlays: std::collections::BTreeMap<String, String>,
    /// Overlay text keyed by model.
    ///
    /// NOTE(review): presumably keyed by model name; confirm.
    pub model_overlays: std::collections::BTreeMap<String, String>,
}
216
217impl Default for PromptConfig {
218 fn default() -> Self {
219 Self {
220 file: DEFAULT_PROMPT_FILE.to_owned(),
221 text: None,
222 provider_overlays: std::collections::BTreeMap::new(),
223 model_overlays: std::collections::BTreeMap::new(),
224 }
225 }
226}
227/// All dependencies and accumulated state for a single turn execution.
228///
229/// This struct is constructed by [`crate::session::DefaultSession`] on each `run_turn`
230/// call,
231/// borrowing sub-components of the session and being dropped after the turn completes.
232pub struct TurnRunner<'a> {
233 pub history: &'a dyn History,
234 pub tools: &'a dyn ToolRegistry,
235 /// The same session tool pool as [`Self::tools`], but as an owned `Arc` so it can be
236 /// injected into [`crate::tool::ToolContext`] (which needs `'static`/owned). Carries
237 /// the fully assembled composite (built-in + MCP) so `spawn_agent` can build a child
238 /// agent's tool subset that includes `mcp__*` tools. `None` in legacy/test runners
239 /// that only set the borrow.
240 pub session_tools: Option<Arc<dyn ToolRegistry>>,
241 pub provider: &'a dyn LlmProvider,
242 /// The active policy for this turn's snapshot. Owned as an `Arc` rather than
243 /// borrowed: it flows with [`crate::tool::ToolContext`] into `spawn_agent`, where
244 /// child agents wrap it with
245 /// [`NonInteractivePolicy`](crate::policy::NonInteractivePolicy) — must be the
246 /// parent's actual policy at this moment.
247 pub policy: Arc<dyn SandboxPolicy>,
248 pub events: Arc<EventEmitter>,
249 pub permissions: &'a PermissionGate,
250 pub cancel: CancellationToken,
251 pub config: &'a TurnConfig,
252 /// The same turn config as [`Self::config`], as an owned `Arc` for injection into
253 /// [`crate::tool::ToolContext`] so `spawn_agent` can let a child agent inherit the
254 /// parent's turn settings. `None` in legacy/test runners that only set the borrow.
255 pub config_arc: Option<Arc<TurnConfig>>,
256 /// The system prompt resolved for this turn. `Arc<str>`: each `build_request` call
257 /// `clone`s it into `CompletionRequest.system`; the `Arc` reduces this to a reference
258 /// count increment.
259 pub system_prompt: Option<Arc<str>>,
260 pub cwd: &'a std::path::Path,
261 pub fs: Arc<dyn FsBackend>,
262 pub shell: Arc<dyn ShellBackend>,
263 pub http: Arc<dyn HttpClient>,
264 /// Hosted capabilities determined at session startup.
265 /// Reused directly on each turn when assembling requests, without re-querying.
266 pub hosted_capabilities: HostedCapabilities,
267 /// Hook engine. The turn main loop emits Sync events at four points
268 /// (`UserPromptSubmit` / `PreToolUse` / `PostToolUse` / `PostToolUseFailure`).
269 /// Waits for hooks to finish before proceeding.
270 pub hooks: &'a dyn HookEngine,
271 /// Current session ID. Injected into `HookCtx` so that hook handlers can route or
272 /// audit by session.
273 pub session_id: &'a SessionId,
274 /// Session-level background task handle. When `Some`, enables the tool's
275 /// `run_in_background` capability (injected into tools via
276 /// [`crate::tool::ToolContext::background`]); nested sub-agent turns receive `None`,
277 /// structurally preventing background task self-replication.
278 pub background: Option<crate::session::BackgroundTasks>,
279 /// Shared state for the `--goal` goal-driven loop. When `Some`, this session is
280 /// running in goal mode: it is injected into the `goal_done` tool via
281 /// [`crate::tool::ToolContext::goal`], and the `goal-gate` hook uses it in
282 /// `before_turn_end` to allow or extend the session. `None` = non-goal mode
283 /// (default).
284 pub goal: Option<Arc<crate::session::GoalState>>,
285 /// Session-level single-flight compaction slot. When `Some`, background full
286 /// compaction is available — exceeding the soft watermark triggers an async summary
287 /// compaction without blocking the current turn. Nested sub-agent turns pass `None`
288 /// (sub-agent contexts are short-lived and should not spawn background tasks).
289 /// Requires `Arc<dyn History>`/`Arc<dyn LlmProvider>` for the task to hold `'static`
290 /// references across turns, hence the accompanying `history_arc`/`provider_arc`.
291 pub compaction_slot: Option<crate::session::CompactionSlot>,
292 /// `Arc<dyn History>` for `compaction_slot` (points to the same object as the
293 /// `history` borrow). Held by the background compaction task across turns. When
294 /// `None`, background compaction is unavailable and falls back to synchronous
295 /// compaction only.
296 pub history_arc: Option<Arc<dyn History>>,
297 /// `Arc` for the provider used by `compaction_slot`. Same as above.
298 pub provider_arc: Option<Arc<dyn LlmProvider>>,
299 /// Session-level cancellation token; the background compaction task's cancellation
300 /// token is derived from this one (independent of the turn's cancel, cancelled when
301 /// the session ends). When `None`, background compaction uses the turn's `cancel`
302 /// (sub-agent path).
303 pub session_cancel: Option<CancellationToken>,
304 /// The ingestion source for this turn's input — determines the `source` field of the
305 /// `before_ingest` step envelope.
306 /// User turns use `User`; background continuation turns started by the session driver
307 /// use `Background`.
308 pub ingest_source: crate::hooks::step::IngestSource,
309 /// Request stability diagnostics: compares snapshots of two consecutive requests
310 /// actually sent to the provider, helping locate sources of high volatility in low
311 /// prompt cache hit rates.
312 pub(crate) request_audit: &'a RequestAuditTracker,
313}
314
315impl<'a> TurnRunner<'a> {
316 /// Runs a single turn.
317 pub async fn run(&self, prompt: Vec<ContentBlock>) -> Result<AcpStopReason, TurnError> {
318 // ① UserPromptSubmit hook (sync interception)
319 // Gives hooks a chance to rewrite or intercept the prompt before it lands in
320 // history.
321 let prompt = match self.fire_user_prompt_submit(prompt).await {
322 UserPromptHookFlow::Continue(p) => p,
323 UserPromptHookFlow::Refused => {
324 // Hook blocked: do not emit `UserPromptCommitted`, do not append to
325 // history; return `Refusal` directly so the ACP bridge responds with
326 // `PromptResponse`.
327 return Ok(AcpStopReason::Refusal);
328 }
329 };
330
331 // Rollback boundary: history length before this turn appends anything. If the turn
332 // fails permanently, everything appended from here on (the user prompt, hook
333 // feedback, partial assistant/tool messages) is truncated away, so a failed turn
334 // leaves no orphan in history to be replayed on reload or re-sent next request.
335 let rollback_len = self.history.len();
336
337 self.events
338 .emit(AgentEvent::UserPromptCommitted {
339 content: prompt.clone(),
340 })
341 .await;
342 self.history.append(Message {
343 role: Role::User,
344 content: prompt
345 .into_iter()
346 .map(content_block_to_message_content)
347 .collect::<Result<Vec<_>, _>>()?
348 .into_iter()
349 .flatten()
350 .collect(),
351 });
352
353 // After Ingest hook: input has been merged into history. Injection only.
354 {
355 let mut step = crate::hooks::step::AfterIngest {
356 committed_len: 1,
357 additional_context: Vec::new(),
358 };
359 let _ = self.hooks.dispatch(&mut step, self.hook_ctx()).await;
360 if !step.additional_context.is_empty() {
361 self.append_user_feedback(step.additional_context);
362 }
363 }
364
365 self.events.emit(AgentEvent::TurnStarted).await;
366
367 // After-turn-enter hook: the turn scope has been entered. Allows injecting system
368 // context or a Break to reject the turn.
369 // Note: currently the hook point is placed after prompt ingestion (moving the hook
370 // point earlier is a deferred adjustment).
371 {
372 let mut step = crate::hooks::step::AfterTurnEnter {
373 is_subagent: false,
374 agent_type: None,
375 additional_context: Vec::new(),
376 };
377 let control = self.hooks.dispatch(&mut step, self.hook_ctx()).await;
378 if !step.additional_context.is_empty() {
379 self.append_user_feedback(step.additional_context);
380 }
381 if let crate::hooks::step::HookControl::Break { .. } = control {
382 return Ok(AcpStopReason::EndTurn);
383 }
384 }
385
386 let result = self.run_inner().await;
387
388 match &result {
389 Ok(outcome) => {
390 self.events
391 .emit(AgentEvent::TurnEnded {
392 reason: outcome.reason,
393 usage: outcome.usage,
394 })
395 .await;
396 }
397 Err(_) => {
398 // Permanent failure (e.g. provider error after retries). Roll history back
399 // to the pre-turn boundary so the orphan user prompt does not linger; the
400 // bridge layer decides the wire response. `TurnAborted` tells storage to
401 // drop the same tail it already journaled (the `UserPromptCommitted` /
402 // `TurnStarted` records emitted before the failure), keeping the in-memory
403 // and persisted histories consistent.
404 self.history.truncate(rollback_len);
405 self.events.emit(AgentEvent::TurnAborted).await;
406 }
407 }
408 // The `Err` path does not emit `TurnEnded`; the bridge layer decides the wire
409 // response based on the future outcome.
410
411 result.map(|outcome| outcome.reason)
412 }
413
    /// The turn's main loop; each iteration issues at most one LLM request.
    ///
    /// One iteration runs these steps in order:
    ///
    /// 1. Cancellation check.
    /// 2. Context management (`manage_context`).
    /// 3. Build the request.
    /// 4. Before-generate hook. It may change the model, short-circuit with a synthetic
    ///    assistant reply, or `Break`.
    /// 5. LLM call with retries, then drain the stream.
    /// 6. Emit `LlmCallFinished` and run the after-generate hook.
    /// 7. Record the real input tokens and append the assistant message.
    /// 8. Dispatch on the stop reason.
    ///
    /// Voluntary stops go through `decide_turn_end`; a `true` result means another
    /// iteration. On `ToolUse`, the loop then:
    ///
    /// - decides permissions and runs the approved tools;
    /// - rejects oversized results;
    /// - runs the after-tool-batch hook and appends the results;
    /// - checks the per-turn request cap.
    ///
    /// # Errors
    ///
    /// Propagates [`TurnError`] from context management, the LLM call, stream draining,
    /// and permission decisions.
    async fn run_inner(&self) -> Result<TurnOutcome, TurnError> {
        let mut state = TurnState::new(self.config.request_limit, self.config.max_hook_continues);
        loop {
            // Cancellation is checked here only at iteration boundaries. Stream draining
            // and the permission decision report their own cancellation further down.
            if self.cancel.is_cancelled() {
                return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
            }

            self.manage_context().await?;

            let mut req = self.build_request();

            // Before-generate hook: can modify the request (model), short-circuit (fill
            // in a synthetic assistant reply to skip the LLM), or Break.
            let mut before_gen = crate::hooks::step::BeforeGenerate {
                model: req.model.clone(),
                message_count: req.messages.len(),
                attempt: state.request_count.saturating_add(1),
                assistant_text: None,
            };
            let bg_control = self.hooks.dispatch(&mut before_gen, self.hook_ctx()).await;
            req.model = before_gen.model;
            // The short-circuit is checked before `Break`: if a hook sets both, the
            // synthetic reply wins.
            if let Some(text) = before_gen.assistant_text {
                // Short-circuit: use a synthetic assistant reply to skip the real LLM
                // call, then proceed to the before-turn-end check.
                self.history.append(Message {
                    role: Role::Assistant,
                    content: vec![MessageContent::Text { text }].into(),
                });
                if self
                    .decide_turn_end(&mut state, AcpStopReason::EndTurn, true)
                    .await
                {
                    continue;
                }
                return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
            }
            if let crate::hooks::step::HookControl::Break { reason } = bg_control {
                return Ok(turn_outcome(&state, reason));
            }

            let (mut stream, attempt) = self.call_llm_with_retry(&req, &mut state).await?;

            let outcome = self.drain_provider_stream(&mut stream, &mut state).await?;

            if outcome.cancelled {
                return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
            }

            // The stream has been drained and all usage for this call is available.
            // Emit `LlmCallFinished` with the **per-call** actual usage
            // (`outcome.usage`, not the turn-accumulated `state.usage`).
            self.events
                .emit(AgentEvent::LlmCallFinished {
                    model: req.model.clone(),
                    attempt,
                    usage: outcome.usage,
                    error: None,
                })
                .await;

            // After-generate hook: observe (usage / stop / error). There is no output to
            // fill; to intervene, route the next iteration through before-turn-end.
            // `ToolUse` is reported to the hook as `EndTurn`.
            let stop_reason_for_hook = match outcome.stop {
                LlmStopReason::EndTurn | LlmStopReason::StopSequence => AcpStopReason::EndTurn,
                LlmStopReason::Refusal => AcpStopReason::Refusal,
                LlmStopReason::MaxTokens => AcpStopReason::MaxTokens,
                LlmStopReason::ToolUse => AcpStopReason::EndTurn,
            };
            let mut after_gen = crate::hooks::step::AfterGenerate {
                model: req.model.clone(),
                usage: outcome.usage,
                stop: stop_reason_for_hook,
                error: None,
            };
            let _ = self.hooks.dispatch(&mut after_gen, self.hook_ctx()).await;

            // Feed the actual input token count of this call into `history` as the
            // precise baseline for compaction threshold decisions (see
            // `session/turn/compact.rs`). The messages sent in this call are
            // `req.messages`; their real input size is the sum of the three input-side
            // fields in `outcome.usage`.
            if let Some(real_input) = real_input_tokens(&outcome.usage) {
                self.history.record_input_tokens(real_input);
            }

            // Empty assistant messages are not recorded.
            let assistant = assistant_message(&outcome);
            if !assistant.content.is_empty() {
                self.history.append(assistant);
            }

            // Passive stops (Refusal / MaxTokens) skip the before-turn-end hook, since
            // the hook cannot extend these, and exit directly.
            match outcome.stop {
                LlmStopReason::EndTurn | LlmStopReason::StopSequence => {
                    // Voluntary stop → before-turn-end decision point.
                    if self
                        .decide_turn_end(&mut state, AcpStopReason::EndTurn, true)
                        .await
                    {
                        continue;
                    }
                    return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
                }
                LlmStopReason::Refusal => {
                    return Ok(turn_outcome(&state, AcpStopReason::Refusal));
                }
                LlmStopReason::MaxTokens => {
                    return Ok(turn_outcome(&state, AcpStopReason::MaxTokens));
                }
                LlmStopReason::ToolUse => {}
            }

            if outcome.tool_uses.is_empty() {
                // A `ToolUse` stop with no tool actually requested is treated as a
                // voluntary stop → same before-turn-end decision point.
                if self
                    .decide_turn_end(&mut state, AcpStopReason::EndTurn, true)
                    .await
                {
                    continue;
                }
                return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
            }

            // Before-permission hook. It currently only observes/stubs; the policy
            // decision is still delegated to the authority.
            for tu in &outcome.tool_uses {
                let mut bp = crate::hooks::step::BeforePermission {
                    tool: tu.name.clone(),
                    decision: "ask".to_string(),
                    resolved: None,
                };
                let _ = self.hooks.dispatch(&mut bp, self.hook_ctx()).await;
            }

            let approved = match self.decide_permissions(&outcome.tool_uses).await? {
                DecisionFlow::Continue(list) => list,
                DecisionFlow::Cancelled => {
                    return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
                }
            };

            // After-permission hook (currently only observes/stubs). Only `Run` counts
            // as granted; `Denied` and `FailedArgs` are reported as not granted.
            for a in &approved {
                let (tool, granted) = match a {
                    Approved::Run { .. } => (approved_tool_name(a), true),
                    Approved::Denied { .. } | Approved::FailedArgs { .. } => {
                        (approved_tool_name(a), false)
                    }
                };
                let mut ap = crate::hooks::step::AfterPermission { tool, granted };
                let _ = self.hooks.dispatch(&mut ap, self.hook_ctx()).await;
            }

            // Progress is noted at most once per iteration, however many tools run.
            let progressed = approved.iter().any(|a| matches!(a, Approved::Run { .. }));
            if progressed {
                state.note_progress();
            }

            let mut results = self.run_tools_concurrently(approved).await;

            // Reject any single tool result that exceeds the model's context window. It
            // can never fit, so appending it as-is would only blow up the next request.
            // Replace it with an actionable error before it enters history. See
            // `reject_oversized_results`.
            let rejected = reject_oversized_results(&mut results, self.effective_context_window());
            if rejected > 0 {
                tracing::warn!(
                    rejected,
                    "rejected oversized tool result(s) exceeding the context window"
                );
            }

            // After-tool-batch hook: after all parallel tools finish and before the next
            // LLM call. Allows injection or a graceful break.
            let mut batch = crate::hooks::step::AfterToolBatch {
                results: results
                    .iter()
                    .map(|r| crate::hooks::step::ToolBatchEntry {
                        tool_name: r.name.clone(),
                        is_error: r.is_error,
                    })
                    .collect(),
                additional_context: Vec::new(),
            };
            let batch_control = self.hooks.dispatch(&mut batch, self.hook_ctx()).await;

            // Results are appended before the `Break` check below, so a hook break still
            // leaves every `tool_use` paired with its `tool_result` in history.
            self.history.append(tool_results_message(results));
            if !batch.additional_context.is_empty() {
                self.append_user_feedback(batch.additional_context);
            }
            if let crate::hooks::step::HookControl::Break { reason } = batch_control {
                return Ok(turn_outcome(&state, reason));
            }

            if state.exceeded_request_cap() {
                // Hitting the per-turn request cap is an involuntary stop, but the
                // before-turn-end hook is still consulted. In goal mode the goal gate
                // decides whether to keep working, resetting the request budget for the
                // next round (bounded by `max_hook_continues`). Without a continuing
                // hook, the turn stops with `MaxTurnRequests` as before.
                if self
                    .decide_turn_end(&mut state, AcpStopReason::MaxTurnRequests, false)
                    .await
                {
                    continue;
                }
                return Ok(turn_outcome(&state, AcpStopReason::MaxTurnRequests));
            }
        }
    }
625
626 fn build_request(&self) -> CompletionRequest {
627 // Before sending the request, pair any orphaned `tool_use` (left over from an
628 // interruption, with no matching `tool_result`) — otherwise the provider will
629 // permanently reject the request. Only patch the copy sent to the provider; the
630 // true history remains untouched. See `sanitize`.
631 let messages = sanitize::sanitize_tool_pairing(self.history.snapshot());
632 let req = CompletionRequest {
633 model: self.config.model.clone(),
634 system: self.system_prompt.clone(),
635 messages,
636 tools: self.tools.schemas(),
637 tool_choice: ToolChoice::Auto,
638 sampling: self.config.sampling.clone(),
639 hosted_capabilities: self.hosted_capabilities,
640 };
641 self.request_audit.record(&req);
642 req
643 }
644
645 /// Layered context management: micro → soft (background) → hard (synchronous
646 /// fallback). Called at the start of each main loop iteration.
647 ///
648 /// Three water levels (see `compact_thresholds` for details):
649 /// 1. **micro** (default 0.6·window): if micro-compaction is enabled, first evict old
650 /// large `tool_result` entries — no LLM calls, no message deletion, near-zero
651 /// latency, deferring expensive full compaction.
652 /// 2. **soft** (default 0.7·window): if background compaction is enabled,
653 /// **asynchronously** start a summary compaction; the turn does not block, quietly
654 /// compacting before hitting hard (single-flight, won't re-start if one is already
655 /// in flight).
656 /// 3. **hard** (default 0.85·window, equivalent to the old `compact_ratio`
657 /// semantics): at this level compaction is mandatory — if a background compaction
658 /// is already in flight, `await` its completion; otherwise compact
659 /// **synchronously** as a fallback.
660 ///
661 /// micro/soft require the model to expose `context_window`; hard also supports an
662 /// absolute override via `compact_threshold_tokens`. If any level cannot obtain its
663 /// threshold, that level is skipped (preserving the conservative "no information, no
664 /// compaction" semantics).
665 async fn manage_context(&self) -> Result<(), TurnError> {
666 let thresholds = self.compact_thresholds();
667 // All three thresholds absent → no proactive compaction this turn (preserves the
668 // existing semantics).
669 if thresholds.is_empty() {
670 return Ok(());
671 }
672 let Some(estimate) = self.history.token_estimate() else {
673 return Ok(());
674 };
675
676 // ① micro: synchronous, cheapest. May reduce `estimate` below soft/hard
677 // thresholds, so re-fetch after compaction.
678 let estimate = if self.config.microcompact_enabled
679 && thresholds.micro.is_some_and(|t| estimate >= t)
680 {
681 self.run_microcompact().await;
682 self.history.token_estimate().unwrap_or(estimate)
683 } else {
684 estimate
685 };
686
687 // ② soft: crossing the threshold triggers an async background compaction without
688 // blocking the current round.
689 if self.config.background_compact_enabled
690 && let (Some(soft), Some(hard)) = (thresholds.soft, thresholds.hard)
691 && estimate >= soft
692 && estimate < hard
693 {
694 self.spawn_background_compaction(hard).await;
695 // Non-blocking – continue assembling requests this round; summary persistence
696 // happens in a later round (or later).
697 return Ok(());
698 }
699
700 // ③ hard: must compact.
701 if let Some(hard) = thresholds.hard
702 && estimate >= hard
703 {
704 self.compact_hard(estimate, hard).await?;
705 }
706 Ok(())
707 }
708
709 /// Run a micro-compact and write back via `replace`. Best-effort: does nothing if
710 /// there is nothing to clean up.
711 async fn run_microcompact(&self) {
712 let messages = self.history.snapshot();
713 let Some((rebuilt, report)) = microcompact::run(&messages) else {
714 return;
715 };
716 self.history.replace(rebuilt);
717 tracing::info!(
718 cleared = report.cleared,
719 tokens_before = report.tokens_before,
720 tokens_after = report.tokens_after,
721 "context microcompacted"
722 );
723 self.events
724 .emit(AgentEvent::ContextMicrocompacted {
725 tokens_before: report.tokens_before,
726 tokens_after: report.tokens_after,
727 cleared: report.cleared,
728 })
729 .await;
730 }
731
732 /// Spawns a single-flight background full compaction when the soft threshold is
733 /// exceeded. Requires a slot and `Arc` references to history and provider (only
734 /// available in the top-level turn; child agent turns silently skip this, leaving it
735 /// to the synchronous compaction at the hard threshold).
736 async fn spawn_background_compaction(&self, hard_threshold: u64) {
737 let (Some(slot), Some(history_arc), Some(provider_arc)) = (
738 self.compaction_slot.as_ref(),
739 self.history_arc.as_ref(),
740 self.provider_arc.as_ref(),
741 ) else {
742 return;
743 };
744 if slot.is_in_flight() {
745 return; // Single-flight: a compaction is already in flight, do not start another.
746 }
747
748 // The compaction cancel token is independent of the turn cancel — the summary
749 // should be allowed to finish even if the originating turn has ended; however, it
750 // is cancelled when the session ends. For sub-agent paths that have no
751 // `session_cancel`, it falls back to the turn cancel.
752 let cancel = self
753 .session_cancel
754 .clone()
755 .unwrap_or_else(|| self.cancel.clone())
756 .child_token();
757 let ctx = compact::CompactionCtx {
758 provider: provider_arc.clone(),
759 model: self.config.model.clone(),
760 sampling: self.config.sampling.clone(),
761 tools: self.tools.schemas(),
762 cancel,
763 };
764 let events = self.events.clone();
765 let on_done: Arc<
766 dyn Fn(crate::session::CompactionReport) -> futures::future::BoxFuture<'static, ()>
767 + Send
768 + Sync,
769 > = Arc::new(move |report| {
770 // Return a future so that `emit` is awaited inside the compaction task body —
771 // no detached task is spawned, and event emission is governed by the
772 // compaction task's cancel/track semantics.
773 let events = events.clone();
774 Box::pin(async move {
775 events
776 .emit(AgentEvent::ContextCompressed {
777 tokens_before: report.tokens_before,
778 tokens_after: report.tokens_after,
779 })
780 .await;
781 })
782 });
783 let started = slot.try_spawn(history_arc.clone(), ctx, hard_threshold, on_done);
784 if started {
785 tracing::info!(hard_threshold, "background compaction started");
786 }
787 }
788
789 /// Hard threshold fallback: wait for an in-flight background compaction to finish, or
790 /// run a synchronous compaction.
791 async fn compact_hard(&self, estimate: u64, hard: u64) -> Result<(), TurnError> {
792 // A background compaction is already in flight; wait for it to finish to avoid
793 // redundant work.
794 if let Some(slot) = self.compaction_slot.as_ref()
795 && slot.is_in_flight()
796 {
797 slot.await_in_flight().await;
798 return Ok(());
799 }
800
801 // Before the compact hook: the hook may `Skip` to veto this compaction (a
802 // mutating step).
803 let mut before = crate::hooks::step::BeforeCompact {
804 token_estimate: estimate,
805 threshold: hard,
806 };
807 if let crate::hooks::step::HookControl::Skip =
808 self.hooks.dispatch(&mut before, self.hook_ctx()).await
809 {
810 tracing::info!("compaction vetoed by before-compact hook");
811 return Ok(());
812 }
813
814 let ctx = self.sync_compaction_ctx();
815 let Some(report) = compact::run_sync(self.history, &ctx, hard).await else {
816 // No safe compaction boundary (e.g., a single very long turn) — skip this
817 // round, no event emitted.
818 return Ok(());
819 };
820 self.events
821 .emit(AgentEvent::ContextCompressed {
822 tokens_before: report.tokens_before,
823 tokens_after: report.tokens_after,
824 })
825 .await;
826
827 // After the compact hook: observe and allow injection (injected content goes into
828 // history).
829 let mut after = crate::hooks::step::AfterCompact {
830 tokens_before: report.tokens_before,
831 tokens_after: report.tokens_after,
832 additional_context: Vec::new(),
833 };
834 let _ = self.hooks.dispatch(&mut after, self.hook_ctx()).await;
835 if !after.additional_context.is_empty() {
836 self.append_user_feedback(after.additional_context);
837 }
838 Ok(())
839 }
840
841 /// The [`compact::CompactionCtx`] for synchronous compaction. Wrapping a borrowed
842 /// provider in a temporary `Arc` is not feasible (a trait object borrow cannot be
843 /// `Arc`), so the synchronous path requires `provider_arc`. Falling back when it is
844 /// missing is impossible (the top-level always has one; child agents use a borrowed
845 /// `provider`—see the `sync_compaction_ctx` implementation).
846 fn sync_compaction_ctx(&self) -> compact::CompactionCtx {
847 compact::CompactionCtx {
848 provider: self
849 .provider_arc
850 .clone()
851 .expect("sync compaction requires provider_arc"),
852 model: self.config.model.clone(),
853 sampling: self.config.sampling.clone(),
854 tools: self.tools.schemas(),
855 cancel: self.cancel.clone(),
856 }
857 }
858
859 /// Parse the three-tier compaction thresholds (in tokens) for this turn. Any tier set
860 /// to `None` means that tier is not triggered.
861 /// The model's context window in tokens, exactly as the provider reports it. `None` ⇒
862 /// the provider does not expose it (notably Bedrock, whose SDK returns no model
863 /// metadata). For decisions that need a ceiling, prefer [`Self::effective_context_window`].
864 fn context_window(&self) -> Option<u64> {
865 self.provider
866 .model_info(&self.config.model)
867 .and_then(|m| m.context_window)
868 }
869
870 /// The context window to actually drive compaction / oversized-result rejection with.
871 ///
872 /// Falls back to [`FALLBACK_CONTEXT_WINDOW`] when the provider exposes no window and the
873 /// user has not configured an explicit absolute ceiling
874 /// ([`TurnConfig::compact_threshold_tokens`]) — otherwise an unknown window means no
875 /// compaction at all and the context grows until the provider hard-rejects the request.
876 /// The fallback is deliberately conservative (compacting early is safe; overshooting is
877 /// not). A one-line warning is emitted once per model so the user knows to declare the
878 /// real `context_window` in config.
879 fn effective_context_window(&self) -> Option<u64> {
880 if let Some(window) = self.context_window() {
881 return Some(window);
882 }
883 // An explicit absolute hard threshold is the user's deliberate ceiling; respect it
884 // and do not fabricate a window (micro/soft simply stay off, as before).
885 if self.config.compact_threshold_tokens.is_some() {
886 return None;
887 }
888 warn_missing_context_window(&self.config.model);
889 Some(FALLBACK_CONTEXT_WINDOW)
890 }
891
892 fn compact_thresholds(&self) -> CompactThresholds {
893 let window = self.effective_context_window();
894
895 // For `hard`, an absolute threshold takes precedence; otherwise, use `ratio *
896 // window`.
897 let hard = self.config.compact_threshold_tokens.or_else(|| {
898 let ratio = self.config.compact_ratio?;
899 ratio_threshold(window?, ratio)
900 });
901 // micro/soft can only be derived from window (absolute overrides apply only to
902 // hard).
903 let from_ratio =
904 |ratio: Option<f64>| ratio.and_then(|r| window.and_then(|w| ratio_threshold(w, r)));
905 CompactThresholds {
906 micro: from_ratio(self.config.microcompact_ratio),
907 soft: from_ratio(self.config.compact_soft_ratio),
908 hard,
909 }
910 }
911
912 pub(super) fn hook_ctx(&self) -> HookCtx<'_> {
913 HookCtx::new(self.session_id, self.cwd, self.cancel.clone())
914 }
915}
916
917// ----- internal types -----
918
/// Final result of a turn: the ACP stop reason plus the usage snapshot copied out of
/// `TurnState` when the outcome is built (see `turn_outcome`).
#[derive(Clone, Copy)]
struct TurnOutcome {
    /// Why the turn stopped, expressed as an ACP stop reason.
    reason: AcpStopReason,
    /// Copy of `TurnState::usage` at the time the outcome was built.
    usage: Usage,
}
924
/// Three-tier compaction watermarks, in tokens, resolved once per turn.
///
/// A tier holding `None` is disabled for this turn and never triggers.
#[derive(Clone, Copy)]
struct CompactThresholds {
    /// Micro tier (derived from `microcompact_ratio`).
    micro: Option<u64>,
    /// Soft tier (derived from `compact_soft_ratio`).
    soft: Option<u64>,
    /// Hard tier (absolute `compact_threshold_tokens`, or derived from `compact_ratio`).
    hard: Option<u64>,
}

impl CompactThresholds {
    /// `true` when every tier is disabled, i.e. nothing can trigger proactive compaction
    /// this turn.
    fn is_empty(&self) -> bool {
        [self.micro, self.soft, self.hard].iter().all(Option::is_none)
    }
}
940
/// `context_window × ratio`, rounded down, or `None` when that rounds to `0` (no trigger).
///
/// `ratio` is expected to lie in `(0, 1]` but is not validated here. A ratio above `1`
/// yields a threshold beyond the window. Negative or NaN ratios saturate to `0` under the
/// float-to-int cast and therefore yield `None`.
fn ratio_threshold(context_window: u64, ratio: f64) -> Option<u64> {
    let scaled = context_window as f64 * ratio;
    match scaled.floor() as u64 {
        0 => None,
        tokens => Some(tokens),
    }
}
947
/// Conservative context window (in tokens) assumed when the provider exposes none and the
/// user has not configured an explicit ceiling. Sized to the smallest window common across
/// current models, so compaction errs toward triggering early rather than overflowing — see
/// [`TurnRunner::effective_context_window`]. Users should declare the real value via the
/// model's `context_window` (e.g. `[providers.<name>] models = [{ id = "...",
/// context_window = 200000 }]`) or via `[turn].compact_threshold_tokens`.
const FALLBACK_CONTEXT_WINDOW: u64 = 128_000;
955
956/// Emits a one-time (per model id) warning that the context window is unknown and the
957/// fallback is in effect. The hot turn loop calls `effective_context_window` every
958/// iteration, so the dedupe set prevents log spam.
959fn warn_missing_context_window(model: &str) {
960 use std::collections::HashSet;
961 use std::sync::Mutex;
962 use std::sync::OnceLock;
963
964 static WARNED: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
965 let mut warned = WARNED
966 .get_or_init(|| Mutex::new(HashSet::new()))
967 .lock()
968 .expect("warn-once mutex poisoned");
969 if warned.insert(model.to_string()) {
970 tracing::warn!(
971 model,
972 fallback = FALLBACK_CONTEXT_WINDOW,
973 "model exposes no context_window; assuming a conservative fallback for compaction. \
974 Declare the real value via the model's `context_window` in config or set \
975 `[turn].compact_threshold_tokens` to silence this."
976 );
977 }
978}
979
/// Default upper limit on how many times the `before turn-end` hook may force a turn to
/// continue. Can be overridden by [`TurnConfig::max_hook_continues`] (config key
/// `[turn].max_hook_continues`). The limit is tracked per turn in `TurnState`
/// (`max_stop_hook_continues`, checked by `may_stop_hook_continue`). See the hook step
/// docs for the exit semantics.
pub(crate) const DEFAULT_MAX_HOOK_CONTINUES: u32 = 3;
984
/// Default upper bound on subagent vertical recursion depth, counted from the top-level
/// turn. With N levels, the top turn can spawn subagents, their children can spawn further,
/// and so on, until the Nth level (where `subagent_max_depth` reaches 0) can no longer call
/// `spawn_agent`. The default of 1 lets the main agent spawn subagents but not those
/// subagents (the common non-recursive policy). Raise it via `[turn].subagent_max_depth`
/// for orchestrations like "main agent → coordinator subagent → worker subagent".
/// Horizontal runaway is guarded separately by `request_limit`.
pub(crate) const DEFAULT_SUBAGENT_MAX_DEPTH: u32 = 1;
994
/// Mutable bookkeeping for one turn: the request budget, the `Usage` reported at the end,
/// and how many times the `before turn-end` hook has extended the turn.
struct TurnState {
    /// Request counter compared against `cap` by `exceeded_request_cap`; reset to 0 by
    /// `reset_request_budget`.
    request_count: u32,
    /// Usage copied into the final `TurnOutcome` by `turn_outcome`.
    usage: Usage,
    /// Current request cap; `None` means uncapped. Initialised from
    /// `TurnRequestLimit::initial_cap` and raised by `note_progress` when
    /// `expand_on_progress` is set.
    cap: Option<u32>,
    /// Whether `note_progress` may raise `cap` (from `TurnRequestLimit::expand_on_progress`).
    expand_on_progress: bool,
    /// How many times this turn has been extended by the `before turn-end` hook. Cap is
    /// [`Self::max_stop_hook_continues`].
    stop_hook_continues: u32,
    /// Hard upper limit for life-extending continues (from
    /// [`TurnConfig::max_hook_continues`]). Prevents hooks from `Continue`ing
    /// indefinitely.
    max_stop_hook_continues: u32,
}
1008
1009impl TurnState {
1010 fn new(limit: TurnRequestLimit, max_hook_continues: u32) -> Self {
1011 Self {
1012 request_count: 0,
1013 usage: Usage::default(),
1014 cap: limit.initial_cap(),
1015 expand_on_progress: limit.expand_on_progress(),
1016 stop_hook_continues: 0,
1017 max_stop_hook_continues: max_hook_continues,
1018 }
1019 }
1020
1021 fn note_progress(&mut self) {
1022 if self.expand_on_progress
1023 && let Some(cap) = self.cap.as_mut()
1024 {
1025 *cap = cap.saturating_add(1);
1026 }
1027 }
1028
1029 /// Reset the per-turn request budget back to its initial state. Called when a
1030 /// `before turn-end` hook keeps the turn alive (e.g. goal mode continuing), so the
1031 /// `request_limit` behaves as a *per-logical-turn* budget rather than a single budget
1032 /// shared across the whole multi-turn run. The cap returns to its initial value
1033 /// (re-reading the configured strategy), discarding any `expand_on_progress` growth.
1034 fn reset_request_budget(&mut self, limit: TurnRequestLimit) {
1035 self.request_count = 0;
1036 self.cap = limit.initial_cap();
1037 }
1038
1039 fn exceeded_request_cap(&self) -> bool {
1040 match self.cap {
1041 None => false,
1042 Some(cap) => self.request_count >= cap,
1043 }
1044 }
1045
1046 /// Whether the `before turn-end` hook is still allowed to continue (has not reached
1047 /// the hard limit).
1048 fn may_stop_hook_continue(&self) -> bool {
1049 self.stop_hook_continues < self.max_stop_hook_continues
1050 }
1051
1052 /// Records one stop-hook continuation.
1053 fn note_stop_hook_continue(&mut self) {
1054 self.stop_hook_continues = self.stop_hook_continues.saturating_add(1);
1055 }
1056}
1057
1058// ----- helpers -----
1059
1060fn turn_outcome(state: &TurnState, reason: AcpStopReason) -> TurnOutcome {
1061 TurnOutcome {
1062 reason,
1063 usage: state.usage,
1064 }
1065}
1066
1067#[cfg(test)]
1068mod tests;