Skip to main content

defect_agent/session/
turn.rs

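//! Turn main loop — the "heart" of the agent.
//!
//! This file implements the turn state machine: prompt ingestion, the LLM request / tool
//! execution loop, layered context compaction, and the hook dispatch points in between.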
4//!
5//! Key dependencies:
6//! - [`History`]: read/write message history
7//! - [`ToolRegistry`]: tool lookup
8//! - [`LlmProvider`]: LLM invocation
9//! - [`EventEmitter`]: event emission (shared via `Arc` so tool tasks can also emit)
10//! - [`PermissionGate`]: wait for permission requests
11
12use std::path::PathBuf;
13use std::sync::Arc;
14
15use agent_client_protocol_schema::{ContentBlock, SessionId, StopReason as AcpStopReason};
16use tokio_util::sync::CancellationToken;
17
18use crate::event::AgentEvent;
19use crate::fs::FsBackend;
20use crate::hooks::{HookCtx, HookEngine};
21use crate::http::HttpClient;
22use crate::llm::{
23    CompletionRequest, HostedCapabilities, LlmProvider, Message, MessageContent, Role,
24    SamplingParams, StopReason as LlmStopReason, ToolChoice, Usage,
25};
26use crate::policy::SandboxPolicy;
27use crate::session::events::EventEmitter;
28use crate::session::permissions::PermissionGate;
29use crate::session::{History, ToolRegistry, TurnError};
30use crate::shell::ShellBackend;
31
32const DEFAULT_PROMPT_FILE: &str = "AGENTS.md";
33
34mod request_audit;
35
36mod compact;
37
38mod microcompact;
39
40mod compaction_slot;
41
42pub use compaction_slot::CompactionSlot;
43
44mod sanitize;
45
46mod content;
47
48mod llm_drive;
49
50mod tools;
51
52mod hooks;
53
54use content::content_block_to_message_content;
55use hooks::UserPromptHookFlow;
56use llm_drive::{assistant_message, real_input_tokens};
57use tools::{
58    Approved, DecisionFlow, approved_tool_name, reject_oversized_results, tool_results_message,
59};
60
61pub(crate) use request_audit::RequestAuditTracker;
62// Out-of-band `/compact` slash command reuses the same synchronous compaction primitive
63// as the turn loop's hard-watermark fallback, so the two share boundary selection and
64// summary logic instead of forking a second compaction path.
65pub(crate) use compact::{CompactionCtx, run_sync as run_sync_compaction};
66
/// Policy bounding how many LLM requests a single turn may issue.
#[derive(Debug, Clone, Copy)]
pub enum TurnRequestLimit {
    /// The turn is never stopped for making too many requests.
    Unbounded,
    /// Hard cap: the turn stops with [`AcpStopReason::MaxTurnRequests`] once N requests
    /// have been made.
    Fixed(u32),
    /// Cap that starts at `initial`. With `expand_on_progress` set, every round in which
    /// a tool use is approved and executed counts as progress and raises the cap by 1;
    /// without it, termination follows [`Self::Fixed`].
    Adaptive {
        initial: u32,
        expand_on_progress: bool,
    },
}

impl TurnRequestLimit {
    /// Starting cap for the turn, or `None` when the limit is [`Self::Unbounded`].
    fn initial_cap(&self) -> Option<u32> {
        match self {
            Self::Unbounded => None,
            Self::Fixed(cap) | Self::Adaptive { initial: cap, .. } => Some(*cap),
        }
    }

    /// Whether the cap grows on progress. Only an [`Self::Adaptive`] limit with the flag
    /// set answers `true`.
    fn expand_on_progress(&self) -> bool {
        match self {
            Self::Adaptive {
                expand_on_progress, ..
            } => *expand_on_progress,
            Self::Unbounded | Self::Fixed(_) => false,
        }
    }
}
102
/// Configuration for a turn.
///
/// Groups provider/model selection, prompt sources, sampling, the per-turn request cap,
/// the three context-compaction watermarks (micro → soft → hard), and the retry /
/// concurrency / nesting limits.
#[derive(Debug, Clone)]
pub struct TurnConfig {
    /// The selected provider vendor (the provider half of the selection key). Together
    /// with [`Self::model`], this is used to resolve the actual provider entry in the
    /// registry by the `(vendor, model)` pair.
    pub provider: String,
    /// The selected model (the model half of the selection key, see [`Self::provider`]).
    /// Copied into each `CompletionRequest` as its initial `model`.
    pub model: String,
    /// NOTE(review): presumably an allow-list restricting which models may be selected,
    /// with `None` meaning unrestricted — not used in this part of the file; confirm
    /// against the consumer.
    pub allowed_models: Option<Vec<String>>,
    /// Base prompt source; see [`BasePromptConfig`].
    pub base_prompt: BasePromptConfig,
    /// Optional system prompt text. NOTE(review): how this combines with
    /// [`Self::base_prompt`] / [`Self::prompt`] into the prompt resolved for a turn is
    /// not visible in this part of the file — confirm.
    pub system_prompt: Option<String>,
    /// Prompt file / text / overlay settings; see [`PromptConfig`].
    pub prompt: PromptConfig,
    /// Sampling parameters, cloned into every `CompletionRequest` built for the turn.
    pub sampling: SamplingParams,
    /// Cap on the number of LLM requests within one turn; see [`TurnRequestLimit`].
    pub request_limit: TurnRequestLimit,
    /// Explicit absolute override for the compaction threshold (in tokens). When `Some`,
    /// takes precedence over the value inferred from [`Self::compact_ratio`]. When
    /// `None`, the threshold is automatically derived from the ratio.
    pub compact_threshold_tokens: Option<u64>,
    /// Compression threshold as a fraction of the model's `context_window` (e.g. `0.85` =
    /// trigger when usage exceeds 85%). This is the **hard watermark**: when reached, if
    /// no background compaction is in flight, the turn main loop performs **synchronous**
    /// compaction as a fallback (blocking the current turn but guaranteeing the context
    /// is not exceeded). `None` = no automatic ratio-based compression (and if no
    /// absolute threshold is set either, no compression occurs for this turn). Only
    /// effective when `compact_threshold_tokens` is `None` and the model exposes a
    /// `context_window`. See `session/turn/compact.rs` for details.
    pub compact_ratio: Option<f64>,
    /// **Background full compaction** toggle. When `true`, once the soft watermark
    /// derived from [`Self::compact_soft_ratio`] is exceeded, a summarization compaction
    /// is started **asynchronously** (without blocking the current turn); it quietly
    /// compresses history before the hard watermark is hit. When disabled, only
    /// synchronous compaction at the hard watermark remains. See
    /// `session/turn/compaction_slot.rs`.
    pub background_compact_enabled: bool,
    /// Soft watermark for background compaction, as a fraction of `context_window`
    /// (default `0.7`). Must be less than `compact_ratio` (the hard watermark) to leave a
    /// window between soft and hard for background summarization to complete. Only
    /// effective when `background_compact_enabled` is set and a threshold can be derived.
    pub compact_soft_ratio: Option<f64>,
    /// Enables **micro‑compaction**. When `true`, each turn first runs a micro‑compaction
    /// (cleans oversized `tool_result` in older turns, without calling the LLM or
    /// deleting messages) at the water level above [`Self::microcompact_ratio`],
    /// deferring expensive full compaction. See `session/turn/microcompact.rs`.
    pub microcompact_enabled: bool,
    /// Micro‑compaction watermark as a fraction of `context_window` (default `0.6`).
    /// Typically below the soft watermark — micro‑compaction is the cheapest first line
    /// of defense. Only effective when `microcompact_enabled` and a threshold can be
    /// derived.
    pub microcompact_ratio: Option<f64>,
    /// NOTE(review): presumably the retry budget for a failing LLM call (default `3`) —
    /// the retry logic is not visible in this part of the file; confirm.
    pub max_llm_retries: u32,
    /// `0` = unlimited. The default is unlimited.
    pub max_concurrent_tools: usize,
    /// Hard upper limit on forced continuations from the `before turn-end` hook —
    /// prevents infinite loops from repeated hook `Continue` calls. Default: 3.
    pub max_hook_continues: u32,
    /// Maximum subagent nesting depth (vertical recursion limit) for this turn.
    /// `spawn_agent` uses this to pass the "remaining depth" to tools via
    /// [`crate::tool::ToolContext::subagent_depth`]; a child agent's nested turn receives
    /// `subagent_max_depth` = parent's remaining depth minus one. `0` means this turn's
    /// tool set contains no `spawn_agent`, structurally forbidding dispatch — replacing
    /// the old hardcoded gate of "whitelist never contains `spawn_agent`". Default:
    /// `DEFAULT_SUBAGENT_MAX_DEPTH`.
    pub subagent_max_depth: u32,
}
167
168impl Default for TurnConfig {
169    fn default() -> Self {
170        Self {
171            provider: String::new(),
172            model: String::new(),
173            allowed_models: None,
174            base_prompt: BasePromptConfig::default(),
175            system_prompt: None,
176            prompt: PromptConfig::default(),
177            sampling: SamplingParams::default(),
178            request_limit: TurnRequestLimit::Adaptive {
179                initial: 32,
180                expand_on_progress: true,
181            },
182            compact_threshold_tokens: None,
183            // Trigger compaction at 85% of `context_window` (hard watermark), reserving
184            // ~15% for summary output and headroom — within the reasonable range of codex
185            // (90%), Claude (~93%), and opencode (window-20k).
186            compact_ratio: Some(0.85),
187            // Background compaction enabled by default: starts async summarization at
188            // soft=0.7, aiming to finish before hard=0.85 is reached.
189            background_compact_enabled: true,
190            compact_soft_ratio: Some(0.7),
191            // Micro‑compaction enabled by default: at 0.6 it evicts old large
192            // `tool_result`s — the cheapest first line of defense.
193            microcompact_enabled: true,
194            microcompact_ratio: Some(0.6),
195            max_llm_retries: 3,
196            max_concurrent_tools: 0,
197            max_hook_continues: DEFAULT_MAX_HOOK_CONTINUES,
198            subagent_max_depth: DEFAULT_SUBAGENT_MAX_DEPTH,
199        }
200    }
201}
202
/// Source of the base prompt: an optional file path and/or optional inline text.
///
/// NOTE(review): how the two fields are resolved (precedence when both are set, and the
/// fallback when both are `None`) is not visible in this part of the file — confirm
/// against the prompt resolution code.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct BasePromptConfig {
    /// Optional path to a file, presumably holding the base prompt — confirm.
    pub file: Option<PathBuf>,
    /// Optional inline text, presumably the base prompt given directly — confirm.
    pub text: Option<String>,
}
208
209#[derive(Debug, Clone, PartialEq, Eq)]
210pub struct PromptConfig {
211    pub file: String,
212    pub text: Option<String>,
213    pub provider_overlays: std::collections::BTreeMap<String, String>,
214    pub model_overlays: std::collections::BTreeMap<String, String>,
215}
216
217impl Default for PromptConfig {
218    fn default() -> Self {
219        Self {
220            file: DEFAULT_PROMPT_FILE.to_owned(),
221            text: None,
222            provider_overlays: std::collections::BTreeMap::new(),
223            model_overlays: std::collections::BTreeMap::new(),
224        }
225    }
226}
/// All dependencies and accumulated state for a single turn execution.
///
/// This struct is constructed by [`crate::session::DefaultSession`] on each `run_turn`
/// call, borrowing sub-components of the session and being dropped after the turn
/// completes.
pub struct TurnRunner<'a> {
    /// Message history. The turn appends to it, snapshots it when building a request,
    /// and truncates it back to the pre-turn length when the turn fails permanently.
    pub history: &'a dyn History,
    /// Tool registry for this turn; its schemas are attached to every request.
    pub tools: &'a dyn ToolRegistry,
    /// The same session tool pool as [`Self::tools`], but as an owned `Arc` so it can be
    /// injected into [`crate::tool::ToolContext`] (which needs `'static`/owned). Carries
    /// the fully assembled composite (built-in + MCP) so `spawn_agent` can build a child
    /// agent's tool subset that includes `mcp__*` tools. `None` in legacy/test runners
    /// that only set the borrow.
    pub session_tools: Option<Arc<dyn ToolRegistry>>,
    /// LLM provider used for this turn's requests.
    pub provider: &'a dyn LlmProvider,
    /// The active policy for this turn's snapshot. Owned as an `Arc` rather than
    /// borrowed: it flows with [`crate::tool::ToolContext`] into `spawn_agent`, where
    /// child agents wrap it with
    /// [`NonInteractivePolicy`](crate::policy::NonInteractivePolicy) — must be the
    /// parent's actual policy at this moment.
    pub policy: Arc<dyn SandboxPolicy>,
    /// Event emitter, shared via `Arc` so tool tasks can also emit.
    pub events: Arc<EventEmitter>,
    /// Gate used to wait for permission requests.
    pub permissions: &'a PermissionGate,
    /// Turn cancellation token. Checked at the top of every main-loop iteration; once
    /// cancelled the turn ends with `Cancelled`.
    pub cancel: CancellationToken,
    /// Configuration for this turn (borrowed form).
    pub config: &'a TurnConfig,
    /// The same turn config as [`Self::config`], as an owned `Arc` for injection into
    /// [`crate::tool::ToolContext`] so `spawn_agent` can let a child agent inherit the
    /// parent's turn settings. `None` in legacy/test runners that only set the borrow.
    pub config_arc: Option<Arc<TurnConfig>>,
    /// The system prompt resolved for this turn. `Arc<str>`: each `build_request` call
    /// `clone`s it into `CompletionRequest.system`; the `Arc` reduces this to a reference
    /// count increment.
    pub system_prompt: Option<Arc<str>>,
    /// Working directory for the turn. NOTE(review): presumably handed to tools as
    /// their base path — its use is not visible in this part of the file; confirm.
    pub cwd: &'a std::path::Path,
    /// Filesystem backend. NOTE(review): presumably injected into tools — confirm.
    pub fs: Arc<dyn FsBackend>,
    /// Shell backend. NOTE(review): presumably injected into tools — confirm.
    pub shell: Arc<dyn ShellBackend>,
    /// HTTP client. NOTE(review): presumably injected into tools — confirm.
    pub http: Arc<dyn HttpClient>,
    /// Hosted capabilities determined at session startup.
    /// Reused directly on each turn when assembling requests, without re-querying.
    pub hosted_capabilities: HostedCapabilities,
    /// Hook engine. The turn main loop emits Sync events at four points
    /// (`UserPromptSubmit` / `PreToolUse` / `PostToolUse` / `PostToolUseFailure`).
    /// Waits for hooks to finish before proceeding.
    ///
    /// NOTE(review): the "four points" list looks stale — the turn loop in this file
    /// also dispatches the step hooks `AfterIngest`, `AfterTurnEnter`, `BeforeGenerate`,
    /// `AfterGenerate`, `BeforePermission`, `AfterPermission` and `AfterToolBatch`.
    pub hooks: &'a dyn HookEngine,
    /// Current session ID. Injected into `HookCtx` so that hook handlers can route or
    /// audit by session.
    pub session_id: &'a SessionId,
    /// Session-level background task handle. When `Some`, enables the tool's
    /// `run_in_background` capability (injected into tools via
    /// [`crate::tool::ToolContext::background`]); nested sub-agent turns receive `None`,
    /// structurally preventing background task self-replication.
    pub background: Option<crate::session::BackgroundTasks>,
    /// Shared state for the `--goal` goal-driven loop. When `Some`, this session is
    /// running in goal mode: it is injected into the `goal_done` tool via
    /// [`crate::tool::ToolContext::goal`], and the `goal-gate` hook uses it in
    /// `before_turn_end` to allow or extend the session. `None` = non-goal mode
    /// (default).
    pub goal: Option<Arc<crate::session::GoalState>>,
    /// Session-level single-flight compaction slot. When `Some`, background full
    /// compaction is available — exceeding the soft watermark triggers an async summary
    /// compaction without blocking the current turn. Nested sub-agent turns pass `None`
    /// (sub-agent contexts are short-lived and should not spawn background tasks).
    /// Requires `Arc<dyn History>`/`Arc<dyn LlmProvider>` for the task to hold `'static`
    /// references across turns, hence the accompanying `history_arc`/`provider_arc`.
    pub compaction_slot: Option<crate::session::CompactionSlot>,
    /// `Arc<dyn History>` for `compaction_slot` (points to the same object as the
    /// `history` borrow). Held by the background compaction task across turns. When
    /// `None`, background compaction is unavailable and falls back to synchronous
    /// compaction only.
    pub history_arc: Option<Arc<dyn History>>,
    /// `Arc<dyn LlmProvider>` for `compaction_slot` (counterpart of the `provider`
    /// borrow), held by the background compaction task across turns in the same way as
    /// `history_arc`.
    pub provider_arc: Option<Arc<dyn LlmProvider>>,
    /// Session-level cancellation token; the background compaction task's cancellation
    /// token is derived from this one (independent of the turn's cancel, cancelled when
    /// the session ends). When `None`, background compaction uses the turn's `cancel`
    /// (sub-agent path).
    pub session_cancel: Option<CancellationToken>,
    /// The ingestion source for this turn's input — determines the `source` field of the
    /// `before_ingest` step envelope.
    /// User turns use `User`; background continuation turns started by the session driver
    /// use `Background`.
    pub ingest_source: crate::hooks::step::IngestSource,
    /// Request stability diagnostics: compares snapshots of two consecutive requests
    /// actually sent to the provider, helping locate sources of high volatility in low
    /// prompt cache hit rates.
    pub(crate) request_audit: &'a RequestAuditTracker,
}
314
315impl<'a> TurnRunner<'a> {
316    /// Runs a single turn.
317    pub async fn run(&self, prompt: Vec<ContentBlock>) -> Result<AcpStopReason, TurnError> {
318        // ① UserPromptSubmit hook (sync interception)
319        // Gives hooks a chance to rewrite or intercept the prompt before it lands in
320        // history.
321        let prompt = match self.fire_user_prompt_submit(prompt).await {
322            UserPromptHookFlow::Continue(p) => p,
323            UserPromptHookFlow::Refused => {
324                // Hook blocked: do not emit `UserPromptCommitted`, do not append to
325                // history; return `Refusal` directly so the ACP bridge responds with
326                // `PromptResponse`.
327                return Ok(AcpStopReason::Refusal);
328            }
329        };
330
331        // Rollback boundary: history length before this turn appends anything. If the turn
332        // fails permanently, everything appended from here on (the user prompt, hook
333        // feedback, partial assistant/tool messages) is truncated away, so a failed turn
334        // leaves no orphan in history to be replayed on reload or re-sent next request.
335        let rollback_len = self.history.len();
336
337        self.events
338            .emit(AgentEvent::UserPromptCommitted {
339                content: prompt.clone(),
340            })
341            .await;
342        self.history.append(Message {
343            role: Role::User,
344            content: prompt
345                .into_iter()
346                .map(content_block_to_message_content)
347                .collect::<Result<Vec<_>, _>>()?
348                .into_iter()
349                .flatten()
350                .collect(),
351        });
352
353        // After Ingest hook: input has been merged into history. Injection only.
354        {
355            let mut step = crate::hooks::step::AfterIngest {
356                committed_len: 1,
357                additional_context: Vec::new(),
358            };
359            let _ = self.hooks.dispatch(&mut step, self.hook_ctx()).await;
360            if !step.additional_context.is_empty() {
361                self.append_user_feedback(step.additional_context);
362            }
363        }
364
365        self.events.emit(AgentEvent::TurnStarted).await;
366
367        // After-turn-enter hook: the turn scope has been entered. Allows injecting system
368        // context or a Break to reject the turn.
369        // Note: currently the hook point is placed after prompt ingestion (moving the hook
370        // point earlier is a deferred adjustment).
371        {
372            let mut step = crate::hooks::step::AfterTurnEnter {
373                is_subagent: false,
374                agent_type: None,
375                additional_context: Vec::new(),
376            };
377            let control = self.hooks.dispatch(&mut step, self.hook_ctx()).await;
378            if !step.additional_context.is_empty() {
379                self.append_user_feedback(step.additional_context);
380            }
381            if let crate::hooks::step::HookControl::Break { .. } = control {
382                return Ok(AcpStopReason::EndTurn);
383            }
384        }
385
386        let result = self.run_inner().await;
387
388        match &result {
389            Ok(outcome) => {
390                self.events
391                    .emit(AgentEvent::TurnEnded {
392                        reason: outcome.reason,
393                        usage: outcome.usage,
394                    })
395                    .await;
396            }
397            Err(_) => {
398                // Permanent failure (e.g. provider error after retries). Roll history back
399                // to the pre-turn boundary so the orphan user prompt does not linger; the
400                // bridge layer decides the wire response. `TurnAborted` tells storage to
401                // drop the same tail it already journaled (the `UserPromptCommitted` /
402                // `TurnStarted` records emitted before the failure), keeping the in-memory
403                // and persisted histories consistent.
404                self.history.truncate(rollback_len);
405                self.events.emit(AgentEvent::TurnAborted).await;
406            }
407        }
408        // The `Err` path does not emit `TurnEnded`; the bridge layer decides the wire
409        // response based on the future outcome.
410
411        result.map(|outcome| outcome.reason)
412    }
413
414    async fn run_inner(&self) -> Result<TurnOutcome, TurnError> {
415        let mut state = TurnState::new(self.config.request_limit, self.config.max_hook_continues);
416        loop {
417            if self.cancel.is_cancelled() {
418                return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
419            }
420
421            self.manage_context().await?;
422
423            let mut req = self.build_request();
424
425            // Before Generate hook: can modify request (model), short-circuit (fill in
426            // synthetic assistant to skip LLM), or Break.
427            let mut before_gen = crate::hooks::step::BeforeGenerate {
428                model: req.model.clone(),
429                message_count: req.messages.len(),
430                attempt: state.request_count.saturating_add(1),
431                assistant_text: None,
432            };
433            let bg_control = self.hooks.dispatch(&mut before_gen, self.hook_ctx()).await;
434            req.model = before_gen.model;
435            if let Some(text) = before_gen.assistant_text {
436                // Short-circuit: use a synthetic assistant reply to skip the real LLM
437                // call, then proceed to the before-turn-end check.
438                self.history.append(Message {
439                    role: Role::Assistant,
440                    content: vec![MessageContent::Text { text }].into(),
441                });
442                if self
443                    .decide_turn_end(&mut state, AcpStopReason::EndTurn, true)
444                    .await
445                {
446                    continue;
447                }
448                return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
449            }
450            if let crate::hooks::step::HookControl::Break { reason } = bg_control {
451                return Ok(turn_outcome(&state, reason));
452            }
453
454            let (mut stream, attempt) = self.call_llm_with_retry(&req, &mut state).await?;
455
456            let outcome = self.drain_provider_stream(&mut stream, &mut state).await?;
457
458            if outcome.cancelled {
459                return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
460            }
461
462            // The stream has been drained and all usage for this call is available — emit
463            // `LlmCallFinished` with the **per-call** actual usage (`outcome.usage`, not
464            // the turn-accumulated `state.usage`).
465            self.events
466                .emit(AgentEvent::LlmCallFinished {
467                    model: req.model.clone(),
468                    attempt,
469                    usage: outcome.usage,
470                    error: None,
471                })
472                .await;
473
474            // After the Generate hook: observe (usage / stop / error). No output to fill;
475            // to intervene, route the next turn through before-turn-end.
476            let stop_reason_for_hook = match outcome.stop {
477                LlmStopReason::EndTurn | LlmStopReason::StopSequence => AcpStopReason::EndTurn,
478                LlmStopReason::Refusal => AcpStopReason::Refusal,
479                LlmStopReason::MaxTokens => AcpStopReason::MaxTokens,
480                LlmStopReason::ToolUse => AcpStopReason::EndTurn,
481            };
482            let mut after_gen = crate::hooks::step::AfterGenerate {
483                model: req.model.clone(),
484                usage: outcome.usage,
485                stop: stop_reason_for_hook,
486                error: None,
487            };
488            let _ = self.hooks.dispatch(&mut after_gen, self.hook_ctx()).await;
489
490            // Feed the actual input token count returned by this call into `history` as
491            // the precise baseline for compaction threshold decisions (see
492            // `session/turn/compact.rs`). The messages sent in this call are
493            // `req.messages`, and their real input size is the sum of the three
494            // input-side fields in `outcome.usage`.
495            if let Some(real_input) = real_input_tokens(&outcome.usage) {
496                self.history.record_input_tokens(real_input);
497            }
498
499            let assistant = assistant_message(&outcome);
500            if !assistant.content.is_empty() {
501                self.history.append(assistant);
502            }
503
504            // Passive stop (Refusal / MaxTokens): skip the before-turn-end hook (the hook
505            // cannot extend these), exit directly.
506            match outcome.stop {
507                LlmStopReason::EndTurn | LlmStopReason::StopSequence => {
508                    // Voluntary stop → before-turn-end decision point.
509                    if self
510                        .decide_turn_end(&mut state, AcpStopReason::EndTurn, true)
511                        .await
512                    {
513                        continue;
514                    }
515                    return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
516                }
517                LlmStopReason::Refusal => {
518                    return Ok(turn_outcome(&state, AcpStopReason::Refusal));
519                }
520                LlmStopReason::MaxTokens => {
521                    return Ok(turn_outcome(&state, AcpStopReason::MaxTokens));
522                }
523                LlmStopReason::ToolUse => {}
524            }
525
526            if outcome.tool_uses.is_empty() {
527                // Voluntary stop (no tool requested) → same before-turn-end decision
528                // point.
529                if self
530                    .decide_turn_end(&mut state, AcpStopReason::EndTurn, true)
531                    .await
532                {
533                    continue;
534                }
535                return Ok(turn_outcome(&state, AcpStopReason::EndTurn));
536            }
537
538            // Before the Permission hook (currently only observes/stubs; policy still
539            // delegates to the authority).
540            for tu in &outcome.tool_uses {
541                let mut bp = crate::hooks::step::BeforePermission {
542                    tool: tu.name.clone(),
543                    decision: "ask".to_string(),
544                    resolved: None,
545                };
546                let _ = self.hooks.dispatch(&mut bp, self.hook_ctx()).await;
547            }
548
549            let approved = match self.decide_permissions(&outcome.tool_uses).await? {
550                DecisionFlow::Continue(list) => list,
551                DecisionFlow::Cancelled => {
552                    return Ok(turn_outcome(&state, AcpStopReason::Cancelled));
553                }
554            };
555
556            // After permission hook (currently only observes/stubs).
557            for a in &approved {
558                let (tool, granted) = match a {
559                    Approved::Run { .. } => (approved_tool_name(a), true),
560                    Approved::Denied { .. } | Approved::FailedArgs { .. } => {
561                        (approved_tool_name(a), false)
562                    }
563                };
564                let mut ap = crate::hooks::step::AfterPermission { tool, granted };
565                let _ = self.hooks.dispatch(&mut ap, self.hook_ctx()).await;
566            }
567
568            let progressed = approved.iter().any(|a| matches!(a, Approved::Run { .. }));
569            if progressed {
570                state.note_progress();
571            }
572
573            let mut results = self.run_tools_concurrently(approved).await;
574
575            // Reject any single tool result that exceeds the model's context window: it can
576            // never fit, so appending it as-is would only blow up the next request. Replace
577            // it with an actionable error before it enters history. See
578            // `reject_oversized_results`.
579            let rejected = reject_oversized_results(&mut results, self.effective_context_window());
580            if rejected > 0 {
581                tracing::warn!(
582                    rejected,
583                    "rejected oversized tool result(s) exceeding the context window"
584                );
585            }
586
587            // After `ToolBatch` hook: after all parallel tools finish, before the next
588            // LLM call. Allows injection or graceful break.
589            let mut batch = crate::hooks::step::AfterToolBatch {
590                results: results
591                    .iter()
592                    .map(|r| crate::hooks::step::ToolBatchEntry {
593                        tool_name: r.name.clone(),
594                        is_error: r.is_error,
595                    })
596                    .collect(),
597                additional_context: Vec::new(),
598            };
599            let batch_control = self.hooks.dispatch(&mut batch, self.hook_ctx()).await;
600
601            self.history.append(tool_results_message(results));
602            if !batch.additional_context.is_empty() {
603                self.append_user_feedback(batch.additional_context);
604            }
605            if let crate::hooks::step::HookControl::Break { reason } = batch_control {
606                return Ok(turn_outcome(&state, reason));
607            }
608
609            if state.exceeded_request_cap() {
610                // Hitting the per-turn request cap is an involuntary stop. Still consult
611                // the before-turn-end hook: in goal mode the goal gate decides whether to
612                // keep working (resetting the request budget for the next round, bounded
613                // by `max_hook_continues`). Without a continuing hook, the turn stops with
614                // `MaxTurnRequests` as before.
615                if self
616                    .decide_turn_end(&mut state, AcpStopReason::MaxTurnRequests, false)
617                    .await
618                {
619                    continue;
620                }
621                return Ok(turn_outcome(&state, AcpStopReason::MaxTurnRequests));
622            }
623        }
624    }
625
626    fn build_request(&self) -> CompletionRequest {
627        // Before sending the request, pair any orphaned `tool_use` (left over from an
628        // interruption, with no matching `tool_result`) — otherwise the provider will
629        // permanently reject the request. Only patch the copy sent to the provider; the
630        // true history remains untouched. See `sanitize`.
631        let messages = sanitize::sanitize_tool_pairing(self.history.snapshot());
632        let req = CompletionRequest {
633            model: self.config.model.clone(),
634            system: self.system_prompt.clone(),
635            messages,
636            tools: self.tools.schemas(),
637            tool_choice: ToolChoice::Auto,
638            sampling: self.config.sampling.clone(),
639            hosted_capabilities: self.hosted_capabilities,
640        };
641        self.request_audit.record(&req);
642        req
643    }
644
645    /// Layered context management: micro → soft (background) → hard (synchronous
646    /// fallback). Called at the start of each main loop iteration.
647    ///
648    /// Three water levels (see `compact_thresholds` for details):
649    /// 1. **micro** (default 0.6·window): if micro-compaction is enabled, first evict old
650    ///    large `tool_result` entries — no LLM calls, no message deletion, near-zero
651    ///    latency, deferring expensive full compaction.
652    /// 2. **soft** (default 0.7·window): if background compaction is enabled,
653    ///    **asynchronously** start a summary compaction; the turn does not block, quietly
654    ///    compacting before hitting hard (single-flight, won't re-start if one is already
655    ///    in flight).
656    /// 3. **hard** (default 0.85·window, equivalent to the old `compact_ratio`
657    ///    semantics): at this level compaction is mandatory — if a background compaction
658    ///    is already in flight, `await` its completion; otherwise compact
659    ///    **synchronously** as a fallback.
660    ///
661    /// micro/soft require the model to expose `context_window`; hard also supports an
662    /// absolute override via `compact_threshold_tokens`. If any level cannot obtain its
663    /// threshold, that level is skipped (preserving the conservative "no information, no
664    /// compaction" semantics).
665    async fn manage_context(&self) -> Result<(), TurnError> {
666        let thresholds = self.compact_thresholds();
667        // All three thresholds absent → no proactive compaction this turn (preserves the
668        // existing semantics).
669        if thresholds.is_empty() {
670            return Ok(());
671        }
672        let Some(estimate) = self.history.token_estimate() else {
673            return Ok(());
674        };
675
676        // ① micro: synchronous, cheapest. May reduce `estimate` below soft/hard
677        // thresholds, so re-fetch after compaction.
678        let estimate = if self.config.microcompact_enabled
679            && thresholds.micro.is_some_and(|t| estimate >= t)
680        {
681            self.run_microcompact().await;
682            self.history.token_estimate().unwrap_or(estimate)
683        } else {
684            estimate
685        };
686
687        // ② soft: crossing the threshold triggers an async background compaction without
688        // blocking the current round.
689        if self.config.background_compact_enabled
690            && let (Some(soft), Some(hard)) = (thresholds.soft, thresholds.hard)
691            && estimate >= soft
692            && estimate < hard
693        {
694            self.spawn_background_compaction(hard).await;
695            // Non-blocking – continue assembling requests this round; summary persistence
696            // happens in a later round (or later).
697            return Ok(());
698        }
699
700        // ③ hard: must compact.
701        if let Some(hard) = thresholds.hard
702            && estimate >= hard
703        {
704            self.compact_hard(estimate, hard).await?;
705        }
706        Ok(())
707    }
708
709    /// Run a micro-compact and write back via `replace`. Best-effort: does nothing if
710    /// there is nothing to clean up.
711    async fn run_microcompact(&self) {
712        let messages = self.history.snapshot();
713        let Some((rebuilt, report)) = microcompact::run(&messages) else {
714            return;
715        };
716        self.history.replace(rebuilt);
717        tracing::info!(
718            cleared = report.cleared,
719            tokens_before = report.tokens_before,
720            tokens_after = report.tokens_after,
721            "context microcompacted"
722        );
723        self.events
724            .emit(AgentEvent::ContextMicrocompacted {
725                tokens_before: report.tokens_before,
726                tokens_after: report.tokens_after,
727                cleared: report.cleared,
728            })
729            .await;
730    }
731
732    /// Spawns a single-flight background full compaction when the soft threshold is
733    /// exceeded. Requires a slot and `Arc` references to history and provider (only
734    /// available in the top-level turn; child agent turns silently skip this, leaving it
735    /// to the synchronous compaction at the hard threshold).
736    async fn spawn_background_compaction(&self, hard_threshold: u64) {
737        let (Some(slot), Some(history_arc), Some(provider_arc)) = (
738            self.compaction_slot.as_ref(),
739            self.history_arc.as_ref(),
740            self.provider_arc.as_ref(),
741        ) else {
742            return;
743        };
744        if slot.is_in_flight() {
745            return; // Single-flight: a compaction is already in flight, do not start another.
746        }
747
748        // The compaction cancel token is independent of the turn cancel — the summary
749        // should be allowed to finish even if the originating turn has ended; however, it
750        // is cancelled when the session ends. For sub-agent paths that have no
751        // `session_cancel`, it falls back to the turn cancel.
752        let cancel = self
753            .session_cancel
754            .clone()
755            .unwrap_or_else(|| self.cancel.clone())
756            .child_token();
757        let ctx = compact::CompactionCtx {
758            provider: provider_arc.clone(),
759            model: self.config.model.clone(),
760            sampling: self.config.sampling.clone(),
761            tools: self.tools.schemas(),
762            cancel,
763        };
764        let events = self.events.clone();
765        let on_done: Arc<
766            dyn Fn(crate::session::CompactionReport) -> futures::future::BoxFuture<'static, ()>
767                + Send
768                + Sync,
769        > = Arc::new(move |report| {
770            // Return a future so that `emit` is awaited inside the compaction task body —
771            // no detached task is spawned, and event emission is governed by the
772            // compaction task's cancel/track semantics.
773            let events = events.clone();
774            Box::pin(async move {
775                events
776                    .emit(AgentEvent::ContextCompressed {
777                        tokens_before: report.tokens_before,
778                        tokens_after: report.tokens_after,
779                    })
780                    .await;
781            })
782        });
783        let started = slot.try_spawn(history_arc.clone(), ctx, hard_threshold, on_done);
784        if started {
785            tracing::info!(hard_threshold, "background compaction started");
786        }
787    }
788
789    /// Hard threshold fallback: wait for an in-flight background compaction to finish, or
790    /// run a synchronous compaction.
791    async fn compact_hard(&self, estimate: u64, hard: u64) -> Result<(), TurnError> {
792        // A background compaction is already in flight; wait for it to finish to avoid
793        // redundant work.
794        if let Some(slot) = self.compaction_slot.as_ref()
795            && slot.is_in_flight()
796        {
797            slot.await_in_flight().await;
798            return Ok(());
799        }
800
801        // Before the compact hook: the hook may `Skip` to veto this compaction (a
802        // mutating step).
803        let mut before = crate::hooks::step::BeforeCompact {
804            token_estimate: estimate,
805            threshold: hard,
806        };
807        if let crate::hooks::step::HookControl::Skip =
808            self.hooks.dispatch(&mut before, self.hook_ctx()).await
809        {
810            tracing::info!("compaction vetoed by before-compact hook");
811            return Ok(());
812        }
813
814        let ctx = self.sync_compaction_ctx();
815        let Some(report) = compact::run_sync(self.history, &ctx, hard).await else {
816            // No safe compaction boundary (e.g., a single very long turn) — skip this
817            // round, no event emitted.
818            return Ok(());
819        };
820        self.events
821            .emit(AgentEvent::ContextCompressed {
822                tokens_before: report.tokens_before,
823                tokens_after: report.tokens_after,
824            })
825            .await;
826
827        // After the compact hook: observe and allow injection (injected content goes into
828        // history).
829        let mut after = crate::hooks::step::AfterCompact {
830            tokens_before: report.tokens_before,
831            tokens_after: report.tokens_after,
832            additional_context: Vec::new(),
833        };
834        let _ = self.hooks.dispatch(&mut after, self.hook_ctx()).await;
835        if !after.additional_context.is_empty() {
836            self.append_user_feedback(after.additional_context);
837        }
838        Ok(())
839    }
840
841    /// The [`compact::CompactionCtx`] for synchronous compaction. Wrapping a borrowed
842    /// provider in a temporary `Arc` is not feasible (a trait object borrow cannot be
843    /// `Arc`), so the synchronous path requires `provider_arc`. Falling back when it is
844    /// missing is impossible (the top-level always has one; child agents use a borrowed
845    /// `provider`—see the `sync_compaction_ctx` implementation).
846    fn sync_compaction_ctx(&self) -> compact::CompactionCtx {
847        compact::CompactionCtx {
848            provider: self
849                .provider_arc
850                .clone()
851                .expect("sync compaction requires provider_arc"),
852            model: self.config.model.clone(),
853            sampling: self.config.sampling.clone(),
854            tools: self.tools.schemas(),
855            cancel: self.cancel.clone(),
856        }
857    }
858
859    /// Parse the three-tier compaction thresholds (in tokens) for this turn. Any tier set
860    /// to `None` means that tier is not triggered.
861    /// The model's context window in tokens, exactly as the provider reports it. `None` ⇒
862    /// the provider does not expose it (notably Bedrock, whose SDK returns no model
863    /// metadata). For decisions that need a ceiling, prefer [`Self::effective_context_window`].
864    fn context_window(&self) -> Option<u64> {
865        self.provider
866            .model_info(&self.config.model)
867            .and_then(|m| m.context_window)
868    }
869
870    /// The context window to actually drive compaction / oversized-result rejection with.
871    ///
872    /// Falls back to [`FALLBACK_CONTEXT_WINDOW`] when the provider exposes no window and the
873    /// user has not configured an explicit absolute ceiling
874    /// ([`TurnConfig::compact_threshold_tokens`]) — otherwise an unknown window means no
875    /// compaction at all and the context grows until the provider hard-rejects the request.
876    /// The fallback is deliberately conservative (compacting early is safe; overshooting is
877    /// not). A one-line warning is emitted once per model so the user knows to declare the
878    /// real `context_window` in config.
879    fn effective_context_window(&self) -> Option<u64> {
880        if let Some(window) = self.context_window() {
881            return Some(window);
882        }
883        // An explicit absolute hard threshold is the user's deliberate ceiling; respect it
884        // and do not fabricate a window (micro/soft simply stay off, as before).
885        if self.config.compact_threshold_tokens.is_some() {
886            return None;
887        }
888        warn_missing_context_window(&self.config.model);
889        Some(FALLBACK_CONTEXT_WINDOW)
890    }
891
892    fn compact_thresholds(&self) -> CompactThresholds {
893        let window = self.effective_context_window();
894
895        // For `hard`, an absolute threshold takes precedence; otherwise, use `ratio *
896        // window`.
897        let hard = self.config.compact_threshold_tokens.or_else(|| {
898            let ratio = self.config.compact_ratio?;
899            ratio_threshold(window?, ratio)
900        });
901        // micro/soft can only be derived from window (absolute overrides apply only to
902        // hard).
903        let from_ratio =
904            |ratio: Option<f64>| ratio.and_then(|r| window.and_then(|w| ratio_threshold(w, r)));
905        CompactThresholds {
906            micro: from_ratio(self.config.microcompact_ratio),
907            soft: from_ratio(self.config.compact_soft_ratio),
908            hard,
909        }
910    }
911
912    pub(super) fn hook_ctx(&self) -> HookCtx<'_> {
913        HookCtx::new(self.session_id, self.cwd, self.cancel.clone())
914    }
915}
916
917// ----- internal types -----
918
/// Result of a finished turn: why it stopped, plus the usage figure held in the turn
/// state at that moment (built by `turn_outcome`).
#[derive(Clone, Copy)]
struct TurnOutcome {
    /// Stop reason reported for the turn.
    reason: AcpStopReason,
    /// Usage copied out of the turn state when the outcome was built.
    usage: Usage,
}
924
/// The three compaction watermarks of a turn, each expressed in tokens. A tier whose
/// watermark is `None` is not triggered this turn.
#[derive(Clone, Copy)]
struct CompactThresholds {
    /// Watermark of the micro tier.
    micro: Option<u64>,
    /// Watermark of the soft (background) tier.
    soft: Option<u64>,
    /// Watermark of the hard (mandatory) tier.
    hard: Option<u64>,
}

impl CompactThresholds {
    /// `true` when no tier has a watermark, i.e. there is no proactive compaction this
    /// turn.
    fn is_empty(&self) -> bool {
        matches!((self.micro, self.soft, self.hard), (None, None, None))
    }
}
940
/// `context_window * ratio`, rounded down to whole tokens.
///
/// `ratio` is expected to lie in `(0, 1]`. Returns `None` ("this tier never triggers")
/// when no usable watermark can be derived:
/// * `ratio` is zero, negative, NaN or infinite;
/// * the product rounds down to `0` (for example a zero `context_window`).
///
/// A finite ratio above `1` is not clamped; it yields a watermark above the window.
fn ratio_threshold(context_window: u64, ratio: f64) -> Option<u64> {
    // Reject unusable ratios up front rather than relying on the saturating
    // float-to-int cast: `+inf` would otherwise become `Some(u64::MAX)`, a watermark
    // that can never be reached but still counts as configured.
    if !ratio.is_finite() || ratio <= 0.0 {
        return None;
    }
    let threshold = (context_window as f64 * ratio).floor() as u64;
    (threshold > 0).then_some(threshold)
}
947
/// Conservative context window (in tokens) assumed when the provider exposes none and the
/// user has not configured an explicit ceiling. Sized to the smallest window common across
/// current models so compaction errs toward triggering early rather than overflowing — see
/// [`TurnRunner::effective_context_window`]. To replace the fallback, declare the real
/// value via the model's `context_window` (e.g. `[providers.<name>] models = [{ id = "...",
/// context_window = 200000 }]`) or set `[turn].compact_threshold_tokens`.
const FALLBACK_CONTEXT_WINDOW: u64 = 128_000;
955
956/// Emits a one-time (per model id) warning that the context window is unknown and the
957/// fallback is in effect. The hot turn loop calls `effective_context_window` every
958/// iteration, so the dedupe set prevents log spam.
959fn warn_missing_context_window(model: &str) {
960    use std::collections::HashSet;
961    use std::sync::Mutex;
962    use std::sync::OnceLock;
963
964    static WARNED: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
965    let mut warned = WARNED
966        .get_or_init(|| Mutex::new(HashSet::new()))
967        .lock()
968        .expect("warn-once mutex poisoned");
969    if warned.insert(model.to_string()) {
970        tracing::warn!(
971            model,
972            fallback = FALLBACK_CONTEXT_WINDOW,
973            "model exposes no context_window; assuming a conservative fallback for compaction. \
974             Declare the real value via the model's `context_window` in config or set \
975             `[turn].compact_threshold_tokens` to silence this."
976        );
977    }
978}
979
/// Default cap on how many times a `before turn-end` hook may force a turn to continue.
/// Can be overridden by [`TurnConfig::max_hook_continues`] (config key
/// `[turn].max_hook_continues`). See docs on hook step context exit semantics.
pub(crate) const DEFAULT_MAX_HOOK_CONTINUES: u32 = 3;
984
/// Default upper bound for subagent vertical recursion depth, counted from the top-level
/// turn. With a depth of N, the top turn can spawn subagents, their children can spawn
/// further, and so on, until the Nth level (where `subagent_max_depth` reaches 0) can no
/// longer call `spawn_agent`.
///
/// The default of 1 means the main agent can spawn subagents but those subagents cannot
/// spawn further (the common non-recursive policy). Raise it via
/// `[turn].subagent_max_depth` for orchestrations like "main agent → coordinator
/// subagent → worker subagent". Horizontal runaway is separately guarded by
/// `request_limit`.
pub(crate) const DEFAULT_SUBAGENT_MAX_DEPTH: u32 = 1;
994
995struct TurnState {
996    request_count: u32,
997    usage: Usage,
998    cap: Option<u32>,
999    expand_on_progress: bool,
1000    /// How many times this turn has been extended by the `before turn-end` hook. Cap is
1001    /// [`Self::max_stop_hook_continues`].
1002    stop_hook_continues: u32,
1003    /// Hard upper limit for life-extending continues (from
1004    /// [`TurnConfig::max_hook_continues`]). Prevents hooks from `Continue`ing
1005    /// indefinitely.
1006    max_stop_hook_continues: u32,
1007}
1008
1009impl TurnState {
1010    fn new(limit: TurnRequestLimit, max_hook_continues: u32) -> Self {
1011        Self {
1012            request_count: 0,
1013            usage: Usage::default(),
1014            cap: limit.initial_cap(),
1015            expand_on_progress: limit.expand_on_progress(),
1016            stop_hook_continues: 0,
1017            max_stop_hook_continues: max_hook_continues,
1018        }
1019    }
1020
1021    fn note_progress(&mut self) {
1022        if self.expand_on_progress
1023            && let Some(cap) = self.cap.as_mut()
1024        {
1025            *cap = cap.saturating_add(1);
1026        }
1027    }
1028
1029    /// Reset the per-turn request budget back to its initial state. Called when a
1030    /// `before turn-end` hook keeps the turn alive (e.g. goal mode continuing), so the
1031    /// `request_limit` behaves as a *per-logical-turn* budget rather than a single budget
1032    /// shared across the whole multi-turn run. The cap returns to its initial value
1033    /// (re-reading the configured strategy), discarding any `expand_on_progress` growth.
1034    fn reset_request_budget(&mut self, limit: TurnRequestLimit) {
1035        self.request_count = 0;
1036        self.cap = limit.initial_cap();
1037    }
1038
1039    fn exceeded_request_cap(&self) -> bool {
1040        match self.cap {
1041            None => false,
1042            Some(cap) => self.request_count >= cap,
1043        }
1044    }
1045
1046    /// Whether the `before turn-end` hook is still allowed to continue (has not reached
1047    /// the hard limit).
1048    fn may_stop_hook_continue(&self) -> bool {
1049        self.stop_hook_continues < self.max_stop_hook_continues
1050    }
1051
1052    /// Records one stop-hook continuation.
1053    fn note_stop_hook_continue(&mut self) {
1054        self.stop_hook_continues = self.stop_hook_continues.saturating_add(1);
1055    }
1056}
1057
1058// ----- helpers -----
1059
1060fn turn_outcome(state: &TurnState, reason: AcpStopReason) -> TurnOutcome {
1061    TurnOutcome {
1062        reason,
1063        usage: state.usage,
1064    }
1065}
1066
1067#[cfg(test)]
1068mod tests;