defect_agent/session/turn.rs
1//! Turn main loop.
2//!
3//! Turn main loop — the "heart" of the agent. This file implements the state machine.
4//!
5//! Key dependencies:
6//! - [`History`]: read/write message history
7//! - [`ToolRegistry`]: tool lookup
8//! - [`LlmProvider`]: LLM invocation
9//! - [`EventEmitter`]: event emission (shared via `Arc` so tool tasks can also emit)
10//! - [`PermissionGate`]: wait for permission requests
11
12use std::path::PathBuf;
13use std::sync::Arc;
14
15use agent_client_protocol_schema::{ContentBlock, SessionId, StopReason as AcpStopReason};
16use tokio_util::sync::CancellationToken;
17
18use crate::event::AgentEvent;
19use crate::fs::FsBackend;
20use crate::hooks::{HookCtx, HookEngine};
21use crate::http::HttpClient;
22use crate::llm::{
23 CompletionRequest, HostedCapabilities, LlmProvider, Message, MessageContent, Role,
24 SamplingParams, StopReason as LlmStopReason, ToolChoice, Usage,
25};
26use crate::policy::SandboxPolicy;
27use crate::session::events::EventEmitter;
28use crate::session::permissions::PermissionGate;
29use crate::session::{History, ToolRegistry, TurnError};
30use crate::shell::ShellBackend;
31
/// Default value of `PromptConfig::file`: the prompt file name used when no other
/// file is configured.
const DEFAULT_PROMPT_FILE: &str = "AGENTS.md";
33
34mod request_audit;
35
36mod compact;
37
38mod microcompact;
39
40mod compaction_slot;
41
42pub use compaction_slot::CompactionSlot;
43
44mod sanitize;
45
46mod content;
47
48mod llm_drive;
49
50mod tools;
51
52mod hooks;
53
54use content::content_block_to_message_content;
55use hooks::UserPromptHookFlow;
56use llm_drive::{assistant_message, real_input_tokens};
57use tools::{Approved, DecisionFlow, approved_tool_name, tool_results_message};
58
59pub(crate) use request_audit::RequestAuditTracker;
60
/// Strategy for capping the number of LLM requests issued within a single turn.
///
/// Once the turn's request count reaches the cap, the turn ends with
/// [`AcpStopReason::MaxTurnRequests`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TurnRequestLimit {
    /// No upper limit.
    Unbounded,
    /// Fixed limit: the turn ends with [`AcpStopReason::MaxTurnRequests`] once `N` LLM
    /// requests have been made in the current turn (not `N` turns).
    Fixed(u32),
    /// Adaptive limit starting at `initial`. When `expand_on_progress` is `true`, every
    /// loop round in which at least one tool use is approved and executed counts as
    /// progress and raises the limit by 1. Otherwise it behaves like [`Self::Fixed`].
    Adaptive {
        /// Cap on LLM requests at the start of the turn.
        initial: u32,
        /// Whether a round with executed tool use raises the cap by 1.
        expand_on_progress: bool,
    },
}

impl TurnRequestLimit {
    /// Cap in effect at the start of a turn; `None` means unlimited.
    fn initial_cap(&self) -> Option<u32> {
        match *self {
            Self::Unbounded => None,
            Self::Fixed(n) => Some(n),
            Self::Adaptive { initial, .. } => Some(initial),
        }
    }

    /// Whether progress (approved and executed tool use) should raise the cap.
    /// Only an `Adaptive` limit with `expand_on_progress: true` expands.
    fn expand_on_progress(&self) -> bool {
        matches!(
            self,
            Self::Adaptive {
                expand_on_progress: true,
                ..
            }
        )
    }
}
96
97/// Configuration for a turn.
98#[derive(Debug, Clone)]
99pub struct TurnConfig {
100 /// The selected provider vendor (the provider half of the selection key). Together
101 /// with [`Self::model`], this is used to resolve the actual provider entry in the
102 /// registry by the `(vendor, model)` pair.
103 pub provider: String,
104 pub model: String,
105 pub allowed_models: Option<Vec<String>>,
106 pub base_prompt: BasePromptConfig,
107 pub system_prompt: Option<String>,
108 pub prompt: PromptConfig,
109 pub sampling: SamplingParams,
110 pub request_limit: TurnRequestLimit,
111 /// Explicit absolute override for the compaction threshold (in tokens). When `Some`,
112 /// takes precedence over the value inferred from [`Self::compact_ratio`]. When
113 /// `None`, the threshold is automatically derived from the ratio.
114 pub compact_threshold_tokens: Option<u64>,
115 /// Compression threshold as a fraction of the model's `context_window` (e.g. `0.85` =
116 /// trigger when usage exceeds 85%). This is the **hard watermark**: when reached, if
117 /// no background compaction is in flight, the turn main loop performs **synchronous**
118 /// compaction as a fallback (blocking the current turn but guaranteeing the context
119 /// is not exceeded). `None` = no automatic ratio-based compression (and if no
120 /// absolute threshold is set either, no compression occurs for this turn). Only
121 /// effective when `compact_threshold_tokens` is `None` and the model exposes a
122 /// `context_window`. See `session/turn/compact.rs` for details.
123 pub compact_ratio: Option<f64>,
124 /// **Background full compaction** toggle. When `true`, once the soft watermark
125 /// derived from [`Self::compact_soft_ratio`] is exceeded, a summarization compaction
126 /// is started **asynchronously** (without blocking the current turn); it quietly
127 /// compresses history before the hard watermark is hit. When disabled, only
128 /// synchronous compaction at the hard watermark remains. See
129 /// `session/turn/compaction_slot.rs`.
130 pub background_compact_enabled: bool,
131 /// Soft watermark for background compaction, as a fraction of `context_window`
132 /// (default `0.7`). Must be less than `compact_ratio` (the hard watermark) to leave a
133 /// window between soft and hard for background summarization to complete. Only
134 /// effective when `background_compact_enabled` is set and a threshold can be derived.
135 pub compact_soft_ratio: Option<f64>,
136 /// Enables **micro‑compaction**. When `true`, each turn first runs a micro‑compaction
137 /// (cleans oversized `tool_result` in older turns, without calling the LLM or
138 /// deleting messages) at the water level above [`Self::microcompact_ratio`],
139 /// deferring expensive full compaction. See `session/turn/microcompact.rs`.
140 pub microcompact_enabled: bool,
141 /// Micro‑compaction watermark as a fraction of `context_window` (default `0.6`).
142 /// Typically below the soft watermark — micro‑compaction is the cheapest first line
143 /// of defense. Only effective when `microcompact_enabled` and a threshold can be
144 /// derived.
145 pub microcompact_ratio: Option<f64>,
146 pub max_llm_retries: u32,
147 /// `0` = unlimited. The default is unlimited.
148 pub max_concurrent_tools: usize,
149 /// Hard upper limit on forced continuations from the `before turn-end` hook —
150 /// prevents infinite loops from repeated hook `Continue` calls. Default: 3.
151 pub max_hook_continues: u32,
152 /// Maximum subagent nesting depth (vertical recursion limit) for this turn.
153 /// `spawn_agent` uses this to pass the "remaining depth" to tools via
154 /// [`crate::tool::ToolContext::subagent_depth`]; a child agent's nested turn receives
155 /// `subagent_max_depth` = parent's remaining depth minus one. `0` means this turn's
156 /// tool set contains no `spawn_agent`, structurally forbidding dispatch — replacing
157 /// the old hardcoded gate of "whitelist never contains `spawn_agent`". Default:
158 /// `DEFAULT_SUBAGENT_MAX_DEPTH`.
159 pub subagent_max_depth: u32,
160}
161
162impl Default for TurnConfig {
163 fn default() -> Self {
164 Self {
165 provider: String::new(),
166 model: String::new(),
167 allowed_models: None,
168 base_prompt: BasePromptConfig::default(),
169 system_prompt: None,
170 prompt: PromptConfig::default(),
171 sampling: SamplingParams::default(),
172 request_limit: TurnRequestLimit::Adaptive {
173 initial: 32,
174 expand_on_progress: true,
175 },
176 compact_threshold_tokens: None,
177 // Trigger compaction at 85% of `context_window` (hard watermark), reserving
178 // ~15% for summary output and headroom — within the reasonable range of codex
179 // (90%), Claude (~93%), and opencode (window-20k).
180 compact_ratio: Some(0.85),
181 // Background compaction enabled by default: starts async summarization at
182 // soft=0.7, aiming to finish before hard=0.85 is reached.
183 background_compact_enabled: true,
184 compact_soft_ratio: Some(0.7),
185 // Micro‑compaction enabled by default: at 0.6 it evicts old large
186 // `tool_result`s — the cheapest first line of defense.
187 microcompact_enabled: true,
188 microcompact_ratio: Some(0.6),
189 max_llm_retries: 3,
190 max_concurrent_tools: 0,
191 max_hook_continues: DEFAULT_MAX_HOOK_CONTINUES,
192 subagent_max_depth: DEFAULT_SUBAGENT_MAX_DEPTH,
193 }
194 }
195}
196
/// Source of the base prompt: an optional file path and/or optional inline text.
/// Both are `None` by default.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct BasePromptConfig {
    /// Path of a file holding the base prompt, if any.
    pub file: Option<PathBuf>,
    /// Inline base-prompt text, if any.
    pub text: Option<String>,
}
202
203#[derive(Debug, Clone, PartialEq, Eq)]
204pub struct PromptConfig {
205 pub file: String,
206 pub text: Option<String>,
207 pub provider_overlays: std::collections::BTreeMap<String, String>,
208 pub model_overlays: std::collections::BTreeMap<String, String>,
209}
210
211impl Default for PromptConfig {
212 fn default() -> Self {
213 Self {
214 file: DEFAULT_PROMPT_FILE.to_owned(),
215 text: None,
216 provider_overlays: std::collections::BTreeMap::new(),
217 model_overlays: std::collections::BTreeMap::new(),
218 }
219 }
220}
221/// All dependencies and accumulated state for a single turn execution.
222///
223/// This struct is constructed by [`crate::session::DefaultSession`] on each `run_turn`
224/// call,
225/// borrowing sub-components of the session and being dropped after the turn completes.
226pub struct TurnRunner<'a> {
227 pub history: &'a dyn History,
228 pub tools: &'a dyn ToolRegistry,
229 pub provider: &'a dyn LlmProvider,
230 /// The active policy for this turn's snapshot. Owned as an `Arc` rather than
231 /// borrowed: it flows with [`crate::tool::ToolContext`] into `spawn_agent`, where
232 /// child agents wrap it with
233 /// [`NonInteractivePolicy`](crate::policy::NonInteractivePolicy) — must be the
234 /// parent's actual policy at this moment.
235 pub policy: Arc<dyn SandboxPolicy>,
236 pub events: Arc<EventEmitter>,
237 pub permissions: &'a PermissionGate,
238 pub cancel: CancellationToken,
239 pub config: &'a TurnConfig,
240 /// The system prompt resolved for this turn. `Arc<str>`: each `build_request` call
241 /// `clone`s it into `CompletionRequest.system`; the `Arc` reduces this to a reference
242 /// count increment.
243 pub system_prompt: Option<Arc<str>>,
244 pub cwd: &'a std::path::Path,
245 pub fs: Arc<dyn FsBackend>,
246 pub shell: Arc<dyn ShellBackend>,
247 pub http: Arc<dyn HttpClient>,
248 /// Hosted capabilities determined at session startup.
249 /// Reused directly on each turn when assembling requests, without re-querying.
250 pub hosted_capabilities: HostedCapabilities,
251 /// Hook engine. The turn main loop emits Sync events at four points
252 /// (`UserPromptSubmit` / `PreToolUse` / `PostToolUse` / `PostToolUseFailure`).
253 /// Waits for hooks to finish before proceeding.
254 pub hooks: &'a dyn HookEngine,
255 /// Current session ID. Injected into `HookCtx` so that hook handlers can route or
256 /// audit by session.
257 pub session_id: &'a SessionId,
258 /// Session-level background task handle. When `Some`, enables the tool's
259 /// `run_in_background` capability (injected into tools via
260 /// [`crate::tool::ToolContext::background`]); nested sub-agent turns receive `None`,
261 /// structurally preventing background task self-replication.
262 pub background: Option<crate::session::BackgroundTasks>,
263 /// Shared state for the `--goal` goal-driven loop. When `Some`, this session is
264 /// running in goal mode: it is injected into the `goal_done` tool via
265 /// [`crate::tool::ToolContext::goal`], and the `goal-gate` hook uses it in
266 /// `before_turn_end` to allow or extend the session. `None` = non-goal mode
267 /// (default).
268 pub goal: Option<Arc<crate::session::GoalState>>,
269 /// Session-level single-flight compaction slot. When `Some`, background full
270 /// compaction is available — exceeding the soft watermark triggers an async summary
271 /// compaction without blocking the current turn. Nested sub-agent turns pass `None`
272 /// (sub-agent contexts are short-lived and should not spawn background tasks).
273 /// Requires `Arc<dyn History>`/`Arc<dyn LlmProvider>` for the task to hold `'static`
274 /// references across turns, hence the accompanying `history_arc`/`provider_arc`.
275 pub compaction_slot: Option<crate::session::CompactionSlot>,
276 /// `Arc<dyn History>` for `compaction_slot` (points to the same object as the
277 /// `history` borrow). Held by the background compaction task across turns. When
278 /// `None`, background compaction is unavailable and falls back to synchronous
279 /// compaction only.
280 pub history_arc: Option<Arc<dyn History>>,
281 /// `Arc` for the provider used by `compaction_slot`. Same as above.
282 pub provider_arc: Option<Arc<dyn LlmProvider>>,
283 /// Session-level cancellation token; the background compaction task's cancellation
284 /// token is derived from this one (independent of the turn's cancel, cancelled when
285 /// the session ends). When `None`, background compaction uses the turn's `cancel`
286 /// (sub-agent path).
287 pub session_cancel: Option<CancellationToken>,
288 /// The ingestion source for this turn's input — determines the `source` field of the
289 /// `before_ingest` step envelope.
290 /// User turns use `User`; background continuation turns started by the session driver
291 /// use `Background`.
292 pub ingest_source: crate::hooks::step::IngestSource,
293 /// Request stability diagnostics: compares snapshots of two consecutive requests
294 /// actually sent to the provider, helping locate sources of high volatility in low
295 /// prompt cache hit rates.
296 pub(crate) request_audit: &'a RequestAuditTracker,
297}
298
299impl<'a> TurnRunner<'a> {
300 /// Runs a single turn.
301 pub async fn run(&self, prompt: Vec<ContentBlock>) -> Result<AcpStopReason, TurnError> {
302 // ① UserPromptSubmit hook (sync interception)
303 // Gives hooks a chance to rewrite or intercept the prompt before it lands in
304 // history.
305 let prompt = match self.fire_user_prompt_submit(prompt).await {
306 UserPromptHookFlow::Continue(p) => p,
307 UserPromptHookFlow::Refused => {
308 // Hook blocked: do not emit `UserPromptCommitted`, do not append to
309 // history; return `Refusal` directly so the ACP bridge responds with
310 // `PromptResponse`.
311 return Ok(AcpStopReason::Refusal);
312 }
313 };
314
315 self.events
316 .emit(AgentEvent::UserPromptCommitted {
317 content: prompt.clone(),
318 })
319 .await;
320 self.history.append(Message {
321 role: Role::User,
322 content: prompt
323 .into_iter()
324 .map(content_block_to_message_content)
325 .collect::<Result<Vec<_>, _>>()?
326 .into_iter()
327 .flatten()
328 .collect(),
329 });
330
331 // After Ingest hook: input has been merged into history. Injection only.
332 {
333 let mut step = crate::hooks::step::AfterIngest {
334 committed_len: 1,
335 additional_context: Vec::new(),
336 };
337 let _ = self.hooks.dispatch(&mut step, self.hook_ctx()).await;
338 if !step.additional_context.is_empty() {
339 self.append_user_feedback(step.additional_context);
340 }
341 }
342
343 self.events.emit(AgentEvent::TurnStarted).await;
344
345 // After-turn-enter hook: the turn scope has been entered. Allows injecting system
346 // context or a Break to reject the turn.
347 // Note: currently the hook point is placed after prompt ingestion (moving the hook
348 // point earlier is a deferred adjustment).
349 {
350 let mut step = crate::hooks::step::AfterTurnEnter {
351 is_subagent: false,
352 agent_type: None,
353 additional_context: Vec::new(),
354 };
355 let control = self.hooks.dispatch(&mut step, self.hook_ctx()).await;
356 if !step.additional_context.is_empty() {
357 self.append_user_feedback(step.additional_context);
358 }
359 if let crate::hooks::step::HookControl::Break { .. } = control {
360 return Ok(AcpStopReason::EndTurn);
361 }
362 }
363
364 let result = self.run_inner().await;
365
366 if let Ok(outcome) = &result {
367 self.events
368 .emit(AgentEvent::TurnEnded {
369 reason: outcome.reason,
370 usage: outcome.usage,
371 })
372 .await;
373 }
374 // The `Err` path does not emit `TurnEnded`; the bridge layer decides the wire
375 // response based on the future outcome.
376
377 result.map(|outcome| outcome.reason)
378 }
379
/// Main loop of a turn. Each iteration manages context, builds a request, calls the
/// LLM, records the reply, and runs any requested tools; it repeats until a stop
/// condition is reached.
///
/// Exit points (all `Ok`):
/// - cancellation: checked at the top of each iteration, after draining the stream,
///   and via the permission decision;
/// - a hook `Break`, either before generation or after a tool batch;
/// - a passive LLM stop (`Refusal` / `MaxTokens`);
/// - a voluntary stop that `decide_turn_end` does not extend;
/// - the request cap being reached after a tool batch.
///
/// # Errors
/// Propagates errors from context management, the LLM call (with retries), stream
/// draining, and the permission decision.
async fn run_inner(&self) -> Result<TurnOutcome, TurnError> {
    let mut state = TurnState::new(self.config.request_limit, self.config.max_hook_continues);
    loop {
        // Cancellation check at the top of every iteration.
        if self.cancel.is_cancelled() {
            return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
        }

        // Micro / soft / hard compaction before the next request is assembled.
        self.manage_context().await?;

        let mut req = self.build_request();

        // Before Generate hook: can modify the request (model), short-circuit (supply a
        // synthetic assistant reply to skip the LLM), or Break.
        let mut before_gen = crate::hooks::step::BeforeGenerate {
            model: req.model.clone(),
            message_count: req.messages.len(),
            attempt: state.request_count.saturating_add(1),
            assistant_text: None,
        };
        let bg_control = self.hooks.dispatch(&mut before_gen, self.hook_ctx()).await;
        req.model = before_gen.model;
        if let Some(text) = before_gen.assistant_text {
            // Short-circuit: record the synthetic assistant reply instead of calling the
            // LLM, then go to the before-turn-end decision. Checked before `bg_control`,
            // so a synthetic reply takes precedence over a Break.
            self.history.append(Message {
                role: Role::Assistant,
                content: vec![MessageContent::Text { text }].into(),
            });
            if self.decide_turn_end(&mut state).await {
                continue;
            }
            return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
        }
        if let crate::hooks::step::HookControl::Break { reason } = bg_control {
            return Ok(turn_outcome(&state, reason));
        }

        let (mut stream, attempt) = self.call_llm_with_retry(&req, &mut state).await?;

        let outcome = self.drain_provider_stream(&mut stream, &mut state).await?;

        if outcome.cancelled {
            return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
        }

        // The stream has been drained and all usage for this call is available. Emit
        // `LlmCallFinished` with the **per-call** actual usage (`outcome.usage`, not
        // the turn-accumulated `state.usage`).
        self.events
            .emit(AgentEvent::LlmCallFinished {
                model: req.model.clone(),
                attempt,
                usage: outcome.usage,
                error: None,
            })
            .await;

        // After Generate hook: observe only (usage / stop / error), with no output to
        // fill. To intervene, a hook routes the next iteration through before-turn-end.
        // A `ToolUse` stop is reported to the hook as `EndTurn`.
        let stop_reason_for_hook = match outcome.stop {
            LlmStopReason::EndTurn | LlmStopReason::StopSequence => AcpStopReason::EndTurn,
            LlmStopReason::Refusal => AcpStopReason::Refusal,
            LlmStopReason::MaxTokens => AcpStopReason::MaxTokens,
            LlmStopReason::ToolUse => AcpStopReason::EndTurn,
        };
        let mut after_gen = crate::hooks::step::AfterGenerate {
            model: req.model.clone(),
            usage: outcome.usage,
            stop: stop_reason_for_hook,
            error: None,
        };
        let _ = self.hooks.dispatch(&mut after_gen, self.hook_ctx()).await;

        // Feed the actual input token count of this call into `history`, as the
        // precise baseline for compaction threshold decisions (see
        // `session/turn/compact.rs`). The messages sent were `req.messages`; their real
        // input size is derived from `outcome.usage` by `real_input_tokens`.
        if let Some(real_input) = real_input_tokens(&outcome.usage) {
            self.history.record_input_tokens(real_input);
        }

        // Only append the assistant message when it has content.
        let assistant = assistant_message(&outcome);
        if !assistant.content.is_empty() {
            self.history.append(assistant);
        }

        // Passive stops (Refusal / MaxTokens) skip the before-turn-end hook, which
        // cannot extend them, and exit directly.
        match outcome.stop {
            LlmStopReason::EndTurn | LlmStopReason::StopSequence => {
                // Voluntary stop → before-turn-end decision point.
                if self.decide_turn_end(&mut state).await {
                    continue;
                }
                return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
            }
            LlmStopReason::Refusal => {
                return Ok(turn_outcome(&state, AcpStopReason::Refusal));
            }
            LlmStopReason::MaxTokens => {
                return Ok(turn_outcome(&state, AcpStopReason::MaxTokens));
            }
            LlmStopReason::ToolUse => {}
        }

        if outcome.tool_uses.is_empty() {
            // `ToolUse` stop with no tool requested: treat as a voluntary stop and go to
            // the same before-turn-end decision point.
            if self.decide_turn_end(&mut state).await {
                continue;
            }
            return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
        }

        // Before Permission hook (currently an observe-only stub): the dispatch result
        // is discarded, and the decision still comes from `decide_permissions`.
        for tu in &outcome.tool_uses {
            let mut bp = crate::hooks::step::BeforePermission {
                tool: tu.name.clone(),
                decision: "ask".to_string(),
                resolved: None,
            };
            let _ = self.hooks.dispatch(&mut bp, self.hook_ctx()).await;
        }

        let approved = match self.decide_permissions(&outcome.tool_uses).await? {
            DecisionFlow::Continue(list) => list,
            DecisionFlow::Cancelled => {
                return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
            }
        };

        // After Permission hook (currently an observe-only stub). `granted` is true only
        // for tools that will actually run.
        for a in &approved {
            let (tool, granted) = match a {
                Approved::Run { .. } => (approved_tool_name(a), true),
                Approved::Denied { .. } | Approved::FailedArgs { .. } => {
                    (approved_tool_name(a), false)
                }
            };
            let mut ap = crate::hooks::step::AfterPermission { tool, granted };
            let _ = self.hooks.dispatch(&mut ap, self.hook_ctx()).await;
        }

        // A round with at least one tool actually run counts as progress. Under an
        // expanding adaptive limit, that raises the request cap by one (once per round).
        let progressed = approved.iter().any(|a| matches!(a, Approved::Run { .. }));
        if progressed {
            state.note_progress();
        }

        let results = self.run_tools_concurrently(approved).await;

        // After ToolBatch hook: runs after all parallel tools finish and before the next
        // LLM call. Allows injection or a graceful Break.
        let mut batch = crate::hooks::step::AfterToolBatch {
            results: results
                .iter()
                .map(|r| crate::hooks::step::ToolBatchEntry {
                    tool_name: r.name.clone(),
                    is_error: r.is_error,
                })
                .collect(),
            additional_context: Vec::new(),
        };
        let batch_control = self.hooks.dispatch(&mut batch, self.hook_ctx()).await;

        // Tool results are appended before injected context and before a Break is
        // honored. Presumably this is so the `tool_use`s just recorded never lack
        // results; confirm.
        self.history.append(tool_results_message(results));
        if !batch.additional_context.is_empty() {
            self.append_user_feedback(batch.additional_context);
        }
        if let crate::hooks::step::HookControl::Break { reason } = batch_control {
            return Ok(turn_outcome(&state, reason));
        }

        // The request cap is only checked here, after a tool batch.
        if state.exceeded_request_cap() {
            return Ok(turn_outcome(&state, AcpStopReason::MaxTurnRequests));
        }
    }
}
559
560 fn build_request(&self) -> CompletionRequest {
561 // Before sending the request, pair any orphaned `tool_use` (left over from an
562 // interruption, with no matching `tool_result`) — otherwise the provider will
563 // permanently reject the request. Only patch the copy sent to the provider; the
564 // true history remains untouched. See `sanitize`.
565 let messages = sanitize::sanitize_tool_pairing(self.history.snapshot());
566 let req = CompletionRequest {
567 model: self.config.model.clone(),
568 system: self.system_prompt.clone(),
569 messages,
570 tools: self.tools.schemas(),
571 tool_choice: ToolChoice::Auto,
572 sampling: self.config.sampling.clone(),
573 hosted_capabilities: self.hosted_capabilities,
574 };
575 self.request_audit.record(&req);
576 req
577 }
578
579 /// Layered context management: micro → soft (background) → hard (synchronous
580 /// fallback). Called at the start of each main loop iteration.
581 ///
582 /// Three water levels (see `compact_thresholds` for details):
583 /// 1. **micro** (default 0.6·window): if micro-compaction is enabled, first evict old
584 /// large `tool_result` entries — no LLM calls, no message deletion, near-zero
585 /// latency, deferring expensive full compaction.
586 /// 2. **soft** (default 0.7·window): if background compaction is enabled,
587 /// **asynchronously** start a summary compaction; the turn does not block, quietly
588 /// compacting before hitting hard (single-flight, won't re-start if one is already
589 /// in flight).
590 /// 3. **hard** (default 0.85·window, equivalent to the old `compact_ratio`
591 /// semantics): at this level compaction is mandatory — if a background compaction
592 /// is already in flight, `await` its completion; otherwise compact
593 /// **synchronously** as a fallback.
594 ///
595 /// micro/soft require the model to expose `context_window`; hard also supports an
596 /// absolute override via `compact_threshold_tokens`. If any level cannot obtain its
597 /// threshold, that level is skipped (preserving the conservative "no information, no
598 /// compaction" semantics).
599 async fn manage_context(&self) -> Result<(), TurnError> {
600 let thresholds = self.compact_thresholds();
601 // All three thresholds absent → no proactive compaction this turn (preserves the
602 // existing semantics).
603 if thresholds.is_empty() {
604 return Ok(());
605 }
606 let Some(estimate) = self.history.token_estimate() else {
607 return Ok(());
608 };
609
610 // ① micro: synchronous, cheapest. May reduce `estimate` below soft/hard
611 // thresholds, so re-fetch after compaction.
612 let estimate = if self.config.microcompact_enabled
613 && thresholds.micro.is_some_and(|t| estimate >= t)
614 {
615 self.run_microcompact().await;
616 self.history.token_estimate().unwrap_or(estimate)
617 } else {
618 estimate
619 };
620
621 // ② soft: crossing the threshold triggers an async background compaction without
622 // blocking the current round.
623 if self.config.background_compact_enabled
624 && let (Some(soft), Some(hard)) = (thresholds.soft, thresholds.hard)
625 && estimate >= soft
626 && estimate < hard
627 {
628 self.spawn_background_compaction(hard).await;
629 // Non-blocking – continue assembling requests this round; summary persistence
630 // happens in a later round (or later).
631 return Ok(());
632 }
633
634 // ③ hard: must compact.
635 if let Some(hard) = thresholds.hard
636 && estimate >= hard
637 {
638 self.compact_hard(estimate, hard).await?;
639 }
640 Ok(())
641 }
642
643 /// Run a micro-compact and write back via `replace`. Best-effort: does nothing if
644 /// there is nothing to clean up.
645 async fn run_microcompact(&self) {
646 let messages = self.history.snapshot();
647 let Some((rebuilt, report)) = microcompact::run(&messages) else {
648 return;
649 };
650 self.history.replace(rebuilt);
651 tracing::info!(
652 cleared = report.cleared,
653 tokens_before = report.tokens_before,
654 tokens_after = report.tokens_after,
655 "context microcompacted"
656 );
657 self.events
658 .emit(AgentEvent::ContextMicrocompacted {
659 tokens_before: report.tokens_before,
660 tokens_after: report.tokens_after,
661 cleared: report.cleared,
662 })
663 .await;
664 }
665
666 /// Spawns a single-flight background full compaction when the soft threshold is
667 /// exceeded. Requires a slot and `Arc` references to history and provider (only
668 /// available in the top-level turn; child agent turns silently skip this, leaving it
669 /// to the synchronous compaction at the hard threshold).
670 async fn spawn_background_compaction(&self, hard_threshold: u64) {
671 let (Some(slot), Some(history_arc), Some(provider_arc)) = (
672 self.compaction_slot.as_ref(),
673 self.history_arc.as_ref(),
674 self.provider_arc.as_ref(),
675 ) else {
676 return;
677 };
678 if slot.is_in_flight() {
679 return; // Single-flight: a compaction is already in flight, do not start another.
680 }
681
682 // The compaction cancel token is independent of the turn cancel — the summary
683 // should be allowed to finish even if the originating turn has ended; however, it
684 // is cancelled when the session ends. For sub-agent paths that have no
685 // `session_cancel`, it falls back to the turn cancel.
686 let cancel = self
687 .session_cancel
688 .clone()
689 .unwrap_or_else(|| self.cancel.clone())
690 .child_token();
691 let ctx = compact::CompactionCtx {
692 provider: provider_arc.clone(),
693 model: self.config.model.clone(),
694 sampling: self.config.sampling.clone(),
695 tools: self.tools.schemas(),
696 cancel,
697 };
698 let events = self.events.clone();
699 let on_done: Arc<
700 dyn Fn(crate::session::CompactionReport) -> futures::future::BoxFuture<'static, ()>
701 + Send
702 + Sync,
703 > = Arc::new(move |report| {
704 // Return a future so that `emit` is awaited inside the compaction task body —
705 // no detached task is spawned, and event emission is governed by the
706 // compaction task's cancel/track semantics.
707 let events = events.clone();
708 Box::pin(async move {
709 events
710 .emit(AgentEvent::ContextCompressed {
711 tokens_before: report.tokens_before,
712 tokens_after: report.tokens_after,
713 })
714 .await;
715 })
716 });
717 let started = slot.try_spawn(history_arc.clone(), ctx, hard_threshold, on_done);
718 if started {
719 tracing::info!(hard_threshold, "background compaction started");
720 }
721 }
722
723 /// Hard threshold fallback: wait for an in-flight background compaction to finish, or
724 /// run a synchronous compaction.
725 async fn compact_hard(&self, estimate: u64, hard: u64) -> Result<(), TurnError> {
726 // A background compaction is already in flight; wait for it to finish to avoid
727 // redundant work.
728 if let Some(slot) = self.compaction_slot.as_ref()
729 && slot.is_in_flight()
730 {
731 slot.await_in_flight().await;
732 return Ok(());
733 }
734
735 // Before the compact hook: the hook may `Skip` to veto this compaction (a
736 // mutating step).
737 let mut before = crate::hooks::step::BeforeCompact {
738 token_estimate: estimate,
739 threshold: hard,
740 };
741 if let crate::hooks::step::HookControl::Skip =
742 self.hooks.dispatch(&mut before, self.hook_ctx()).await
743 {
744 tracing::info!("compaction vetoed by before-compact hook");
745 return Ok(());
746 }
747
748 let ctx = self.sync_compaction_ctx();
749 let Some(report) = compact::run_sync(self.history, &ctx, hard).await else {
750 // No safe compaction boundary (e.g., a single very long turn) — skip this
751 // round, no event emitted.
752 return Ok(());
753 };
754 self.events
755 .emit(AgentEvent::ContextCompressed {
756 tokens_before: report.tokens_before,
757 tokens_after: report.tokens_after,
758 })
759 .await;
760
761 // After the compact hook: observe and allow injection (injected content goes into
762 // history).
763 let mut after = crate::hooks::step::AfterCompact {
764 tokens_before: report.tokens_before,
765 tokens_after: report.tokens_after,
766 additional_context: Vec::new(),
767 };
768 let _ = self.hooks.dispatch(&mut after, self.hook_ctx()).await;
769 if !after.additional_context.is_empty() {
770 self.append_user_feedback(after.additional_context);
771 }
772 Ok(())
773 }
774
775 /// The [`compact::CompactionCtx`] for synchronous compaction. Wrapping a borrowed
776 /// provider in a temporary `Arc` is not feasible (a trait object borrow cannot be
777 /// `Arc`), so the synchronous path requires `provider_arc`. Falling back when it is
778 /// missing is impossible (the top-level always has one; child agents use a borrowed
779 /// `provider`—see the `sync_compaction_ctx` implementation).
780 fn sync_compaction_ctx(&self) -> compact::CompactionCtx {
781 compact::CompactionCtx {
782 provider: self
783 .provider_arc
784 .clone()
785 .expect("sync compaction requires provider_arc"),
786 model: self.config.model.clone(),
787 sampling: self.config.sampling.clone(),
788 tools: self.tools.schemas(),
789 cancel: self.cancel.clone(),
790 }
791 }
792
793 /// Parse the three-tier compaction thresholds (in tokens) for this turn. Any tier set
794 /// to `None` means that tier is not triggered.
795 fn compact_thresholds(&self) -> CompactThresholds {
796 let window = self
797 .provider
798 .model_info(&self.config.model)
799 .and_then(|m| m.context_window);
800
801 // For `hard`, an absolute threshold takes precedence; otherwise, use `ratio *
802 // window`.
803 let hard = self.config.compact_threshold_tokens.or_else(|| {
804 let ratio = self.config.compact_ratio?;
805 ratio_threshold(window?, ratio)
806 });
807 // micro/soft can only be derived from window (absolute overrides apply only to
808 // hard).
809 let from_ratio =
810 |ratio: Option<f64>| ratio.and_then(|r| window.and_then(|w| ratio_threshold(w, r)));
811 CompactThresholds {
812 micro: from_ratio(self.config.microcompact_ratio),
813 soft: from_ratio(self.config.compact_soft_ratio),
814 hard,
815 }
816 }
817
818 pub(super) fn hook_ctx(&self) -> HookCtx<'_> {
819 HookCtx::new(self.session_id, self.cwd, self.cancel.clone())
820 }
821}
822
823// ----- internal types -----
824
825#[derive(Clone, Copy)]
826struct TurnOutcome {
827 reason: AcpStopReason,
828 usage: Usage,
829}
830
/// Compaction watermarks for one turn, in tokens: micro, soft (background) and hard
/// (synchronous). A `None` level is not triggered this turn.
#[derive(Clone, Copy)]
struct CompactThresholds {
    /// Micro-compaction watermark.
    micro: Option<u64>,
    /// Soft watermark that starts background compaction.
    soft: Option<u64>,
    /// Hard watermark that forces compaction.
    hard: Option<u64>,
}

impl CompactThresholds {
    /// `true` when no level is set, meaning no proactive compaction this turn.
    fn is_empty(&self) -> bool {
        [self.micro, self.soft, self.hard].iter().all(Option::is_none)
    }
}
846
/// Returns `floor(context_window × ratio)` as a token count, or `None` when that
/// floors to `0` (no trigger).
///
/// Intended for `ratio` in `(0, 1]`. A non-positive or NaN ratio also yields `None`,
/// because the float-to-integer cast saturates to `0`.
fn ratio_threshold(context_window: u64, ratio: f64) -> Option<u64> {
    let scaled = context_window as f64 * ratio;
    match scaled.floor() as u64 {
        0 => None,
        tokens => Some(tokens),
    }
}
853
/// Default cap on how many times the `before turn-end` hook may force a turn to
/// continue. It can be overridden through [`TurnConfig::max_hook_continues`] (config key
/// `[turn].max_hook_continues`); see the hook step docs for exit semantics.
pub(crate) const DEFAULT_MAX_HOOK_CONTINUES: u32 = 3;
858
/// Default upper bound on subagent vertical recursion depth, counted from the
/// top-level turn.
///
/// With depth `N`, the top turn may spawn subagents, those may spawn further
/// subagents, and so on. The turn at level `N`, where `subagent_max_depth` has dropped
/// to `0`, can no longer call `spawn_agent`.
///
/// The default of 4 leaves room for orchestrations such as "main agent → coordinator
/// subagent → worker subagent" while still preventing runaway vertical growth.
/// (A value of 1 would forbid the coordinator from spawning workers.) Horizontal
/// runaway is guarded separately by `request_limit`.
pub(crate) const DEFAULT_SUBAGENT_MAX_DEPTH: u32 = 4;
866
867struct TurnState {
868 request_count: u32,
869 usage: Usage,
870 cap: Option<u32>,
871 expand_on_progress: bool,
872 /// How many times this turn has been extended by the `before turn-end` hook. Cap is
873 /// [`Self::max_stop_hook_continues`].
874 stop_hook_continues: u32,
875 /// Hard upper limit for life-extending continues (from
876 /// [`TurnConfig::max_hook_continues`]). Prevents hooks from `Continue`ing
877 /// indefinitely.
878 max_stop_hook_continues: u32,
879}
880
881impl TurnState {
882 fn new(limit: TurnRequestLimit, max_hook_continues: u32) -> Self {
883 Self {
884 request_count: 0,
885 usage: Usage::default(),
886 cap: limit.initial_cap(),
887 expand_on_progress: limit.expand_on_progress(),
888 stop_hook_continues: 0,
889 max_stop_hook_continues: max_hook_continues,
890 }
891 }
892
893 fn note_progress(&mut self) {
894 if self.expand_on_progress
895 && let Some(cap) = self.cap.as_mut()
896 {
897 *cap = cap.saturating_add(1);
898 }
899 }
900
901 fn exceeded_request_cap(&self) -> bool {
902 match self.cap {
903 None => false,
904 Some(cap) => self.request_count >= cap,
905 }
906 }
907
908 /// Whether the `before turn-end` hook is still allowed to continue (has not reached
909 /// the hard limit).
910 fn may_stop_hook_continue(&self) -> bool {
911 self.stop_hook_continues < self.max_stop_hook_continues
912 }
913
914 /// Records one stop-hook continuation.
915 fn note_stop_hook_continue(&mut self) {
916 self.stop_hook_continues = self.stop_hook_continues.saturating_add(1);
917 }
918}
919
920// ----- helpers -----
921
922fn turn_outcome(state: &TurnState, reason: AcpStopReason) -> TurnOutcome {
923 TurnOutcome {
924 reason,
925 usage: state.usage,
926 }
927}
928
929#[cfg(test)]
930mod tests;