Skip to main content

defect_agent/session/
turn.rs

1//! Turn main loop.
2//!
3//! Turn main loop — the "heart" of the agent. This file implements the state machine.
4//!
5//! Key dependencies:
6//! - [`History`]: read/write message history
7//! - [`ToolRegistry`]: tool lookup
8//! - [`LlmProvider`]: LLM invocation
9//! - [`EventEmitter`]: event emission (shared via `Arc` so tool tasks can also emit)
10//! - [`PermissionGate`]: wait for permission requests
11
12use std::path::PathBuf;
13use std::sync::Arc;
14
15use agent_client_protocol_schema::{ContentBlock, SessionId, StopReason as AcpStopReason};
16use tokio_util::sync::CancellationToken;
17
18use crate::event::AgentEvent;
19use crate::fs::FsBackend;
20use crate::hooks::{HookCtx, HookEngine};
21use crate::http::HttpClient;
22use crate::llm::{
23    CompletionRequest, HostedCapabilities, LlmProvider, Message, MessageContent, Role,
24    SamplingParams, StopReason as LlmStopReason, ToolChoice, Usage,
25};
26use crate::policy::SandboxPolicy;
27use crate::session::events::EventEmitter;
28use crate::session::permissions::PermissionGate;
29use crate::session::{History, ToolRegistry, TurnError};
30use crate::shell::ShellBackend;
31
32const DEFAULT_PROMPT_FILE: &str = "AGENTS.md";
33
34mod request_audit;
35
36mod compact;
37
38mod microcompact;
39
40mod compaction_slot;
41
42pub use compaction_slot::CompactionSlot;
43
44mod sanitize;
45
46mod content;
47
48mod llm_drive;
49
50mod tools;
51
52mod hooks;
53
54use content::content_block_to_message_content;
55use hooks::UserPromptHookFlow;
56use llm_drive::{assistant_message, real_input_tokens};
57use tools::{Approved, DecisionFlow, approved_tool_name, tool_results_message};
58
59pub(crate) use request_audit::RequestAuditTracker;
60
/// Strategy for capping the number of LLM calls made within a single turn.
///
/// The turn main loop seeds its per-turn state from this value and ends the turn with
/// [`AcpStopReason::MaxTurnRequests`] once the cap is exceeded.
#[derive(Debug, Clone, Copy)]
pub enum TurnRequestLimit {
    /// No upper limit.
    Unbounded,
    /// Fixed limit: the turn ends with [`AcpStopReason::MaxTurnRequests`] once the cap of
    /// N LLM requests is exceeded.
    Fixed(u32),
    /// Adaptive: each time a tool use is approved and executed in the current turn, that
    /// counts as progress and the limit is automatically incremented by 1; otherwise,
    /// termination follows [`Self::Fixed`].
    Adaptive {
        /// Starting cap for the turn (what `initial_cap` reports for this variant).
        initial: u32,
        /// Whether progress is allowed to raise the cap. When `false`, this variant
        /// reports no expansion and therefore behaves like a fixed cap of `initial`.
        expand_on_progress: bool,
    },
}
76
77impl TurnRequestLimit {
78    fn initial_cap(&self) -> Option<u32> {
79        match *self {
80            Self::Unbounded => None,
81            Self::Fixed(n) => Some(n),
82            Self::Adaptive { initial, .. } => Some(initial),
83        }
84    }
85
86    fn expand_on_progress(&self) -> bool {
87        matches!(
88            self,
89            Self::Adaptive {
90                expand_on_progress: true,
91                ..
92            }
93        )
94    }
95}
96
/// Configuration for a turn.
///
/// Borrowed by [`TurnRunner`] for the duration of a turn; see the `Default` impl for
/// the out-of-the-box values.
#[derive(Debug, Clone)]
pub struct TurnConfig {
    /// The selected provider vendor (the provider half of the selection key). Together
    /// with [`Self::model`], this is used to resolve the actual provider entry in the
    /// registry by the `(vendor, model)` pair.
    pub provider: String,
    /// The selected model (the model half of the `(vendor, model)` selection key). It
    /// is copied into every `CompletionRequest` assembled for the turn and into the
    /// background compaction context.
    pub model: String,
    /// NOTE(review): presumably the set of models that may be selected, with `None`
    /// meaning unrestricted — not consumed in this part of the file; confirm.
    pub allowed_models: Option<Vec<String>>,
    /// Base prompt source; see [`BasePromptConfig`].
    pub base_prompt: BasePromptConfig,
    /// NOTE(review): presumably a configured system prompt override. The prompt actually
    /// sent is the already-resolved [`TurnRunner::system_prompt`]; confirm how the two
    /// relate.
    pub system_prompt: Option<String>,
    /// Project prompt source and per-provider / per-model overlays; see
    /// [`PromptConfig`].
    pub prompt: PromptConfig,
    /// Sampling parameters, cloned into every `CompletionRequest` assembled for the
    /// turn and into the background compaction context.
    pub sampling: SamplingParams,
    /// Strategy for capping LLM calls within the turn; seeds the per-turn state of the
    /// main loop.
    pub request_limit: TurnRequestLimit,
    /// Explicit absolute override for the compaction threshold (in tokens). When `Some`,
    /// takes precedence over the value inferred from [`Self::compact_ratio`]. When
    /// `None`, the threshold is automatically derived from the ratio.
    pub compact_threshold_tokens: Option<u64>,
    /// Compression threshold as a fraction of the model's `context_window` (e.g. `0.85` =
    /// trigger when usage exceeds 85%). This is the **hard watermark**: when reached, if
    /// no background compaction is in flight, the turn main loop performs **synchronous**
    /// compaction as a fallback (blocking the current turn but guaranteeing the context
    /// is not exceeded). `None` = no automatic ratio-based compression (and if no
    /// absolute threshold is set either, no compression occurs for this turn). Only
    /// effective when `compact_threshold_tokens` is `None` and the model exposes a
    /// `context_window`. See `session/turn/compact.rs` for details.
    pub compact_ratio: Option<f64>,
    /// **Background full compaction** toggle. When `true`, once the soft watermark
    /// derived from [`Self::compact_soft_ratio`] is exceeded, a summarization compaction
    /// is started **asynchronously** (without blocking the current turn); it quietly
    /// compresses history before the hard watermark is hit. When disabled, only
    /// synchronous compaction at the hard watermark remains. See
    /// `session/turn/compaction_slot.rs`.
    pub background_compact_enabled: bool,
    /// Soft watermark for background compaction, as a fraction of `context_window`
    /// (default `0.7`). Must be less than `compact_ratio` (the hard watermark) to leave a
    /// window between soft and hard for background summarization to complete. Only
    /// effective when `background_compact_enabled` is set and a threshold can be derived.
    pub compact_soft_ratio: Option<f64>,
    /// Enables **micro‑compaction**. When `true`, each turn first runs a micro‑compaction
    /// (cleans oversized `tool_result` in older turns, without calling the LLM or
    /// deleting messages) at the water level above [`Self::microcompact_ratio`],
    /// deferring expensive full compaction. See `session/turn/microcompact.rs`.
    pub microcompact_enabled: bool,
    /// Micro‑compaction watermark as a fraction of `context_window` (default `0.6`).
    /// Typically below the soft watermark — micro‑compaction is the cheapest first line
    /// of defense. Only effective when `microcompact_enabled` and a threshold can be
    /// derived.
    pub microcompact_ratio: Option<f64>,
    /// NOTE(review): presumably the maximum number of retries for a failing LLM call
    /// (default `3`); the retry loop itself is not visible here — confirm.
    pub max_llm_retries: u32,
    /// Limit on tools executed concurrently within one batch.
    /// `0` = unlimited. The default is unlimited.
    pub max_concurrent_tools: usize,
    /// Hard upper limit on forced continuations from the `before turn-end` hook —
    /// prevents infinite loops from repeated hook `Continue` calls. Default: 3.
    pub max_hook_continues: u32,
    /// Maximum subagent nesting depth (vertical recursion limit) for this turn.
    /// `spawn_agent` uses this to pass the "remaining depth" to tools via
    /// [`crate::tool::ToolContext::subagent_depth`]; a child agent's nested turn receives
    /// `subagent_max_depth` = parent's remaining depth minus one. `0` means this turn's
    /// tool set contains no `spawn_agent`, structurally forbidding dispatch — replacing
    /// the old hardcoded gate of "whitelist never contains `spawn_agent`". Default:
    /// `DEFAULT_SUBAGENT_MAX_DEPTH`.
    pub subagent_max_depth: u32,
}
161
162impl Default for TurnConfig {
163    fn default() -> Self {
164        Self {
165            provider: String::new(),
166            model: String::new(),
167            allowed_models: None,
168            base_prompt: BasePromptConfig::default(),
169            system_prompt: None,
170            prompt: PromptConfig::default(),
171            sampling: SamplingParams::default(),
172            request_limit: TurnRequestLimit::Adaptive {
173                initial: 32,
174                expand_on_progress: true,
175            },
176            compact_threshold_tokens: None,
177            // Trigger compaction at 85% of `context_window` (hard watermark), reserving
178            // ~15% for summary output and headroom — within the reasonable range of codex
179            // (90%), Claude (~93%), and opencode (window-20k).
180            compact_ratio: Some(0.85),
181            // Background compaction enabled by default: starts async summarization at
182            // soft=0.7, aiming to finish before hard=0.85 is reached.
183            background_compact_enabled: true,
184            compact_soft_ratio: Some(0.7),
185            // Micro‑compaction enabled by default: at 0.6 it evicts old large
186            // `tool_result`s — the cheapest first line of defense.
187            microcompact_enabled: true,
188            microcompact_ratio: Some(0.6),
189            max_llm_retries: 3,
190            max_concurrent_tools: 0,
191            max_hook_continues: DEFAULT_MAX_HOOK_CONTINUES,
192            subagent_max_depth: DEFAULT_SUBAGENT_MAX_DEPTH,
193        }
194    }
195}
196
/// Source of the base prompt. Both fields default to `None`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct BasePromptConfig {
    /// NOTE(review): presumably a path to a file holding the base prompt — the loader
    /// is not visible here; confirm, including precedence relative to `text`.
    pub file: Option<PathBuf>,
    /// NOTE(review): presumably the base prompt given inline — confirm.
    pub text: Option<String>,
}
202
/// Prompt source plus per-provider and per-model overlays.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PromptConfig {
    /// Name of the prompt file; defaults to `DEFAULT_PROMPT_FILE` (`"AGENTS.md"`).
    pub file: String,
    /// NOTE(review): presumably inline prompt text; precedence relative to `file` is
    /// not visible here — confirm.
    pub text: Option<String>,
    /// NOTE(review): presumably extra prompt text keyed by provider name — confirm the
    /// key format against the resolver.
    pub provider_overlays: std::collections::BTreeMap<String, String>,
    /// NOTE(review): presumably extra prompt text keyed by model name — confirm the key
    /// format against the resolver.
    pub model_overlays: std::collections::BTreeMap<String, String>,
}
210
211impl Default for PromptConfig {
212    fn default() -> Self {
213        Self {
214            file: DEFAULT_PROMPT_FILE.to_owned(),
215            text: None,
216            provider_overlays: std::collections::BTreeMap::new(),
217            model_overlays: std::collections::BTreeMap::new(),
218        }
219    }
220}
/// All dependencies and accumulated state for a single turn execution.
///
/// This struct is constructed by [`crate::session::DefaultSession`] on each `run_turn`
/// call, borrowing sub-components of the session and being dropped after the turn
/// completes.
pub struct TurnRunner<'a> {
    /// Message history for the session. The turn loop appends to it, snapshots it when
    /// assembling requests, replaces it after micro-compaction and queries its token
    /// estimate for compaction decisions.
    pub history: &'a dyn History,
    /// Tool registry; its schemas are attached to every request.
    pub tools: &'a dyn ToolRegistry,
    /// LLM provider used for the turn's completion calls.
    pub provider: &'a dyn LlmProvider,
    /// The active policy for this turn's snapshot. Owned as an `Arc` rather than
    /// borrowed: it flows with [`crate::tool::ToolContext`] into `spawn_agent`, where
    /// child agents wrap it with
    /// [`NonInteractivePolicy`](crate::policy::NonInteractivePolicy) — must be the
    /// parent's actual policy at this moment.
    pub policy: Arc<dyn SandboxPolicy>,
    /// Event emitter, shared via `Arc` so the background compaction callback (and tool
    /// tasks) can also emit.
    pub events: Arc<EventEmitter>,
    /// Gate used to wait for permission requests.
    pub permissions: &'a PermissionGate,
    /// Turn-level cancellation token; checked at the top of every main-loop iteration.
    pub cancel: CancellationToken,
    /// Turn configuration (model, sampling, request limit, compaction settings, ...).
    pub config: &'a TurnConfig,
    /// The system prompt resolved for this turn. `Arc<str>`: each `build_request` call
    /// `clone`s it into `CompletionRequest.system`; the `Arc` reduces this to a reference
    /// count increment.
    pub system_prompt: Option<Arc<str>>,
    /// NOTE(review): presumably the session's working directory handed to tools — not
    /// consumed in this part of the file; confirm.
    pub cwd: &'a std::path::Path,
    /// NOTE(review): filesystem backend, presumably injected into tools — confirm.
    pub fs: Arc<dyn FsBackend>,
    /// NOTE(review): shell backend, presumably injected into tools — confirm.
    pub shell: Arc<dyn ShellBackend>,
    /// NOTE(review): HTTP client, presumably injected into tools — confirm.
    pub http: Arc<dyn HttpClient>,
    /// Hosted capabilities determined at session startup.
    /// Reused directly on each turn when assembling requests, without re-querying.
    pub hosted_capabilities: HostedCapabilities,
    /// Hook engine. The turn main loop emits Sync events at four points
    /// (`UserPromptSubmit` / `PreToolUse` / `PostToolUse` / `PostToolUseFailure`).
    /// Waits for hooks to finish before proceeding.
    pub hooks: &'a dyn HookEngine,
    /// Current session ID. Injected into `HookCtx` so that hook handlers can route or
    /// audit by session.
    pub session_id: &'a SessionId,
    /// Session-level background task handle. When `Some`, enables the tool's
    /// `run_in_background` capability (injected into tools via
    /// [`crate::tool::ToolContext::background`]); nested sub-agent turns receive `None`,
    /// structurally preventing background task self-replication.
    pub background: Option<crate::session::BackgroundTasks>,
    /// Shared state for the `--goal` goal-driven loop. When `Some`, this session is
    /// running in goal mode: it is injected into the `goal_done` tool via
    /// [`crate::tool::ToolContext::goal`], and the `goal-gate` hook uses it in
    /// `before_turn_end` to allow or extend the session. `None` = non-goal mode
    /// (default).
    pub goal: Option<Arc<crate::session::GoalState>>,
    /// Session-level single-flight compaction slot. When `Some`, background full
    /// compaction is available — exceeding the soft watermark triggers an async summary
    /// compaction without blocking the current turn. Nested sub-agent turns pass `None`
    /// (sub-agent contexts are short-lived and should not spawn background tasks).
    /// Requires `Arc<dyn History>`/`Arc<dyn LlmProvider>` for the task to hold `'static`
    /// references across turns, hence the accompanying `history_arc`/`provider_arc`.
    pub compaction_slot: Option<crate::session::CompactionSlot>,
    /// `Arc<dyn History>` for `compaction_slot` (points to the same object as the
    /// `history` borrow). Held by the background compaction task across turns. When
    /// `None`, background compaction is unavailable and falls back to synchronous
    /// compaction only.
    pub history_arc: Option<Arc<dyn History>>,
    /// `Arc` for the provider used by `compaction_slot`. Like `history_arc`: when
    /// `None`, background compaction is unavailable.
    pub provider_arc: Option<Arc<dyn LlmProvider>>,
    /// Session-level cancellation token; the background compaction task's cancellation
    /// token is derived from this one (independent of the turn's cancel, cancelled when
    /// the session ends). When `None`, background compaction uses the turn's `cancel`
    /// (sub-agent path).
    pub session_cancel: Option<CancellationToken>,
    /// The ingestion source for this turn's input — determines the `source` field of the
    /// `before_ingest` step envelope.
    /// User turns use `User`; background continuation turns started by the session driver
    /// use `Background`.
    pub ingest_source: crate::hooks::step::IngestSource,
    /// Request stability diagnostics: compares snapshots of two consecutive requests
    /// actually sent to the provider, helping locate sources of high volatility in low
    /// prompt cache hit rates.
    pub(crate) request_audit: &'a RequestAuditTracker,
}
298
299impl<'a> TurnRunner<'a> {
300    /// Runs a single turn.
301    pub async fn run(&self, prompt: Vec<ContentBlock>) -> Result<AcpStopReason, TurnError> {
302        // ① UserPromptSubmit hook (sync interception)
303        // Gives hooks a chance to rewrite or intercept the prompt before it lands in
304        // history.
305        let prompt = match self.fire_user_prompt_submit(prompt).await {
306            UserPromptHookFlow::Continue(p) => p,
307            UserPromptHookFlow::Refused => {
308                // Hook blocked: do not emit `UserPromptCommitted`, do not append to
309                // history; return `Refusal` directly so the ACP bridge responds with
310                // `PromptResponse`.
311                return Ok(AcpStopReason::Refusal);
312            }
313        };
314
315        self.events
316            .emit(AgentEvent::UserPromptCommitted {
317                content: prompt.clone(),
318            })
319            .await;
320        self.history.append(Message {
321            role: Role::User,
322            content: prompt
323                .into_iter()
324                .map(content_block_to_message_content)
325                .collect::<Result<Vec<_>, _>>()?
326                .into_iter()
327                .flatten()
328                .collect(),
329        });
330
331        // After Ingest hook: input has been merged into history. Injection only.
332        {
333            let mut step = crate::hooks::step::AfterIngest {
334                committed_len: 1,
335                additional_context: Vec::new(),
336            };
337            let _ = self.hooks.dispatch(&mut step, self.hook_ctx()).await;
338            if !step.additional_context.is_empty() {
339                self.append_user_feedback(step.additional_context);
340            }
341        }
342
343        self.events.emit(AgentEvent::TurnStarted).await;
344
345        // After-turn-enter hook: the turn scope has been entered. Allows injecting system
346        // context or a Break to reject the turn.
347        // Note: currently the hook point is placed after prompt ingestion (moving the hook
348        // point earlier is a deferred adjustment).
349        {
350            let mut step = crate::hooks::step::AfterTurnEnter {
351                is_subagent: false,
352                agent_type: None,
353                additional_context: Vec::new(),
354            };
355            let control = self.hooks.dispatch(&mut step, self.hook_ctx()).await;
356            if !step.additional_context.is_empty() {
357                self.append_user_feedback(step.additional_context);
358            }
359            if let crate::hooks::step::HookControl::Break { .. } = control {
360                return Ok(AcpStopReason::EndTurn);
361            }
362        }
363
364        let result = self.run_inner().await;
365
366        if let Ok(outcome) = &result {
367            self.events
368                .emit(AgentEvent::TurnEnded {
369                    reason: outcome.reason,
370                    usage: outcome.usage,
371                })
372                .await;
373        }
374        // The `Err` path does not emit `TurnEnded`; the bridge layer decides the wire
375        // response based on the future outcome.
376
377        result.map(|outcome| outcome.reason)
378    }
379
    /// Turn main loop: repeats "manage context → build request → LLM call → tools"
    /// until a stop condition is reached.
    ///
    /// Each iteration:
    /// 1. Returns `Cancelled` if the turn's cancel token has fired.
    /// 2. Runs layered context management, then assembles the request.
    /// 3. Dispatches the before-generate hook, which may override the model, supply a
    ///    synthetic assistant reply (skipping the LLM call), or `Break`.
    /// 4. Calls the LLM (with retry), drains the stream, emits `LlmCallFinished` and
    ///    dispatches the after-generate hook.
    /// 5. Records the real input token count and appends the assistant message.
    /// 6. On a voluntary stop, consults `decide_turn_end` (which may force another
    ///    iteration); on `Refusal` / `MaxTokens`, exits directly.
    /// 7. Otherwise resolves permissions, runs the approved tools, appends their
    ///    results and checks the request cap.
    ///
    /// # Errors
    /// Propagates `TurnError` from context management, the LLM call, stream draining
    /// and permission decisions.
    async fn run_inner(&self) -> Result<TurnOutcome, TurnError> {
        let mut state = TurnState::new(self.config.request_limit, self.config.max_hook_continues);
        loop {
            if self.cancel.is_cancelled() {
                return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
            }

            self.manage_context().await?;

            let mut req = self.build_request();

            // Before Generate hook: can modify request (model), short-circuit (fill in
            // synthetic assistant to skip LLM), or Break.
            let mut before_gen = crate::hooks::step::BeforeGenerate {
                model: req.model.clone(),
                message_count: req.messages.len(),
                attempt: state.request_count.saturating_add(1),
                assistant_text: None,
            };
            let bg_control = self.hooks.dispatch(&mut before_gen, self.hook_ctx()).await;
            req.model = before_gen.model;
            // Note: a synthetic reply is handled before the `Break` check below, so it
            // takes precedence when a hook sets both.
            if let Some(text) = before_gen.assistant_text {
                // Short-circuit: use a synthetic assistant reply to skip the real LLM
                // call, then proceed to the before-turn-end check.
                self.history.append(Message {
                    role: Role::Assistant,
                    content: vec![MessageContent::Text { text }].into(),
                });
                if self.decide_turn_end(&mut state).await {
                    continue;
                }
                return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
            }
            if let crate::hooks::step::HookControl::Break { reason } = bg_control {
                return Ok(turn_outcome(&state, reason));
            }

            let (mut stream, attempt) = self.call_llm_with_retry(&req, &mut state).await?;

            let outcome = self.drain_provider_stream(&mut stream, &mut state).await?;

            if outcome.cancelled {
                return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
            }

            // The stream has been drained and all usage for this call is available — emit
            // `LlmCallFinished` with the **per-call** actual usage (`outcome.usage`, not
            // the turn-accumulated `state.usage`).
            self.events
                .emit(AgentEvent::LlmCallFinished {
                    model: req.model.clone(),
                    attempt,
                    usage: outcome.usage,
                    error: None,
                })
                .await;

            // After the Generate hook: observe (usage / stop / error). No output to fill;
            // to intervene, route the next turn through before-turn-end.
            // The hook envelope carries an ACP stop reason; `ToolUse` is reported to the
            // hook as `EndTurn` here.
            let stop_reason_for_hook = match outcome.stop {
                LlmStopReason::EndTurn | LlmStopReason::StopSequence => AcpStopReason::EndTurn,
                LlmStopReason::Refusal => AcpStopReason::Refusal,
                LlmStopReason::MaxTokens => AcpStopReason::MaxTokens,
                LlmStopReason::ToolUse => AcpStopReason::EndTurn,
            };
            let mut after_gen = crate::hooks::step::AfterGenerate {
                model: req.model.clone(),
                usage: outcome.usage,
                stop: stop_reason_for_hook,
                error: None,
            };
            let _ = self.hooks.dispatch(&mut after_gen, self.hook_ctx()).await;

            // Feed the actual input token count returned by this call into `history` as
            // the precise baseline for compaction threshold decisions (see
            // `session/turn/compact.rs`). The messages sent in this call are
            // `req.messages`, and their real input size is the sum of the three
            // input-side fields in `outcome.usage`.
            if let Some(real_input) = real_input_tokens(&outcome.usage) {
                self.history.record_input_tokens(real_input);
            }

            // An assistant message with no content is not recorded in history.
            let assistant = assistant_message(&outcome);
            if !assistant.content.is_empty() {
                self.history.append(assistant);
            }

            // Dispatch on the stop reason:
            // - Voluntary stop (EndTurn / StopSequence): goes through the before-turn-end
            //   decision point, which may force another iteration.
            // - Passive stop (Refusal / MaxTokens): skip the before-turn-end hook (the hook
            //   cannot extend these), exit directly.
            // - ToolUse: fall through to permission handling and tool execution.
            match outcome.stop {
                LlmStopReason::EndTurn | LlmStopReason::StopSequence => {
                    // Voluntary stop → before-turn-end decision point.
                    if self.decide_turn_end(&mut state).await {
                        continue;
                    }
                    return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
                }
                LlmStopReason::Refusal => {
                    return Ok(turn_outcome(&state, AcpStopReason::Refusal));
                }
                LlmStopReason::MaxTokens => {
                    return Ok(turn_outcome(&state, AcpStopReason::MaxTokens));
                }
                LlmStopReason::ToolUse => {}
            }

            if outcome.tool_uses.is_empty() {
                // Voluntary stop (no tool requested) → same before-turn-end decision
                // point.
                if self.decide_turn_end(&mut state).await {
                    continue;
                }
                return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
            }

            // Before the Permission hook (currently only observes/stubs; policy still
            // delegates to the authority).
            for tu in &outcome.tool_uses {
                let mut bp = crate::hooks::step::BeforePermission {
                    tool: tu.name.clone(),
                    decision: "ask".to_string(),
                    resolved: None,
                };
                let _ = self.hooks.dispatch(&mut bp, self.hook_ctx()).await;
            }

            let approved = match self.decide_permissions(&outcome.tool_uses).await? {
                DecisionFlow::Continue(list) => list,
                DecisionFlow::Cancelled => {
                    return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
                }
            };

            // After permission hook (currently only observes/stubs).
            for a in &approved {
                let (tool, granted) = match a {
                    Approved::Run { .. } => (approved_tool_name(a), true),
                    Approved::Denied { .. } | Approved::FailedArgs { .. } => {
                        (approved_tool_name(a), false)
                    }
                };
                let mut ap = crate::hooks::step::AfterPermission { tool, granted };
                let _ = self.hooks.dispatch(&mut ap, self.hook_ctx()).await;
            }

            // Progress = at least one tool use in this batch was approved to run; this is
            // what is reported to the per-turn state via `note_progress`.
            let progressed = approved.iter().any(|a| matches!(a, Approved::Run { .. }));
            if progressed {
                state.note_progress();
            }

            let results = self.run_tools_concurrently(approved).await;

            // After `ToolBatch` hook: after all parallel tools finish, before the next
            // LLM call. Allows injection or graceful break.
            let mut batch = crate::hooks::step::AfterToolBatch {
                results: results
                    .iter()
                    .map(|r| crate::hooks::step::ToolBatchEntry {
                        tool_name: r.name.clone(),
                        is_error: r.is_error,
                    })
                    .collect(),
                additional_context: Vec::new(),
            };
            let batch_control = self.hooks.dispatch(&mut batch, self.hook_ctx()).await;

            // Tool results are appended before honouring a `Break`, so they are in
            // history even when the hook ends the turn.
            self.history.append(tool_results_message(results));
            if !batch.additional_context.is_empty() {
                self.append_user_feedback(batch.additional_context);
            }
            if let crate::hooks::step::HookControl::Break { reason } = batch_control {
                return Ok(turn_outcome(&state, reason));
            }

            // The request cap is only checked here, after a tool batch, i.e. right before
            // the loop would issue the next LLM call.
            if state.exceeded_request_cap() {
                return Ok(turn_outcome(&state, AcpStopReason::MaxTurnRequests));
            }
        }
    }
559
560    fn build_request(&self) -> CompletionRequest {
561        // Before sending the request, pair any orphaned `tool_use` (left over from an
562        // interruption, with no matching `tool_result`) — otherwise the provider will
563        // permanently reject the request. Only patch the copy sent to the provider; the
564        // true history remains untouched. See `sanitize`.
565        let messages = sanitize::sanitize_tool_pairing(self.history.snapshot());
566        let req = CompletionRequest {
567            model: self.config.model.clone(),
568            system: self.system_prompt.clone(),
569            messages,
570            tools: self.tools.schemas(),
571            tool_choice: ToolChoice::Auto,
572            sampling: self.config.sampling.clone(),
573            hosted_capabilities: self.hosted_capabilities,
574        };
575        self.request_audit.record(&req);
576        req
577    }
578
579    /// Layered context management: micro → soft (background) → hard (synchronous
580    /// fallback). Called at the start of each main loop iteration.
581    ///
582    /// Three water levels (see `compact_thresholds` for details):
583    /// 1. **micro** (default 0.6·window): if micro-compaction is enabled, first evict old
584    ///    large `tool_result` entries — no LLM calls, no message deletion, near-zero
585    ///    latency, deferring expensive full compaction.
586    /// 2. **soft** (default 0.7·window): if background compaction is enabled,
587    ///    **asynchronously** start a summary compaction; the turn does not block, quietly
588    ///    compacting before hitting hard (single-flight, won't re-start if one is already
589    ///    in flight).
590    /// 3. **hard** (default 0.85·window, equivalent to the old `compact_ratio`
591    ///    semantics): at this level compaction is mandatory — if a background compaction
592    ///    is already in flight, `await` its completion; otherwise compact
593    ///    **synchronously** as a fallback.
594    ///
595    /// micro/soft require the model to expose `context_window`; hard also supports an
596    /// absolute override via `compact_threshold_tokens`. If any level cannot obtain its
597    /// threshold, that level is skipped (preserving the conservative "no information, no
598    /// compaction" semantics).
599    async fn manage_context(&self) -> Result<(), TurnError> {
600        let thresholds = self.compact_thresholds();
601        // All three thresholds absent → no proactive compaction this turn (preserves the
602        // existing semantics).
603        if thresholds.is_empty() {
604            return Ok(());
605        }
606        let Some(estimate) = self.history.token_estimate() else {
607            return Ok(());
608        };
609
610        // ① micro: synchronous, cheapest. May reduce `estimate` below soft/hard
611        // thresholds, so re-fetch after compaction.
612        let estimate = if self.config.microcompact_enabled
613            && thresholds.micro.is_some_and(|t| estimate >= t)
614        {
615            self.run_microcompact().await;
616            self.history.token_estimate().unwrap_or(estimate)
617        } else {
618            estimate
619        };
620
621        // ② soft: crossing the threshold triggers an async background compaction without
622        // blocking the current round.
623        if self.config.background_compact_enabled
624            && let (Some(soft), Some(hard)) = (thresholds.soft, thresholds.hard)
625            && estimate >= soft
626            && estimate < hard
627        {
628            self.spawn_background_compaction(hard).await;
629            // Non-blocking – continue assembling requests this round; summary persistence
630            // happens in a later round (or later).
631            return Ok(());
632        }
633
634        // ③ hard: must compact.
635        if let Some(hard) = thresholds.hard
636            && estimate >= hard
637        {
638            self.compact_hard(estimate, hard).await?;
639        }
640        Ok(())
641    }
642
643    /// Run a micro-compact and write back via `replace`. Best-effort: does nothing if
644    /// there is nothing to clean up.
645    async fn run_microcompact(&self) {
646        let messages = self.history.snapshot();
647        let Some((rebuilt, report)) = microcompact::run(&messages) else {
648            return;
649        };
650        self.history.replace(rebuilt);
651        tracing::info!(
652            cleared = report.cleared,
653            tokens_before = report.tokens_before,
654            tokens_after = report.tokens_after,
655            "context microcompacted"
656        );
657        self.events
658            .emit(AgentEvent::ContextMicrocompacted {
659                tokens_before: report.tokens_before,
660                tokens_after: report.tokens_after,
661                cleared: report.cleared,
662            })
663            .await;
664    }
665
666    /// Spawns a single-flight background full compaction when the soft threshold is
667    /// exceeded. Requires a slot and `Arc` references to history and provider (only
668    /// available in the top-level turn; child agent turns silently skip this, leaving it
669    /// to the synchronous compaction at the hard threshold).
670    async fn spawn_background_compaction(&self, hard_threshold: u64) {
671        let (Some(slot), Some(history_arc), Some(provider_arc)) = (
672            self.compaction_slot.as_ref(),
673            self.history_arc.as_ref(),
674            self.provider_arc.as_ref(),
675        ) else {
676            return;
677        };
678        if slot.is_in_flight() {
679            return; // Single-flight: a compaction is already in flight, do not start another.
680        }
681
682        // The compaction cancel token is independent of the turn cancel — the summary
683        // should be allowed to finish even if the originating turn has ended; however, it
684        // is cancelled when the session ends. For sub-agent paths that have no
685        // `session_cancel`, it falls back to the turn cancel.
686        let cancel = self
687            .session_cancel
688            .clone()
689            .unwrap_or_else(|| self.cancel.clone())
690            .child_token();
691        let ctx = compact::CompactionCtx {
692            provider: provider_arc.clone(),
693            model: self.config.model.clone(),
694            sampling: self.config.sampling.clone(),
695            tools: self.tools.schemas(),
696            cancel,
697        };
698        let events = self.events.clone();
699        let on_done: Arc<
700            dyn Fn(crate::session::CompactionReport) -> futures::future::BoxFuture<'static, ()>
701                + Send
702                + Sync,
703        > = Arc::new(move |report| {
704            // Return a future so that `emit` is awaited inside the compaction task body —
705            // no detached task is spawned, and event emission is governed by the
706            // compaction task's cancel/track semantics.
707            let events = events.clone();
708            Box::pin(async move {
709                events
710                    .emit(AgentEvent::ContextCompressed {
711                        tokens_before: report.tokens_before,
712                        tokens_after: report.tokens_after,
713                    })
714                    .await;
715            })
716        });
717        let started = slot.try_spawn(history_arc.clone(), ctx, hard_threshold, on_done);
718        if started {
719            tracing::info!(hard_threshold, "background compaction started");
720        }
721    }
722
723    /// Hard threshold fallback: wait for an in-flight background compaction to finish, or
724    /// run a synchronous compaction.
725    async fn compact_hard(&self, estimate: u64, hard: u64) -> Result<(), TurnError> {
726        // A background compaction is already in flight; wait for it to finish to avoid
727        // redundant work.
728        if let Some(slot) = self.compaction_slot.as_ref()
729            && slot.is_in_flight()
730        {
731            slot.await_in_flight().await;
732            return Ok(());
733        }
734
735        // Before the compact hook: the hook may `Skip` to veto this compaction (a
736        // mutating step).
737        let mut before = crate::hooks::step::BeforeCompact {
738            token_estimate: estimate,
739            threshold: hard,
740        };
741        if let crate::hooks::step::HookControl::Skip =
742            self.hooks.dispatch(&mut before, self.hook_ctx()).await
743        {
744            tracing::info!("compaction vetoed by before-compact hook");
745            return Ok(());
746        }
747
748        let ctx = self.sync_compaction_ctx();
749        let Some(report) = compact::run_sync(self.history, &ctx, hard).await else {
750            // No safe compaction boundary (e.g., a single very long turn) — skip this
751            // round, no event emitted.
752            return Ok(());
753        };
754        self.events
755            .emit(AgentEvent::ContextCompressed {
756                tokens_before: report.tokens_before,
757                tokens_after: report.tokens_after,
758            })
759            .await;
760
761        // After the compact hook: observe and allow injection (injected content goes into
762        // history).
763        let mut after = crate::hooks::step::AfterCompact {
764            tokens_before: report.tokens_before,
765            tokens_after: report.tokens_after,
766            additional_context: Vec::new(),
767        };
768        let _ = self.hooks.dispatch(&mut after, self.hook_ctx()).await;
769        if !after.additional_context.is_empty() {
770            self.append_user_feedback(after.additional_context);
771        }
772        Ok(())
773    }
774
775    /// The [`compact::CompactionCtx`] for synchronous compaction. Wrapping a borrowed
776    /// provider in a temporary `Arc` is not feasible (a trait object borrow cannot be
777    /// `Arc`), so the synchronous path requires `provider_arc`. Falling back when it is
778    /// missing is impossible (the top-level always has one; child agents use a borrowed
779    /// `provider`—see the `sync_compaction_ctx` implementation).
780    fn sync_compaction_ctx(&self) -> compact::CompactionCtx {
781        compact::CompactionCtx {
782            provider: self
783                .provider_arc
784                .clone()
785                .expect("sync compaction requires provider_arc"),
786            model: self.config.model.clone(),
787            sampling: self.config.sampling.clone(),
788            tools: self.tools.schemas(),
789            cancel: self.cancel.clone(),
790        }
791    }
792
793    /// Parse the three-tier compaction thresholds (in tokens) for this turn. Any tier set
794    /// to `None` means that tier is not triggered.
795    fn compact_thresholds(&self) -> CompactThresholds {
796        let window = self
797            .provider
798            .model_info(&self.config.model)
799            .and_then(|m| m.context_window);
800
801        // For `hard`, an absolute threshold takes precedence; otherwise, use `ratio *
802        // window`.
803        let hard = self.config.compact_threshold_tokens.or_else(|| {
804            let ratio = self.config.compact_ratio?;
805            ratio_threshold(window?, ratio)
806        });
807        // micro/soft can only be derived from window (absolute overrides apply only to
808        // hard).
809        let from_ratio =
810            |ratio: Option<f64>| ratio.and_then(|r| window.and_then(|w| ratio_threshold(w, r)));
811        CompactThresholds {
812            micro: from_ratio(self.config.microcompact_ratio),
813            soft: from_ratio(self.config.compact_soft_ratio),
814            hard,
815        }
816    }
817
818    pub(super) fn hook_ctx(&self) -> HookCtx<'_> {
819        HookCtx::new(self.session_id, self.cwd, self.cancel.clone())
820    }
821}
822
823// ----- internal types -----
824
825#[derive(Clone, Copy)]
826struct TurnOutcome {
827    reason: AcpStopReason,
828    usage: Usage,
829}
830
/// Token watermarks for the three compaction tiers. A tier set to `None` is not
/// triggered this turn.
#[derive(Clone, Copy)]
struct CompactThresholds {
    /// Watermark for the micro-compaction tier.
    micro: Option<u64>,
    /// Watermark for the soft (background) compaction tier.
    soft: Option<u64>,
    /// Watermark for the hard (must-compact) tier.
    hard: Option<u64>,
}

impl CompactThresholds {
    /// Returns `true` when no tier has a watermark, i.e. there is no proactive
    /// compaction this turn.
    fn is_empty(&self) -> bool {
        matches!((self.micro, self.soft, self.hard), (None, None, None))
    }
}
846
/// Computes `context_window * ratio`, rounded down, as a token threshold.
///
/// `ratio` is expected to lie in `(0, 1]`. Returns `None` when the rounded-down product
/// is `0`, meaning the tier is not triggered. Because float-to-integer `as` casts
/// saturate, a negative or NaN product also becomes `0` and therefore `None`.
fn ratio_threshold(context_window: u64, ratio: f64) -> Option<u64> {
    let scaled = context_window as f64 * ratio;
    match scaled.floor() as u64 {
        0 => None,
        tokens => Some(tokens),
    }
}
853
/// Default cap on how many times the `before turn-end` hook may force a turn to
/// continue.
///
/// Overridable through [`TurnConfig::max_hook_continues`] (config key
/// `[turn].max_hook_continues`). See the hook-step documentation on context exit
/// semantics for how forced continuations interact with turn termination.
pub(crate) const DEFAULT_MAX_HOOK_CONTINUES: u32 = 3;
858
/// Default upper bound for subagent vertical recursion depth.
///
/// Counted from the top-level turn: N levels means the top turn can spawn subagents,
/// their children can spawn further, and so on, until the Nth level (where
/// `subagent_max_depth` reaches 0) can no longer call `spawn_agent`.
///
/// The default of 4 leaves room for orchestrations like "main agent → coordinator
/// subagent → worker subagent" while preventing runaway vertical growth. A value of 1
/// would stop after the first level of subagents and make that chain impossible.
/// Horizontal runaway is separately guarded by `request_limit`.
pub(crate) const DEFAULT_SUBAGENT_MAX_DEPTH: u32 = 4;
866
867struct TurnState {
868    request_count: u32,
869    usage: Usage,
870    cap: Option<u32>,
871    expand_on_progress: bool,
872    /// How many times this turn has been extended by the `before turn-end` hook. Cap is
873    /// [`Self::max_stop_hook_continues`].
874    stop_hook_continues: u32,
875    /// Hard upper limit for life-extending continues (from
876    /// [`TurnConfig::max_hook_continues`]). Prevents hooks from `Continue`ing
877    /// indefinitely.
878    max_stop_hook_continues: u32,
879}
880
881impl TurnState {
882    fn new(limit: TurnRequestLimit, max_hook_continues: u32) -> Self {
883        Self {
884            request_count: 0,
885            usage: Usage::default(),
886            cap: limit.initial_cap(),
887            expand_on_progress: limit.expand_on_progress(),
888            stop_hook_continues: 0,
889            max_stop_hook_continues: max_hook_continues,
890        }
891    }
892
893    fn note_progress(&mut self) {
894        if self.expand_on_progress
895            && let Some(cap) = self.cap.as_mut()
896        {
897            *cap = cap.saturating_add(1);
898        }
899    }
900
901    fn exceeded_request_cap(&self) -> bool {
902        match self.cap {
903            None => false,
904            Some(cap) => self.request_count >= cap,
905        }
906    }
907
908    /// Whether the `before turn-end` hook is still allowed to continue (has not reached
909    /// the hard limit).
910    fn may_stop_hook_continue(&self) -> bool {
911        self.stop_hook_continues < self.max_stop_hook_continues
912    }
913
914    /// Records one stop-hook continuation.
915    fn note_stop_hook_continue(&mut self) {
916        self.stop_hook_continues = self.stop_hook_continues.saturating_add(1);
917    }
918}
919
920// ----- helpers -----
921
922fn turn_outcome(state: &TurnState, reason: AcpStopReason) -> TurnOutcome {
923    TurnOutcome {
924        reason,
925        usage: state.usage,
926    }
927}
928
929#[cfg(test)]
930mod tests;