defect_agent/session/turn.rs
1//! Turn main loop.
2//!
3//! Turn main loop — the "heart" of the agent. This file implements the state machine.
4//!
5//! Key dependencies:
6//! - [`History`]: read/write message history
7//! - [`ToolRegistry`]: tool lookup
8//! - [`LlmProvider`]: LLM invocation
9//! - [`EventEmitter`]: event emission (shared via `Arc` so tool tasks can also emit)
10//! - [`PermissionGate`]: wait for permission requests
11
12use std::path::PathBuf;
13use std::sync::Arc;
14
15use agent_client_protocol_schema::{ContentBlock, SessionId, StopReason as AcpStopReason};
16use tokio_util::sync::CancellationToken;
17
18use crate::event::AgentEvent;
19use crate::fs::FsBackend;
20use crate::hooks::{HookCtx, HookEngine};
21use crate::http::HttpClient;
22use crate::llm::{
23 CompletionRequest, HostedCapabilities, LlmProvider, Message, MessageContent, Role,
24 SamplingParams, StopReason as LlmStopReason, ToolChoice, Usage,
25};
26use crate::policy::SandboxPolicy;
27use crate::session::events::EventEmitter;
28use crate::session::permissions::PermissionGate;
29use crate::session::{History, ToolRegistry, TurnError};
30use crate::shell::ShellBackend;
31
32const DEFAULT_PROMPT_FILE: &str = "AGENTS.md";
33
34mod request_audit;
35
36mod compact;
37
38mod microcompact;
39
40mod compaction_slot;
41
42pub use compaction_slot::CompactionSlot;
43
44mod sanitize;
45
46mod content;
47
48mod llm_drive;
49
50mod tools;
51
52mod hooks;
53
54use content::content_block_to_message_content;
55use hooks::UserPromptHookFlow;
56use llm_drive::{assistant_message, real_input_tokens};
57use tools::{
58 Approved, DecisionFlow, approved_tool_name, reject_oversized_results, tool_results_message,
59};
60
61pub(crate) use request_audit::RequestAuditTracker;
62// Out-of-band `/compact` slash command reuses the same synchronous compaction primitive
63// as the turn loop's hard-watermark fallback, so the two share boundary selection and
64// summary logic instead of forking a second compaction path.
65pub(crate) use compact::{CompactionCtx, run_sync as run_sync_compaction};
66
/// Policy that bounds how many LLM calls a single turn may make.
#[derive(Debug, Clone, Copy)]
pub enum TurnRequestLimit {
    /// The number of calls is never capped.
    Unbounded,
    /// Hard ceiling: once `N` calls have been made the turn stops with
    /// [`AcpStopReason::MaxTurnRequests`].
    Fixed(u32),
    /// Progress-aware ceiling. Starts at `initial`; whenever a tool use is approved and
    /// executed during the turn, that counts as progress and the ceiling grows by 1.
    /// Without progress it terminates the same way as [`Self::Fixed`].
    Adaptive {
        /// Ceiling in effect when the turn starts.
        initial: u32,
        /// Whether progress is allowed to raise the ceiling.
        expand_on_progress: bool,
    },
}
82
83impl TurnRequestLimit {
84 fn initial_cap(&self) -> Option<u32> {
85 match *self {
86 Self::Unbounded => None,
87 Self::Fixed(n) => Some(n),
88 Self::Adaptive { initial, .. } => Some(initial),
89 }
90 }
91
92 fn expand_on_progress(&self) -> bool {
93 matches!(
94 self,
95 Self::Adaptive {
96 expand_on_progress: true,
97 ..
98 }
99 )
100 }
101}
102
103/// Configuration for a turn.
104#[derive(Debug, Clone)]
105pub struct TurnConfig {
106 /// The selected provider vendor (the provider half of the selection key). Together
107 /// with [`Self::model`], this is used to resolve the actual provider entry in the
108 /// registry by the `(vendor, model)` pair.
109 pub provider: String,
110 pub model: String,
111 pub allowed_models: Option<Vec<String>>,
112 pub base_prompt: BasePromptConfig,
113 pub system_prompt: Option<String>,
114 pub prompt: PromptConfig,
115 pub sampling: SamplingParams,
116 pub request_limit: TurnRequestLimit,
117 /// Explicit absolute override for the compaction threshold (in tokens). When `Some`,
118 /// takes precedence over the value inferred from [`Self::compact_ratio`]. When
119 /// `None`, the threshold is automatically derived from the ratio.
120 pub compact_threshold_tokens: Option<u64>,
121 /// Compression threshold as a fraction of the model's `context_window` (e.g. `0.85` =
122 /// trigger when usage exceeds 85%). This is the **hard watermark**: when reached, if
123 /// no background compaction is in flight, the turn main loop performs **synchronous**
124 /// compaction as a fallback (blocking the current turn but guaranteeing the context
125 /// is not exceeded). `None` = no automatic ratio-based compression (and if no
126 /// absolute threshold is set either, no compression occurs for this turn). Only
127 /// effective when `compact_threshold_tokens` is `None` and the model exposes a
128 /// `context_window`. See `session/turn/compact.rs` for details.
129 pub compact_ratio: Option<f64>,
130 /// **Background full compaction** toggle. When `true`, once the soft watermark
131 /// derived from [`Self::compact_soft_ratio`] is exceeded, a summarization compaction
132 /// is started **asynchronously** (without blocking the current turn); it quietly
133 /// compresses history before the hard watermark is hit. When disabled, only
134 /// synchronous compaction at the hard watermark remains. See
135 /// `session/turn/compaction_slot.rs`.
136 pub background_compact_enabled: bool,
137 /// Soft watermark for background compaction, as a fraction of `context_window`
138 /// (default `0.7`). Must be less than `compact_ratio` (the hard watermark) to leave a
139 /// window between soft and hard for background summarization to complete. Only
140 /// effective when `background_compact_enabled` is set and a threshold can be derived.
141 pub compact_soft_ratio: Option<f64>,
142 /// Enables **micro‑compaction**. When `true`, each turn first runs a micro‑compaction
143 /// (cleans oversized `tool_result` in older turns, without calling the LLM or
144 /// deleting messages) at the water level above [`Self::microcompact_ratio`],
145 /// deferring expensive full compaction. See `session/turn/microcompact.rs`.
146 pub microcompact_enabled: bool,
147 /// Micro‑compaction watermark as a fraction of `context_window` (default `0.6`).
148 /// Typically below the soft watermark — micro‑compaction is the cheapest first line
149 /// of defense. Only effective when `microcompact_enabled` and a threshold can be
150 /// derived.
151 pub microcompact_ratio: Option<f64>,
152 pub max_llm_retries: u32,
153 /// `0` = unlimited. The default is unlimited.
154 pub max_concurrent_tools: usize,
155 /// Hard upper limit on forced continuations from the `before turn-end` hook —
156 /// prevents infinite loops from repeated hook `Continue` calls. Default: 3.
157 pub max_hook_continues: u32,
158 /// Maximum subagent nesting depth (vertical recursion limit) for this turn.
159 /// `spawn_agent` uses this to pass the "remaining depth" to tools via
160 /// [`crate::tool::ToolContext::subagent_depth`]; a child agent's nested turn receives
161 /// `subagent_max_depth` = parent's remaining depth minus one. `0` means this turn's
162 /// tool set contains no `spawn_agent`, structurally forbidding dispatch — replacing
163 /// the old hardcoded gate of "whitelist never contains `spawn_agent`". Default:
164 /// `DEFAULT_SUBAGENT_MAX_DEPTH`.
165 pub subagent_max_depth: u32,
166}
167
168impl Default for TurnConfig {
169 fn default() -> Self {
170 Self {
171 provider: String::new(),
172 model: String::new(),
173 allowed_models: None,
174 base_prompt: BasePromptConfig::default(),
175 system_prompt: None,
176 prompt: PromptConfig::default(),
177 sampling: SamplingParams::default(),
178 request_limit: TurnRequestLimit::Adaptive {
179 initial: 32,
180 expand_on_progress: true,
181 },
182 compact_threshold_tokens: None,
183 // Trigger compaction at 85% of `context_window` (hard watermark), reserving
184 // ~15% for summary output and headroom — within the reasonable range of codex
185 // (90%), Claude (~93%), and opencode (window-20k).
186 compact_ratio: Some(0.85),
187 // Background compaction enabled by default: starts async summarization at
188 // soft=0.7, aiming to finish before hard=0.85 is reached.
189 background_compact_enabled: true,
190 compact_soft_ratio: Some(0.7),
191 // Micro‑compaction enabled by default: at 0.6 it evicts old large
192 // `tool_result`s — the cheapest first line of defense.
193 microcompact_enabled: true,
194 microcompact_ratio: Some(0.6),
195 max_llm_retries: 3,
196 max_concurrent_tools: 0,
197 max_hook_continues: DEFAULT_MAX_HOOK_CONTINUES,
198 subagent_max_depth: DEFAULT_SUBAGENT_MAX_DEPTH,
199 }
200 }
201}
202
/// Where the base prompt comes from: an optional file path and/or optional inline text.
///
/// NOTE(review): precedence between `file` and `text` is decided by the code that
/// resolves the prompt, which is not visible here — confirm before relying on it.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct BasePromptConfig {
    /// Path of a file holding the base prompt, if one is configured.
    pub file: Option<PathBuf>,
    /// Inline base prompt text, if one is configured.
    pub text: Option<String>,
}
208
/// Prompt configuration: a prompt file name, optional inline text, and overlay tables.
///
/// NOTE(review): the overlay maps are presumably keyed by provider name and model name
/// respectively, with the overlay text as value — confirm against the resolver.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PromptConfig {
    /// Prompt file name; the default configuration uses `AGENTS.md`.
    pub file: String,
    /// Inline prompt text, if one is configured.
    pub text: Option<String>,
    /// Per-provider overlay entries.
    pub provider_overlays: std::collections::BTreeMap<String, String>,
    /// Per-model overlay entries.
    pub model_overlays: std::collections::BTreeMap<String, String>,
}
216
217impl Default for PromptConfig {
218 fn default() -> Self {
219 Self {
220 file: DEFAULT_PROMPT_FILE.to_owned(),
221 text: None,
222 provider_overlays: std::collections::BTreeMap::new(),
223 model_overlays: std::collections::BTreeMap::new(),
224 }
225 }
226}
227/// All dependencies and accumulated state for a single turn execution.
228///
229/// This struct is constructed by [`crate::session::DefaultSession`] on each `run_turn`
230/// call,
231/// borrowing sub-components of the session and being dropped after the turn completes.
232pub struct TurnRunner<'a> {
233 pub history: &'a dyn History,
234 pub tools: &'a dyn ToolRegistry,
235 pub provider: &'a dyn LlmProvider,
236 /// The active policy for this turn's snapshot. Owned as an `Arc` rather than
237 /// borrowed: it flows with [`crate::tool::ToolContext`] into `spawn_agent`, where
238 /// child agents wrap it with
239 /// [`NonInteractivePolicy`](crate::policy::NonInteractivePolicy) — must be the
240 /// parent's actual policy at this moment.
241 pub policy: Arc<dyn SandboxPolicy>,
242 pub events: Arc<EventEmitter>,
243 pub permissions: &'a PermissionGate,
244 pub cancel: CancellationToken,
245 pub config: &'a TurnConfig,
246 /// The system prompt resolved for this turn. `Arc<str>`: each `build_request` call
247 /// `clone`s it into `CompletionRequest.system`; the `Arc` reduces this to a reference
248 /// count increment.
249 pub system_prompt: Option<Arc<str>>,
250 pub cwd: &'a std::path::Path,
251 pub fs: Arc<dyn FsBackend>,
252 pub shell: Arc<dyn ShellBackend>,
253 pub http: Arc<dyn HttpClient>,
254 /// Hosted capabilities determined at session startup.
255 /// Reused directly on each turn when assembling requests, without re-querying.
256 pub hosted_capabilities: HostedCapabilities,
257 /// Hook engine. The turn main loop emits Sync events at four points
258 /// (`UserPromptSubmit` / `PreToolUse` / `PostToolUse` / `PostToolUseFailure`).
259 /// Waits for hooks to finish before proceeding.
260 pub hooks: &'a dyn HookEngine,
261 /// Current session ID. Injected into `HookCtx` so that hook handlers can route or
262 /// audit by session.
263 pub session_id: &'a SessionId,
264 /// Session-level background task handle. When `Some`, enables the tool's
265 /// `run_in_background` capability (injected into tools via
266 /// [`crate::tool::ToolContext::background`]); nested sub-agent turns receive `None`,
267 /// structurally preventing background task self-replication.
268 pub background: Option<crate::session::BackgroundTasks>,
269 /// Shared state for the `--goal` goal-driven loop. When `Some`, this session is
270 /// running in goal mode: it is injected into the `goal_done` tool via
271 /// [`crate::tool::ToolContext::goal`], and the `goal-gate` hook uses it in
272 /// `before_turn_end` to allow or extend the session. `None` = non-goal mode
273 /// (default).
274 pub goal: Option<Arc<crate::session::GoalState>>,
275 /// Session-level single-flight compaction slot. When `Some`, background full
276 /// compaction is available — exceeding the soft watermark triggers an async summary
277 /// compaction without blocking the current turn. Nested sub-agent turns pass `None`
278 /// (sub-agent contexts are short-lived and should not spawn background tasks).
279 /// Requires `Arc<dyn History>`/`Arc<dyn LlmProvider>` for the task to hold `'static`
280 /// references across turns, hence the accompanying `history_arc`/`provider_arc`.
281 pub compaction_slot: Option<crate::session::CompactionSlot>,
282 /// `Arc<dyn History>` for `compaction_slot` (points to the same object as the
283 /// `history` borrow). Held by the background compaction task across turns. When
284 /// `None`, background compaction is unavailable and falls back to synchronous
285 /// compaction only.
286 pub history_arc: Option<Arc<dyn History>>,
287 /// `Arc` for the provider used by `compaction_slot`. Same as above.
288 pub provider_arc: Option<Arc<dyn LlmProvider>>,
289 /// Session-level cancellation token; the background compaction task's cancellation
290 /// token is derived from this one (independent of the turn's cancel, cancelled when
291 /// the session ends). When `None`, background compaction uses the turn's `cancel`
292 /// (sub-agent path).
293 pub session_cancel: Option<CancellationToken>,
294 /// The ingestion source for this turn's input — determines the `source` field of the
295 /// `before_ingest` step envelope.
296 /// User turns use `User`; background continuation turns started by the session driver
297 /// use `Background`.
298 pub ingest_source: crate::hooks::step::IngestSource,
299 /// Request stability diagnostics: compares snapshots of two consecutive requests
300 /// actually sent to the provider, helping locate sources of high volatility in low
301 /// prompt cache hit rates.
302 pub(crate) request_audit: &'a RequestAuditTracker,
303}
304
305impl<'a> TurnRunner<'a> {
306 /// Runs a single turn.
307 pub async fn run(&self, prompt: Vec<ContentBlock>) -> Result<AcpStopReason, TurnError> {
308 // ① UserPromptSubmit hook (sync interception)
309 // Gives hooks a chance to rewrite or intercept the prompt before it lands in
310 // history.
311 let prompt = match self.fire_user_prompt_submit(prompt).await {
312 UserPromptHookFlow::Continue(p) => p,
313 UserPromptHookFlow::Refused => {
314 // Hook blocked: do not emit `UserPromptCommitted`, do not append to
315 // history; return `Refusal` directly so the ACP bridge responds with
316 // `PromptResponse`.
317 return Ok(AcpStopReason::Refusal);
318 }
319 };
320
321 self.events
322 .emit(AgentEvent::UserPromptCommitted {
323 content: prompt.clone(),
324 })
325 .await;
326 self.history.append(Message {
327 role: Role::User,
328 content: prompt
329 .into_iter()
330 .map(content_block_to_message_content)
331 .collect::<Result<Vec<_>, _>>()?
332 .into_iter()
333 .flatten()
334 .collect(),
335 });
336
337 // After Ingest hook: input has been merged into history. Injection only.
338 {
339 let mut step = crate::hooks::step::AfterIngest {
340 committed_len: 1,
341 additional_context: Vec::new(),
342 };
343 let _ = self.hooks.dispatch(&mut step, self.hook_ctx()).await;
344 if !step.additional_context.is_empty() {
345 self.append_user_feedback(step.additional_context);
346 }
347 }
348
349 self.events.emit(AgentEvent::TurnStarted).await;
350
351 // After-turn-enter hook: the turn scope has been entered. Allows injecting system
352 // context or a Break to reject the turn.
353 // Note: currently the hook point is placed after prompt ingestion (moving the hook
354 // point earlier is a deferred adjustment).
355 {
356 let mut step = crate::hooks::step::AfterTurnEnter {
357 is_subagent: false,
358 agent_type: None,
359 additional_context: Vec::new(),
360 };
361 let control = self.hooks.dispatch(&mut step, self.hook_ctx()).await;
362 if !step.additional_context.is_empty() {
363 self.append_user_feedback(step.additional_context);
364 }
365 if let crate::hooks::step::HookControl::Break { .. } = control {
366 return Ok(AcpStopReason::EndTurn);
367 }
368 }
369
370 let result = self.run_inner().await;
371
372 if let Ok(outcome) = &result {
373 self.events
374 .emit(AgentEvent::TurnEnded {
375 reason: outcome.reason,
376 usage: outcome.usage,
377 })
378 .await;
379 }
380 // The `Err` path does not emit `TurnEnded`; the bridge layer decides the wire
381 // response based on the future outcome.
382
383 result.map(|outcome| outcome.reason)
384 }
385
386 async fn run_inner(&self) -> Result<TurnOutcome, TurnError> {
387 let mut state = TurnState::new(self.config.request_limit, self.config.max_hook_continues);
388 loop {
389 if self.cancel.is_cancelled() {
390 return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
391 }
392
393 self.manage_context().await?;
394
395 let mut req = self.build_request();
396
397 // Before Generate hook: can modify request (model), short-circuit (fill in
398 // synthetic assistant to skip LLM), or Break.
399 let mut before_gen = crate::hooks::step::BeforeGenerate {
400 model: req.model.clone(),
401 message_count: req.messages.len(),
402 attempt: state.request_count.saturating_add(1),
403 assistant_text: None,
404 };
405 let bg_control = self.hooks.dispatch(&mut before_gen, self.hook_ctx()).await;
406 req.model = before_gen.model;
407 if let Some(text) = before_gen.assistant_text {
408 // Short-circuit: use a synthetic assistant reply to skip the real LLM
409 // call, then proceed to the before-turn-end check.
410 self.history.append(Message {
411 role: Role::Assistant,
412 content: vec![MessageContent::Text { text }].into(),
413 });
414 if self.decide_turn_end(&mut state).await {
415 continue;
416 }
417 return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
418 }
419 if let crate::hooks::step::HookControl::Break { reason } = bg_control {
420 return Ok(turn_outcome(&state, reason));
421 }
422
423 let (mut stream, attempt) = self.call_llm_with_retry(&req, &mut state).await?;
424
425 let outcome = self.drain_provider_stream(&mut stream, &mut state).await?;
426
427 if outcome.cancelled {
428 return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
429 }
430
431 // The stream has been drained and all usage for this call is available — emit
432 // `LlmCallFinished` with the **per-call** actual usage (`outcome.usage`, not
433 // the turn-accumulated `state.usage`).
434 self.events
435 .emit(AgentEvent::LlmCallFinished {
436 model: req.model.clone(),
437 attempt,
438 usage: outcome.usage,
439 error: None,
440 })
441 .await;
442
443 // After the Generate hook: observe (usage / stop / error). No output to fill;
444 // to intervene, route the next turn through before-turn-end.
445 let stop_reason_for_hook = match outcome.stop {
446 LlmStopReason::EndTurn | LlmStopReason::StopSequence => AcpStopReason::EndTurn,
447 LlmStopReason::Refusal => AcpStopReason::Refusal,
448 LlmStopReason::MaxTokens => AcpStopReason::MaxTokens,
449 LlmStopReason::ToolUse => AcpStopReason::EndTurn,
450 };
451 let mut after_gen = crate::hooks::step::AfterGenerate {
452 model: req.model.clone(),
453 usage: outcome.usage,
454 stop: stop_reason_for_hook,
455 error: None,
456 };
457 let _ = self.hooks.dispatch(&mut after_gen, self.hook_ctx()).await;
458
459 // Feed the actual input token count returned by this call into `history` as
460 // the precise baseline for compaction threshold decisions (see
461 // `session/turn/compact.rs`). The messages sent in this call are
462 // `req.messages`, and their real input size is the sum of the three
463 // input-side fields in `outcome.usage`.
464 if let Some(real_input) = real_input_tokens(&outcome.usage) {
465 self.history.record_input_tokens(real_input);
466 }
467
468 let assistant = assistant_message(&outcome);
469 if !assistant.content.is_empty() {
470 self.history.append(assistant);
471 }
472
473 // Passive stop (Refusal / MaxTokens): skip the before-turn-end hook (the hook
474 // cannot extend these), exit directly.
475 match outcome.stop {
476 LlmStopReason::EndTurn | LlmStopReason::StopSequence => {
477 // Voluntary stop → before-turn-end decision point.
478 if self.decide_turn_end(&mut state).await {
479 continue;
480 }
481 return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
482 }
483 LlmStopReason::Refusal => {
484 return Ok(turn_outcome(&state, AcpStopReason::Refusal));
485 }
486 LlmStopReason::MaxTokens => {
487 return Ok(turn_outcome(&state, AcpStopReason::MaxTokens));
488 }
489 LlmStopReason::ToolUse => {}
490 }
491
492 if outcome.tool_uses.is_empty() {
493 // Voluntary stop (no tool requested) → same before-turn-end decision
494 // point.
495 if self.decide_turn_end(&mut state).await {
496 continue;
497 }
498 return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
499 }
500
501 // Before the Permission hook (currently only observes/stubs; policy still
502 // delegates to the authority).
503 for tu in &outcome.tool_uses {
504 let mut bp = crate::hooks::step::BeforePermission {
505 tool: tu.name.clone(),
506 decision: "ask".to_string(),
507 resolved: None,
508 };
509 let _ = self.hooks.dispatch(&mut bp, self.hook_ctx()).await;
510 }
511
512 let approved = match self.decide_permissions(&outcome.tool_uses).await? {
513 DecisionFlow::Continue(list) => list,
514 DecisionFlow::Cancelled => {
515 return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
516 }
517 };
518
519 // After permission hook (currently only observes/stubs).
520 for a in &approved {
521 let (tool, granted) = match a {
522 Approved::Run { .. } => (approved_tool_name(a), true),
523 Approved::Denied { .. } | Approved::FailedArgs { .. } => {
524 (approved_tool_name(a), false)
525 }
526 };
527 let mut ap = crate::hooks::step::AfterPermission { tool, granted };
528 let _ = self.hooks.dispatch(&mut ap, self.hook_ctx()).await;
529 }
530
531 let progressed = approved.iter().any(|a| matches!(a, Approved::Run { .. }));
532 if progressed {
533 state.note_progress();
534 }
535
536 let mut results = self.run_tools_concurrently(approved).await;
537
538 // Reject any single tool result that exceeds the model's context window: it can
539 // never fit, so appending it as-is would only blow up the next request. Replace
540 // it with an actionable error before it enters history. See
541 // `reject_oversized_results`.
542 let rejected = reject_oversized_results(&mut results, self.context_window());
543 if rejected > 0 {
544 tracing::warn!(
545 rejected,
546 "rejected oversized tool result(s) exceeding the context window"
547 );
548 }
549
550 // After `ToolBatch` hook: after all parallel tools finish, before the next
551 // LLM call. Allows injection or graceful break.
552 let mut batch = crate::hooks::step::AfterToolBatch {
553 results: results
554 .iter()
555 .map(|r| crate::hooks::step::ToolBatchEntry {
556 tool_name: r.name.clone(),
557 is_error: r.is_error,
558 })
559 .collect(),
560 additional_context: Vec::new(),
561 };
562 let batch_control = self.hooks.dispatch(&mut batch, self.hook_ctx()).await;
563
564 self.history.append(tool_results_message(results));
565 if !batch.additional_context.is_empty() {
566 self.append_user_feedback(batch.additional_context);
567 }
568 if let crate::hooks::step::HookControl::Break { reason } = batch_control {
569 return Ok(turn_outcome(&state, reason));
570 }
571
572 if state.exceeded_request_cap() {
573 return Ok(turn_outcome(&state, AcpStopReason::MaxTurnRequests));
574 }
575 }
576 }
577
578 fn build_request(&self) -> CompletionRequest {
579 // Before sending the request, pair any orphaned `tool_use` (left over from an
580 // interruption, with no matching `tool_result`) — otherwise the provider will
581 // permanently reject the request. Only patch the copy sent to the provider; the
582 // true history remains untouched. See `sanitize`.
583 let messages = sanitize::sanitize_tool_pairing(self.history.snapshot());
584 let req = CompletionRequest {
585 model: self.config.model.clone(),
586 system: self.system_prompt.clone(),
587 messages,
588 tools: self.tools.schemas(),
589 tool_choice: ToolChoice::Auto,
590 sampling: self.config.sampling.clone(),
591 hosted_capabilities: self.hosted_capabilities,
592 };
593 self.request_audit.record(&req);
594 req
595 }
596
597 /// Layered context management: micro → soft (background) → hard (synchronous
598 /// fallback). Called at the start of each main loop iteration.
599 ///
600 /// Three water levels (see `compact_thresholds` for details):
601 /// 1. **micro** (default 0.6·window): if micro-compaction is enabled, first evict old
602 /// large `tool_result` entries — no LLM calls, no message deletion, near-zero
603 /// latency, deferring expensive full compaction.
604 /// 2. **soft** (default 0.7·window): if background compaction is enabled,
605 /// **asynchronously** start a summary compaction; the turn does not block, quietly
606 /// compacting before hitting hard (single-flight, won't re-start if one is already
607 /// in flight).
608 /// 3. **hard** (default 0.85·window, equivalent to the old `compact_ratio`
609 /// semantics): at this level compaction is mandatory — if a background compaction
610 /// is already in flight, `await` its completion; otherwise compact
611 /// **synchronously** as a fallback.
612 ///
613 /// micro/soft require the model to expose `context_window`; hard also supports an
614 /// absolute override via `compact_threshold_tokens`. If any level cannot obtain its
615 /// threshold, that level is skipped (preserving the conservative "no information, no
616 /// compaction" semantics).
617 async fn manage_context(&self) -> Result<(), TurnError> {
618 let thresholds = self.compact_thresholds();
619 // All three thresholds absent → no proactive compaction this turn (preserves the
620 // existing semantics).
621 if thresholds.is_empty() {
622 return Ok(());
623 }
624 let Some(estimate) = self.history.token_estimate() else {
625 return Ok(());
626 };
627
628 // ① micro: synchronous, cheapest. May reduce `estimate` below soft/hard
629 // thresholds, so re-fetch after compaction.
630 let estimate = if self.config.microcompact_enabled
631 && thresholds.micro.is_some_and(|t| estimate >= t)
632 {
633 self.run_microcompact().await;
634 self.history.token_estimate().unwrap_or(estimate)
635 } else {
636 estimate
637 };
638
639 // ② soft: crossing the threshold triggers an async background compaction without
640 // blocking the current round.
641 if self.config.background_compact_enabled
642 && let (Some(soft), Some(hard)) = (thresholds.soft, thresholds.hard)
643 && estimate >= soft
644 && estimate < hard
645 {
646 self.spawn_background_compaction(hard).await;
647 // Non-blocking – continue assembling requests this round; summary persistence
648 // happens in a later round (or later).
649 return Ok(());
650 }
651
652 // ③ hard: must compact.
653 if let Some(hard) = thresholds.hard
654 && estimate >= hard
655 {
656 self.compact_hard(estimate, hard).await?;
657 }
658 Ok(())
659 }
660
661 /// Run a micro-compact and write back via `replace`. Best-effort: does nothing if
662 /// there is nothing to clean up.
663 async fn run_microcompact(&self) {
664 let messages = self.history.snapshot();
665 let Some((rebuilt, report)) = microcompact::run(&messages) else {
666 return;
667 };
668 self.history.replace(rebuilt);
669 tracing::info!(
670 cleared = report.cleared,
671 tokens_before = report.tokens_before,
672 tokens_after = report.tokens_after,
673 "context microcompacted"
674 );
675 self.events
676 .emit(AgentEvent::ContextMicrocompacted {
677 tokens_before: report.tokens_before,
678 tokens_after: report.tokens_after,
679 cleared: report.cleared,
680 })
681 .await;
682 }
683
684 /// Spawns a single-flight background full compaction when the soft threshold is
685 /// exceeded. Requires a slot and `Arc` references to history and provider (only
686 /// available in the top-level turn; child agent turns silently skip this, leaving it
687 /// to the synchronous compaction at the hard threshold).
688 async fn spawn_background_compaction(&self, hard_threshold: u64) {
689 let (Some(slot), Some(history_arc), Some(provider_arc)) = (
690 self.compaction_slot.as_ref(),
691 self.history_arc.as_ref(),
692 self.provider_arc.as_ref(),
693 ) else {
694 return;
695 };
696 if slot.is_in_flight() {
697 return; // Single-flight: a compaction is already in flight, do not start another.
698 }
699
700 // The compaction cancel token is independent of the turn cancel — the summary
701 // should be allowed to finish even if the originating turn has ended; however, it
702 // is cancelled when the session ends. For sub-agent paths that have no
703 // `session_cancel`, it falls back to the turn cancel.
704 let cancel = self
705 .session_cancel
706 .clone()
707 .unwrap_or_else(|| self.cancel.clone())
708 .child_token();
709 let ctx = compact::CompactionCtx {
710 provider: provider_arc.clone(),
711 model: self.config.model.clone(),
712 sampling: self.config.sampling.clone(),
713 tools: self.tools.schemas(),
714 cancel,
715 };
716 let events = self.events.clone();
717 let on_done: Arc<
718 dyn Fn(crate::session::CompactionReport) -> futures::future::BoxFuture<'static, ()>
719 + Send
720 + Sync,
721 > = Arc::new(move |report| {
722 // Return a future so that `emit` is awaited inside the compaction task body —
723 // no detached task is spawned, and event emission is governed by the
724 // compaction task's cancel/track semantics.
725 let events = events.clone();
726 Box::pin(async move {
727 events
728 .emit(AgentEvent::ContextCompressed {
729 tokens_before: report.tokens_before,
730 tokens_after: report.tokens_after,
731 })
732 .await;
733 })
734 });
735 let started = slot.try_spawn(history_arc.clone(), ctx, hard_threshold, on_done);
736 if started {
737 tracing::info!(hard_threshold, "background compaction started");
738 }
739 }
740
741 /// Hard threshold fallback: wait for an in-flight background compaction to finish, or
742 /// run a synchronous compaction.
743 async fn compact_hard(&self, estimate: u64, hard: u64) -> Result<(), TurnError> {
744 // A background compaction is already in flight; wait for it to finish to avoid
745 // redundant work.
746 if let Some(slot) = self.compaction_slot.as_ref()
747 && slot.is_in_flight()
748 {
749 slot.await_in_flight().await;
750 return Ok(());
751 }
752
753 // Before the compact hook: the hook may `Skip` to veto this compaction (a
754 // mutating step).
755 let mut before = crate::hooks::step::BeforeCompact {
756 token_estimate: estimate,
757 threshold: hard,
758 };
759 if let crate::hooks::step::HookControl::Skip =
760 self.hooks.dispatch(&mut before, self.hook_ctx()).await
761 {
762 tracing::info!("compaction vetoed by before-compact hook");
763 return Ok(());
764 }
765
766 let ctx = self.sync_compaction_ctx();
767 let Some(report) = compact::run_sync(self.history, &ctx, hard).await else {
768 // No safe compaction boundary (e.g., a single very long turn) — skip this
769 // round, no event emitted.
770 return Ok(());
771 };
772 self.events
773 .emit(AgentEvent::ContextCompressed {
774 tokens_before: report.tokens_before,
775 tokens_after: report.tokens_after,
776 })
777 .await;
778
779 // After the compact hook: observe and allow injection (injected content goes into
780 // history).
781 let mut after = crate::hooks::step::AfterCompact {
782 tokens_before: report.tokens_before,
783 tokens_after: report.tokens_after,
784 additional_context: Vec::new(),
785 };
786 let _ = self.hooks.dispatch(&mut after, self.hook_ctx()).await;
787 if !after.additional_context.is_empty() {
788 self.append_user_feedback(after.additional_context);
789 }
790 Ok(())
791 }
792
793 /// The [`compact::CompactionCtx`] for synchronous compaction. Wrapping a borrowed
794 /// provider in a temporary `Arc` is not feasible (a trait object borrow cannot be
795 /// `Arc`), so the synchronous path requires `provider_arc`. Falling back when it is
796 /// missing is impossible (the top-level always has one; child agents use a borrowed
797 /// `provider`—see the `sync_compaction_ctx` implementation).
798 fn sync_compaction_ctx(&self) -> compact::CompactionCtx {
799 compact::CompactionCtx {
800 provider: self
801 .provider_arc
802 .clone()
803 .expect("sync compaction requires provider_arc"),
804 model: self.config.model.clone(),
805 sampling: self.config.sampling.clone(),
806 tools: self.tools.schemas(),
807 cancel: self.cancel.clone(),
808 }
809 }
810
811 /// Parse the three-tier compaction thresholds (in tokens) for this turn. Any tier set
812 /// to `None` means that tier is not triggered.
813 /// The model's context window in tokens, if the provider exposes it. `None` ⇒ unknown
814 /// (no ceiling can be enforced for compaction or oversized-result rejection).
815 fn context_window(&self) -> Option<u64> {
816 self.provider
817 .model_info(&self.config.model)
818 .and_then(|m| m.context_window)
819 }
820
821 fn compact_thresholds(&self) -> CompactThresholds {
822 let window = self.context_window();
823
824 // For `hard`, an absolute threshold takes precedence; otherwise, use `ratio *
825 // window`.
826 let hard = self.config.compact_threshold_tokens.or_else(|| {
827 let ratio = self.config.compact_ratio?;
828 ratio_threshold(window?, ratio)
829 });
830 // micro/soft can only be derived from window (absolute overrides apply only to
831 // hard).
832 let from_ratio =
833 |ratio: Option<f64>| ratio.and_then(|r| window.and_then(|w| ratio_threshold(w, r)));
834 CompactThresholds {
835 micro: from_ratio(self.config.microcompact_ratio),
836 soft: from_ratio(self.config.compact_soft_ratio),
837 hard,
838 }
839 }
840
841 pub(super) fn hook_ctx(&self) -> HookCtx<'_> {
842 HookCtx::new(self.session_id, self.cwd, self.cancel.clone())
843 }
844}
845
846// ----- internal types -----
847
/// Final result of a turn: why it stopped and the usage recorded for it.
#[derive(Clone, Copy)]
struct TurnOutcome {
    /// Stop reason reported for the turn.
    reason: AcpStopReason,
    /// Usage copied out of the turn's [`TurnState`] (see `turn_outcome`).
    usage: Usage,
}
853
/// Three-tier compaction watermarks, expressed in tokens. A tier left as `None` is not
/// triggered this turn.
#[derive(Clone, Copy)]
struct CompactThresholds {
    /// Micro-compaction watermark.
    micro: Option<u64>,
    /// Soft watermark.
    soft: Option<u64>,
    /// Hard watermark.
    hard: Option<u64>,
}

impl CompactThresholds {
    /// Returns `true` when none of the three tiers is set, i.e. there is no proactive
    /// compaction this turn.
    fn is_empty(&self) -> bool {
        [self.micro, self.soft, self.hard]
            .iter()
            .all(Option::is_none)
    }
}
869
/// Scales `context_window` by `ratio` and rounds down to whole tokens.
///
/// `ratio` is expected to lie in `(0, 1]`. A product that floors to `0` yields `None`,
/// meaning the tier never triggers.
fn ratio_threshold(context_window: u64, ratio: f64) -> Option<u64> {
    let scaled = (context_window as f64 * ratio).floor();
    // The float-to-integer cast saturates, so negative or NaN products land on 0 too.
    match scaled as u64 {
        0 => None,
        tokens => Some(tokens),
    }
}
876
/// Default cap on how many times the `before turn-end` hook may force a turn to keep
/// going. Can be overridden by [`TurnConfig::max_hook_continues`] (config key
/// `[turn].max_hook_continues`). See docs on hook step context exit semantics.
pub(crate) const DEFAULT_MAX_HOOK_CONTINUES: u32 = 3;
881
/// Default upper bound for subagent vertical recursion depth. Counted from the top-level
/// turn: N levels means the top turn can spawn subagents, their children can spawn
/// further, and so on, until the Nth level (where `subagent_max_depth` reaches 0) can no
/// longer call `spawn_agent`. The default of 4 leaves room for orchestrations like "main
/// agent → coordinator subagent → worker subagent" while preventing runaway vertical
/// growth. Horizontal runaway is separately guarded by `request_limit`.
// The value must stay in step with the documented default above: a depth of 1 would
// stop a coordinator subagent from spawning its own workers.
pub(crate) const DEFAULT_SUBAGENT_MAX_DEPTH: u32 = 4;
889
890struct TurnState {
891 request_count: u32,
892 usage: Usage,
893 cap: Option<u32>,
894 expand_on_progress: bool,
895 /// How many times this turn has been extended by the `before turn-end` hook. Cap is
896 /// [`Self::max_stop_hook_continues`].
897 stop_hook_continues: u32,
898 /// Hard upper limit for life-extending continues (from
899 /// [`TurnConfig::max_hook_continues`]). Prevents hooks from `Continue`ing
900 /// indefinitely.
901 max_stop_hook_continues: u32,
902}
903
904impl TurnState {
905 fn new(limit: TurnRequestLimit, max_hook_continues: u32) -> Self {
906 Self {
907 request_count: 0,
908 usage: Usage::default(),
909 cap: limit.initial_cap(),
910 expand_on_progress: limit.expand_on_progress(),
911 stop_hook_continues: 0,
912 max_stop_hook_continues: max_hook_continues,
913 }
914 }
915
916 fn note_progress(&mut self) {
917 if self.expand_on_progress
918 && let Some(cap) = self.cap.as_mut()
919 {
920 *cap = cap.saturating_add(1);
921 }
922 }
923
924 fn exceeded_request_cap(&self) -> bool {
925 match self.cap {
926 None => false,
927 Some(cap) => self.request_count >= cap,
928 }
929 }
930
931 /// Whether the `before turn-end` hook is still allowed to continue (has not reached
932 /// the hard limit).
933 fn may_stop_hook_continue(&self) -> bool {
934 self.stop_hook_continues < self.max_stop_hook_continues
935 }
936
937 /// Records one stop-hook continuation.
938 fn note_stop_hook_continue(&mut self) {
939 self.stop_hook_continues = self.stop_hook_continues.saturating_add(1);
940 }
941}
942
943// ----- helpers -----
944
945fn turn_outcome(state: &TurnState, reason: AcpStopReason) -> TurnOutcome {
946 TurnOutcome {
947 reason,
948 usage: state.usage,
949 }
950}
951
952#[cfg(test)]
953mod tests;