// codetether_agent/session/helper/compression.rs
//! Session history compression via the RLM router.
//!
//! This module contains the context-window enforcement logic that keeps
//! the prompt under the model's token budget. It is invoked automatically
//! at the start of every agent step by [`Session::run_loop`](crate::session::Session).
//!
//! ## Strategy
//!
//! 1. Estimate the current request token cost (system + messages + tools).
//! 2. If it exceeds 90% of the model's usable budget, compress the prefix
//!    of the conversation via [`RlmRouter::auto_process`], keeping the
//!    most recent `keep_last` messages verbatim.
//! 3. Progressively shrink `keep_last` (16 → 12 → 8 → 6) until the budget
//!    is met or nothing more can be compressed.
//!
//! The compressed prefix is replaced by a single synthetic assistant
//! message tagged `[AUTO CONTEXT COMPRESSION]` so the model sees a
//! coherent summary rather than a truncated tail.
//!
//! ## Fallback decision table
//!
//! ```text
//! ┌────────────────────────────────────┬─────────────────────────────────────┬────────────────────────────────────┐
//! │ State after attempt                │ Action                              │ Events emitted                     │
//! ├────────────────────────────────────┼─────────────────────────────────────┼────────────────────────────────────┤
//! │ RLM keep_last ∈ {16,12,8,6} fits   │ Stop; request is ready              │ CompactionStarted → Completed(Rlm) │
//! │ RLM auto_process errors on prefix  │ Fall back to chunk compression      │ (internal; logged via tracing)     │
//! │ All 4 keep_last values exhausted   │ Apply terminal truncation           │ Completed(Truncate) + Truncated    │
//! │ Terminal truncation still over bud │ Surface error to caller             │ Failed(fell_back_to = Truncate)    │
//! └────────────────────────────────────┴─────────────────────────────────────┴────────────────────────────────────┘
//! ```
//!
//! Terminal truncation drops the oldest messages outright (no summary)
//! and is deliberately a distinct event from `CompactionCompleted` so
//! consumers can warn the user about silent context loss.

37use std::sync::Arc;
38
39use anyhow::Result;
40use chrono::Utc;
41use tokio::sync::mpsc;
42use uuid::Uuid;
43
44use crate::provider::{ContentPart, Message, Role, ToolDefinition};
45use crate::rlm::router::AutoProcessContext;
46use crate::rlm::{RlmChunker, RlmRouter};
47
48use super::error::messages_to_rlm_context;
49use super::token::{
50    context_window_for_model, estimate_request_tokens, session_completion_max_tokens,
51};
52use crate::session::event_compaction::{
53    CompactionFailure, CompactionOutcome, CompactionStart, ContextTruncation, FallbackStrategy,
54};
55use crate::session::{Session, SessionEvent};
56
57/// Progressively smaller `keep_last` values tried by [`enforce_context_window`].
58const KEEP_LAST_CANDIDATES: [usize; 4] = [16, 12, 8, 6];
59
60/// Number of most-recent messages retained by the terminal truncation
61/// fallback when every RLM compaction attempt has failed to bring the
62/// request under budget. Must be small enough to leave room for the
63/// active turn, but large enough to keep the model's immediate context
64/// coherent.
65const TRUNCATE_KEEP_LAST: usize = 4;
66
67/// Fraction of the usable budget we target after compression.
68const SAFETY_BUDGET_RATIO: f64 = 0.90;
69
70/// Reserve (in tokens) added on top of `session_completion_max_tokens()` for
71/// tool schemas, protocol framing, and provider-specific wrappers.
72const RESERVE_OVERHEAD_TOKENS: usize = 2048;
73
74/// Fallback chunk-compression target size as a fraction of the ctx window.
75const FALLBACK_CHUNK_RATIO: f64 = 0.25;
76
77/// Default cheap model used for RLM context compression when neither
78/// [`crate::rlm::RlmConfig::root_model`] nor the
79/// `CODETETHER_RLM_MODEL` environment variable is set.
80///
81/// RLM compaction can consume a surprisingly large share of session
82/// cost because it runs on large prefixes of history at every step
83/// past the threshold. Using the caller's main model (often Claude
84/// Opus or GPT-5) for this work quietly multiplies spend. We default
85/// to a cheap general-purpose model instead; users who want higher
86/// fidelity summaries can override via config or env.
87///
88/// # Examples
89///
90/// ```rust
91/// use codetether_agent::session::helper::compression::DEFAULT_RLM_MODEL;
92/// assert!(DEFAULT_RLM_MODEL.contains('/'));
93/// ```
94pub const DEFAULT_RLM_MODEL: &str = "zai/glm-5.1";
95
96/// Resolve the model string that should be used for RLM compaction.
97///
98/// Precedence (highest first):
99///
100/// 1. [`crate::rlm::RlmConfig::root_model`] on the session.
101/// 2. The `CODETETHER_RLM_MODEL` environment variable.
102/// 3. [`DEFAULT_RLM_MODEL`].
103///
104/// Always returns `Some(...)` — the caller's main model is only ever
105/// used as a fallback inside [`compress_messages_keep_last`] when
106/// resolution against the returned string fails at the provider
107/// registry. The `Option` is retained to leave room for a future
108/// "prefer caller's model" policy without a signature break.
109fn resolve_rlm_model(rlm_config: &crate::rlm::RlmConfig) -> Option<String> {
110    if let Some(m) = rlm_config.root_model.as_ref() {
111        return Some(m.clone());
112    }
113    if let Ok(env_model) = std::env::var("CODETETHER_RLM_MODEL")
114        && !env_model.trim().is_empty()
115    {
116        return Some(env_model);
117    }
118    Some(DEFAULT_RLM_MODEL.to_string())
119}
120
121/// Candidate model pool that [`resolve_rlm_model_bandit`] can rank
122/// when the rule-based precedence returns no winner and delegation is
123/// enabled.
124///
125/// Hand-picked per CADMAS-CTX Section 5.9 — a small set of cheap
126/// general-purpose models with stable cost profiles so the LCB scoring
127/// actually has a meaningful dynamic range. Add models here rather
128/// than at call sites so the candidate list stays reviewable.
129const RLM_MODEL_CANDIDATES: &[&str] = &[
130    "zai/glm-5.1",
131    "glm5/glm-5",
132    "openrouter/openai/gpt-oss-120b:free",
133];
134
135/// CADMAS-CTX-aware variant of [`resolve_rlm_model`] (Phase C step 30).
136///
137/// When `state.config.enabled` is `true`, ranks [`RLM_MODEL_CANDIDATES`]
138/// by the LCB score `μ − γ·√u` under the supplied `bucket` (skill
139/// key: `"rlm_compact"`) and returns the highest-scoring candidate.
140/// When delegation is disabled, this is exactly [`resolve_rlm_model`].
141///
142/// The *update* half of the bandit loop lives at the compaction call
143/// site — a `session.state.update(model, "rlm_compact", bucket,
144/// produced_under_budget)` after each pass. That wiring is deferred
145/// to a follow-up commit to keep this one scoped to the selection
146/// primitive.
147///
148/// # Arguments
149///
150/// * `rlm_config` — Session-level RLM configuration.
151/// * `state` — CADMAS-CTX sidecar. Read-only here.
152/// * `bucket` — Context bucket from the current turn's
153///   [`RelevanceMeta`](crate::session::relevance::RelevanceMeta).
154///
155/// # Returns
156///
157/// Same shape as [`resolve_rlm_model`]: `Some(model)` to target a
158/// dedicated cheap model, or `None` to signal the caller should reuse
159/// its own model.
160pub(crate) fn resolve_rlm_model_bandit(
161    rlm_config: &crate::rlm::RlmConfig,
162    state: &crate::session::delegation::DelegationState,
163    bucket: crate::session::relevance::Bucket,
164) -> Option<String> {
165    if !state.config.enabled {
166        return resolve_rlm_model(rlm_config);
167    }
168    // Explicit configuration still wins — the bandit only disambiguates
169    // the otherwise-static fallback list.
170    if let Some(m) = rlm_config.root_model.as_ref() {
171        return Some(m.clone());
172    }
173    if let Ok(env_model) = std::env::var("CODETETHER_RLM_MODEL")
174        && !env_model.trim().is_empty()
175    {
176        return Some(env_model);
177    }
178
179    let mut best: Option<(&str, f64)> = None;
180    for candidate in RLM_MODEL_CANDIDATES {
181        let score = state
182            .score(
183                candidate,
184                crate::session::delegation_skills::RLM_COMPACT,
185                bucket,
186            )
187            .unwrap_or(0.0);
188        match best {
189            Some((_, current)) if current >= score => {}
190            _ => best = Some((candidate, score)),
191        }
192    }
193    best.map(|(m, _)| m.to_string())
194        .or_else(|| Some(DEFAULT_RLM_MODEL.to_string()))
195}
196
197/// Non-buffer inputs for [`compress_messages_keep_last`] and
198/// [`enforce_on_messages`].
199///
200/// Cloned from a [`Session`] by [`CompressContext::from_session`] so the
201/// core compression functions can operate on `&mut Vec<Message>` without
202/// holding any borrow of the owning [`Session`]. This is the pivot that
203/// lets [`derive_context`](crate::session::context::derive_context) run
204/// the compression pipeline on a *clone* of the history rather than
205/// mutating it in place — Phase A of the history/context split.
206///
207/// All fields are owned (not borrowed) so the context can be constructed
208/// up front and re-used across multiple compression passes without
209/// holding an outer borrow of the session.
210#[derive(Clone)]
211pub(crate) struct CompressContext {
212    /// RLM configuration for the session (thresholds, model selectors,
213    /// iteration limits).
214    pub rlm_config: crate::rlm::RlmConfig,
215    /// UUID of the owning session, propagated to RLM traces.
216    pub session_id: String,
217    /// Optional pre-resolved subcall provider from
218    /// [`SessionMetadata::subcall_provider`](crate::session::types::SessionMetadata).
219    pub subcall_provider: Option<Arc<dyn crate::provider::Provider>>,
220    /// Model name resolved alongside [`Self::subcall_provider`].
221    pub subcall_model: Option<String>,
222}
223
224impl CompressContext {
225    /// Snapshot the non-buffer fields of `session` into an owned context.
226    pub(crate) fn from_session(session: &Session) -> Self {
227        Self {
228            rlm_config: session.metadata.rlm.clone(),
229            session_id: session.id.clone(),
230            subcall_provider: session.metadata.subcall_provider.clone(),
231            subcall_model: session.metadata.subcall_model_name.clone(),
232        }
233    }
234}
235
236/// Compress everything older than the last `keep_last` messages in
237/// `messages` into a single synthetic `[AUTO CONTEXT COMPRESSION]`
238/// assistant message, in-place.
239///
240/// This is the `&mut Vec<Message>` core that powers both the legacy
241/// [`compress_history_keep_last`] wrapper (which still takes
242/// `&mut Session`) and the Phase B
243/// [`derive_context`](crate::session::context::derive_context) pipeline
244/// (which runs on a history clone).
245///
246/// # Arguments
247///
248/// * `messages` — The message buffer to rewrite in place.
249/// * `ctx` — Session-level configuration snapshot (RLM config, session
250///   id, subcall provider/model).
251/// * `provider` — Caller's primary provider. Used as a fallback when the
252///   dedicated RLM model cannot be resolved.
253/// * `model` — Caller's primary model identifier (used for ctx-window
254///   sizing and fallback routing).
255/// * `keep_last` — Number of most-recent messages to keep verbatim. Must
256///   be ≥ 0; when `messages.len() <= keep_last` the function returns
257///   `Ok(false)` without touching the buffer.
258/// * `reason` — Short diagnostic string recorded on the RLM trace.
259///
260/// # Returns
261///
262/// `Ok(true)` if the buffer was rewritten, `Ok(false)` if it was already
263/// short enough to skip compression.
264///
265/// # Errors
266///
267/// Propagates any error from [`RlmRouter::auto_process`] that cannot be
268/// recovered by the chunk-compression fallback.
269pub(crate) async fn compress_messages_keep_last(
270    messages: &mut Vec<Message>,
271    ctx: &CompressContext,
272    provider: Arc<dyn crate::provider::Provider>,
273    model: &str,
274    keep_last: usize,
275    reason: &str,
276) -> Result<bool> {
277    if messages.len() <= keep_last {
278        return Ok(false);
279    }
280
281    let split_idx = messages.len().saturating_sub(keep_last);
282    let tail = messages.split_off(split_idx);
283    let prefix = std::mem::take(messages);
284
285    let context = messages_to_rlm_context(&prefix);
286    let ctx_window = context_window_for_model(model);
287
288    // Prefer a cheap dedicated RLM model over the caller's main model
289    // to keep compaction cost bounded. Falls back to the caller's
290    // provider/model if the dedicated model cannot be resolved.
291    let (rlm_provider, rlm_model) = match resolve_rlm_model(&ctx.rlm_config) {
292        Some(target_model) if target_model != model => {
293            match crate::provider::ProviderRegistry::shared_from_vault().await {
294                Ok(registry) => match registry.resolve_model(&target_model) {
295                    Ok((p, m)) => {
296                        tracing::info!(
297                            rlm_model = %m,
298                            caller_model = %model,
299                            "RLM: using cheap dedicated model for compression"
300                        );
301                        (p, m)
302                    }
303                    Err(err) => {
304                        tracing::warn!(
305                            rlm_model = %target_model,
306                            error = %err,
307                            "RLM: dedicated model resolution failed; falling back to caller model"
308                        );
309                        (Arc::clone(&provider), model.to_string())
310                    }
311                },
312                Err(err) => {
313                    tracing::warn!(
314                        error = %err,
315                        "RLM: provider registry unavailable; falling back to caller model"
316                    );
317                    (Arc::clone(&provider), model.to_string())
318                }
319            }
320        }
321        _ => (Arc::clone(&provider), model.to_string()),
322    };
323
324    let auto_ctx = AutoProcessContext {
325        tool_id: "session_context",
326        tool_args: serde_json::json!({"reason": reason}),
327        session_id: &ctx.session_id,
328        abort: None,
329        on_progress: None,
330        provider: rlm_provider,
331        model: rlm_model.clone(),
332        bus: None,
333        trace_id: None,
334        subcall_provider: ctx.subcall_provider.clone(),
335        subcall_model: ctx.subcall_model.clone(),
336    };
337
338    let summary = match RlmRouter::auto_process(&context, auto_ctx, &ctx.rlm_config).await {
339        Ok(result) => {
340            tracing::info!(
341                reason,
342                rlm_model = %rlm_model,
343                input_tokens = result.stats.input_tokens,
344                output_tokens = result.stats.output_tokens,
345                compression_ratio = result.stats.compression_ratio,
346                "RLM: Compressed session history"
347            );
348            // Attribute RLM token spend against the RLM model so the
349            // TUI cost badge reflects it instead of silently inflating
350            // the caller's main model's reported usage.
351            crate::telemetry::TOKEN_USAGE.record_model_usage(
352                &rlm_model,
353                result.stats.input_tokens as u64,
354                result.stats.output_tokens as u64,
355            );
356            result.processed
357        }
358        Err(e) => {
359            tracing::warn!(
360                reason,
361                error = %e,
362                "RLM: Failed to compress session history; falling back to chunk compression"
363            );
364            RlmChunker::compress(
365                &context,
366                (ctx_window as f64 * FALLBACK_CHUNK_RATIO) as usize,
367                None,
368            )
369        }
370    };
371
372    let summary_msg = Message {
373        role: Role::Assistant,
374        content: vec![ContentPart::Text {
375            text: format!(
376                "[AUTO CONTEXT COMPRESSION]\nOlder conversation + tool output was compressed \
377                 to fit the model context window.\n\n{summary}\n\n\
378                 [RECOVERY] If you need specific details that this summary \
379                 dropped (exact file paths, prior tool output, earlier user \
380                 instructions, numeric values), call the `session_recall` \
381                 tool with a targeted query instead of guessing or asking \
382                 the user to repeat themselves."
383            ),
384        }],
385    };
386
387    let mut new_messages = Vec::with_capacity(1 + tail.len());
388    new_messages.push(summary_msg);
389    new_messages.extend(tail);
390    *messages = new_messages;
391
392    Ok(true)
393}
394
395/// Ratio of the model's context window a single "last" message can occupy
396/// before [`compress_last_message_if_oversized`] replaces it with an RLM
397/// summary.
398const OVERSIZED_LAST_MESSAGE_RATIO: f64 = 0.35;
399
400/// Character budget retained verbatim from the original message as a
401/// prefix, after RLM compression, so the model can see the literal
402/// opening of the request even when the body was summarised.
403const OVERSIZED_LAST_MESSAGE_PREFIX_CHARS: usize = 500;
404
405/// Compress the **last** message in `messages` in-place when its text
406/// content exceeds [`OVERSIZED_LAST_MESSAGE_RATIO`] of the model's
407/// context window.
408///
409/// This is the buffer-taking core that replaces the former
410/// `compress_user_message_if_oversized` helpers in `prompt.rs` and
411/// `prompt_events.rs`. Those called `session.messages.last_mut()`
412/// destructively on the canonical history; that was the last remaining
413/// mutator that broke the Phase A invariant (*history stays pure*).
414/// The derivation pipeline now calls this helper on a clone, so the
415/// original user text remains in [`Session::messages`] while the LLM
416/// sees the compressed projection.
417///
418/// Behaviour matches the legacy pair:
419///
420/// * When the last message contains ≤ threshold tokens, returns
421///   `Ok(false)` without touching the buffer.
422/// * On successful RLM compression, rewrites the last message's content
423///   to a single text part with an `[Original message: N tokens,
424///   compressed via RLM]` prefix, the compressed body, and the first
425///   [`OVERSIZED_LAST_MESSAGE_PREFIX_CHARS`] characters of the original
426///   as a literal suffix.
427/// * On RLM failure, falls back to [`RlmChunker::compress`].
428///
429/// # Arguments
430///
431/// * `messages` — The message buffer to rewrite in place. A no-op when
432///   empty.
433/// * `ctx` — Session-level configuration snapshot.
434/// * `provider` — Caller's primary provider (used as subcall fallback).
435/// * `model` — Caller's primary model identifier (governs ctx window).
436///
437/// # Returns
438///
439/// `Ok(true)` when the last message was rewritten, `Ok(false)` when no
440/// compression was needed.
441///
442/// # Errors
443///
444/// Never returns `Err` in the current implementation — RLM failures
445/// fall back to chunk-based compression, which is infallible. The
446/// `Result` return shape is preserved for forward-compatibility with
447/// future pipeline wiring.
448pub(crate) async fn compress_last_message_if_oversized(
449    messages: &mut [Message],
450    ctx: &CompressContext,
451    provider: Arc<dyn crate::provider::Provider>,
452    model: &str,
453) -> Result<bool> {
454    let Some(last) = messages.last_mut() else {
455        return Ok(false);
456    };
457
458    let original = extract_message_text(last);
459    if original.is_empty() {
460        return Ok(false);
461    }
462
463    let ctx_window = context_window_for_model(model);
464    let msg_tokens = RlmChunker::estimate_tokens(&original);
465    let threshold = (ctx_window as f64 * OVERSIZED_LAST_MESSAGE_RATIO) as usize;
466    if msg_tokens <= threshold {
467        return Ok(false);
468    }
469
470    tracing::info!(
471        msg_tokens,
472        threshold,
473        ctx_window,
474        "RLM: Last message exceeds context threshold, compressing"
475    );
476
477    let auto_ctx = AutoProcessContext {
478        tool_id: "session_context",
479        tool_args: serde_json::json!({}),
480        session_id: &ctx.session_id,
481        abort: None,
482        on_progress: None,
483        provider: Arc::clone(&provider),
484        model: model.to_string(),
485        bus: None,
486        trace_id: None,
487        subcall_provider: ctx.subcall_provider.clone(),
488        subcall_model: ctx.subcall_model.clone(),
489    };
490
491    let replacement = match RlmRouter::auto_process(&original, auto_ctx, &ctx.rlm_config).await {
492        Ok(result) => {
493            tracing::info!(
494                input_tokens = result.stats.input_tokens,
495                output_tokens = result.stats.output_tokens,
496                "RLM: Last message compressed"
497            );
498            format!(
499                "[Original message: {msg_tokens} tokens, compressed via RLM]\n\n{}\n\n---\nOriginal request prefix:\n{}",
500                result.processed,
501                original
502                    .chars()
503                    .take(OVERSIZED_LAST_MESSAGE_PREFIX_CHARS)
504                    .collect::<String>()
505            )
506        }
507        Err(e) => {
508            tracing::warn!(error = %e, "RLM: Failed to compress last message, using truncation");
509            let max_chars = threshold * 4;
510            RlmChunker::compress(&original, max_chars / 4, None)
511        }
512    };
513
514    last.content = vec![ContentPart::Text { text: replacement }];
515    Ok(true)
516}
517
518/// Concatenate the textual content of `msg`, skipping non-text parts.
519///
520/// Used by [`compress_last_message_if_oversized`] to recover the string
521/// form of a message without assuming the content vector has exactly one
522/// text part.
523fn extract_message_text(msg: &Message) -> String {
524    let mut buf = String::new();
525    for part in &msg.content {
526        if let ContentPart::Text { text } = part {
527            if !buf.is_empty() {
528                buf.push('\n');
529            }
530            buf.push_str(text);
531        }
532    }
533    buf
534}
535
536/// Back-compat wrapper over [`compress_messages_keep_last`] that operates
537/// on an owning [`Session`]. Bumps [`Session::updated_at`] when the
538/// buffer was rewritten.
539///
540/// Prefer [`compress_messages_keep_last`] for new code — it lets you run
541/// compression on a history clone without mutating the canonical
542/// [`Session::messages`] buffer.
543///
544/// # Errors
545///
546/// Propagates any error from the underlying core.
547pub(crate) async fn compress_history_keep_last(
548    session: &mut Session,
549    provider: Arc<dyn crate::provider::Provider>,
550    model: &str,
551    keep_last: usize,
552    reason: &str,
553) -> Result<bool> {
554    let ctx = CompressContext::from_session(session);
555    let did = compress_messages_keep_last(
556        &mut session.messages,
557        &ctx,
558        provider,
559        model,
560        keep_last,
561        reason,
562    )
563    .await?;
564    if did {
565        session.updated_at = Utc::now();
566    }
567    Ok(did)
568}
569
570/// Ensure the estimated request token cost of `messages` fits within the
571/// model's safety budget, in-place.
572///
573/// This is the `&mut Vec<Message>` core that powers both the legacy
574/// [`enforce_context_window`] wrapper (which takes `&mut Session`) and
575/// the Phase B [`derive_context`](crate::session::context::derive_context)
576/// pipeline (which runs on a history clone and never mutates the
577/// canonical [`Session::messages`] buffer).
578///
579/// # Arguments
580///
581/// * `messages` — The message buffer to rewrite in place.
582/// * `ctx` — Session-level configuration snapshot.
583/// * `provider` — Caller's primary provider.
584/// * `model` — Caller's primary model identifier (governs ctx window).
585/// * `system_prompt` — Current system prompt, included in token estimates.
586/// * `tools` — Tool definitions, included in token estimates.
587/// * `event_tx` — Optional channel for emitting
588///   [`SessionEvent::CompactionStarted`] /
589///   [`SessionEvent::CompactionCompleted`] /
590///   [`SessionEvent::ContextTruncated`] /
591///   [`SessionEvent::CompactionFailed`]. When `None`, compaction runs
592///   silently.
593///
594/// # Errors
595///
596/// Returns `Err` only if an underlying RLM pass errors in a way the
597/// fallback cascade cannot recover from. Terminal truncation itself is
598/// infallible.
599pub(crate) async fn enforce_on_messages(
600    messages: &mut Vec<Message>,
601    ctx: &CompressContext,
602    provider: Arc<dyn crate::provider::Provider>,
603    model: &str,
604    system_prompt: &str,
605    tools: &[ToolDefinition],
606    event_tx: Option<&mpsc::Sender<SessionEvent>>,
607) -> Result<()> {
608    let ctx_window = context_window_for_model(model);
609    let reserve = session_completion_max_tokens().saturating_add(RESERVE_OVERHEAD_TOKENS);
610    let budget = ctx_window.saturating_sub(reserve);
611    let safety_budget = (budget as f64 * SAFETY_BUDGET_RATIO) as usize;
612
613    let initial_est = estimate_request_tokens(system_prompt, messages, tools);
614    let history_trigger = ctx.rlm_config.history_trigger_messages;
615    let history_over = history_trigger > 0 && messages.len() >= history_trigger;
616    if initial_est <= safety_budget && !history_over {
617        return Ok(());
618    }
619
620    let trigger_reason = if history_over && initial_est <= safety_budget {
621        "history_length"
622    } else {
623        "context_budget"
624    };
625
626    let trace_id = Uuid::new_v4();
627    emit(
628        event_tx,
629        SessionEvent::CompactionStarted(CompactionStart {
630            trace_id,
631            reason: trigger_reason.to_string(),
632            before_tokens: initial_est,
633            budget: safety_budget,
634        }),
635    )
636    .await;
637
638    for keep_last in KEEP_LAST_CANDIDATES {
639        let est = estimate_request_tokens(system_prompt, messages, tools);
640        let still_long =
641            history_trigger > 0 && messages.len() > history_trigger.saturating_sub(keep_last);
642        if est <= safety_budget && !still_long {
643            emit(
644                event_tx,
645                SessionEvent::CompactionCompleted(CompactionOutcome {
646                    trace_id,
647                    strategy: FallbackStrategy::Rlm,
648                    before_tokens: initial_est,
649                    after_tokens: est,
650                    kept_messages: messages.len(),
651                }),
652            )
653            .await;
654            return Ok(());
655        }
656
657        tracing::info!(
658            est_tokens = est,
659            ctx_window,
660            safety_budget,
661            keep_last,
662            "Context window approaching limit; compressing older session history"
663        );
664
665        let did = compress_messages_keep_last(
666            messages,
667            ctx,
668            Arc::clone(&provider),
669            model,
670            keep_last,
671            trigger_reason,
672        )
673        .await?;
674
675        if !did {
676            break;
677        }
678    }
679
680    // Re-estimate one last time after the final RLM pass.
681    let last_est = estimate_request_tokens(system_prompt, messages, tools);
682    if last_est <= safety_budget {
683        emit(
684            event_tx,
685            SessionEvent::CompactionCompleted(CompactionOutcome {
686                trace_id,
687                strategy: FallbackStrategy::Rlm,
688                before_tokens: initial_est,
689                after_tokens: last_est,
690                kept_messages: messages.len(),
691            }),
692        )
693        .await;
694        return Ok(());
695    }
696
697    // Every RLM / chunk attempt still leaves us over budget.
698    // Apply terminal truncation as the last-resort fallback.
699    let dropped_tokens =
700        terminal_truncate_messages(messages, system_prompt, tools, TRUNCATE_KEEP_LAST);
701    let after_tokens = estimate_request_tokens(system_prompt, messages, tools);
702
703    tracing::warn!(
704        before_tokens = initial_est,
705        after_tokens,
706        dropped_tokens,
707        kept_messages = messages.len(),
708        safety_budget,
709        "All RLM compaction attempts exhausted; applied terminal truncation fallback"
710    );
711
712    emit(
713        event_tx,
714        SessionEvent::ContextTruncated(ContextTruncation {
715            trace_id,
716            dropped_tokens,
717            kept_messages: messages.len(),
718            archive_ref: None,
719        }),
720    )
721    .await;
722    emit(
723        event_tx,
724        SessionEvent::CompactionCompleted(CompactionOutcome {
725            trace_id,
726            strategy: FallbackStrategy::Truncate,
727            before_tokens: initial_est,
728            after_tokens,
729            kept_messages: messages.len(),
730        }),
731    )
732    .await;
733
734    if after_tokens > safety_budget {
735        tracing::error!(
736            after_tokens,
737            safety_budget,
738            "Terminal truncation still over budget; request will likely fail at the provider"
739        );
740        emit(
741            event_tx,
742            SessionEvent::CompactionFailed(CompactionFailure {
743                trace_id,
744                fell_back_to: Some(FallbackStrategy::Truncate),
745                reason: "terminal truncation still exceeds safety budget".to_string(),
746                after_tokens,
747                budget: safety_budget,
748            }),
749        )
750        .await;
751    }
752
753    Ok(())
754}
755
756/// Back-compat wrapper over [`enforce_on_messages`] that operates on an
757/// owning [`Session`]. Bumps [`Session::updated_at`] when the buffer was
758/// rewritten.
759///
760/// Prefer [`enforce_on_messages`] for new code — it lets you run
761/// context-window enforcement on a history clone without mutating the
762/// canonical [`Session::messages`] buffer (the Phase A history/context
763/// split).
764///
765/// # Errors
766///
767/// Propagates any error from the underlying core.
768pub(crate) async fn enforce_context_window(
769    session: &mut Session,
770    provider: Arc<dyn crate::provider::Provider>,
771    model: &str,
772    system_prompt: &str,
773    tools: &[ToolDefinition],
774    event_tx: Option<&mpsc::Sender<SessionEvent>>,
775) -> Result<()> {
776    let ctx = CompressContext::from_session(session);
777    let before_len = session.messages.len();
778    enforce_on_messages(
779        &mut session.messages,
780        &ctx,
781        provider,
782        model,
783        system_prompt,
784        tools,
785        event_tx,
786    )
787    .await?;
788    if session.messages.len() != before_len {
789        session.updated_at = Utc::now();
790    }
791    Ok(())
792}
793
794/// Drop everything older than the last `keep_last` messages in
795/// `messages` and return an approximate count of the tokens removed.
796///
797/// Unlike [`compress_messages_keep_last`] this keeps **no** summary of
798/// the dropped prefix — the caller is expected to have already attempted
799/// RLM-based compaction and exhausted it. A `[CONTEXT TRUNCATED]`
800/// assistant marker is prepended so the model is aware that older turns
801/// were silently removed.
802///
803/// This is the `&mut Vec<Message>` core for terminal truncation. It is
804/// called from [`enforce_on_messages`] as the last-resort fallback when
805/// every RLM / chunk attempt has left the buffer over budget.
806///
807/// # Arguments
808///
809/// * `messages` — The message buffer to rewrite in place.
810/// * `system_prompt` — Included in the before/after token estimates so
811///   the caller sees the net effect on request size, not just message
812///   count.
813/// * `tools` — Tool definitions, included in token estimates.
814/// * `keep_last` — Number of most-recent messages to retain.
815///
816/// # Returns
817///
818/// An approximate count of tokens removed (saturating subtraction).
819/// Returns `0` when `messages.len() <= keep_last` and no work is done.
820pub(crate) fn terminal_truncate_messages(
821    messages: &mut Vec<Message>,
822    system_prompt: &str,
823    tools: &[ToolDefinition],
824    keep_last: usize,
825) -> usize {
826    if messages.len() <= keep_last {
827        return 0;
828    }
829
830    let before = estimate_request_tokens(system_prompt, messages, tools);
831    let split_idx = messages.len().saturating_sub(keep_last);
832    let tail = messages.split_off(split_idx);
833
834    let marker = Message {
835        role: Role::Assistant,
836        content: vec![ContentPart::Text {
837            text: "[CONTEXT TRUNCATED]\nOlder conversation was dropped to keep the request \
838                   under the model's context window. Ask for details to recall anything \
839                   specific."
840                .to_string(),
841        }],
842    };
843
844    let mut new_messages = Vec::with_capacity(1 + tail.len());
845    new_messages.push(marker);
846    new_messages.extend(tail);
847    *messages = new_messages;
848
849    let after = estimate_request_tokens(system_prompt, messages, tools);
850    before.saturating_sub(after)
851}
852
853async fn emit(event_tx: Option<&mpsc::Sender<SessionEvent>>, ev: SessionEvent) {
854    if let Some(tx) = event_tx {
855        let _ = tx.send(ev).await;
856    }
857}
858
859#[cfg(test)]
860mod tests {
861    use super::*;
862
863    fn user(text: &str) -> Message {
864        Message {
865            role: Role::User,
866            content: vec![ContentPart::Text {
867                text: text.to_string(),
868            }],
869        }
870    }
871
872    #[test]
873    fn terminal_truncate_noop_when_short_enough() {
874        let mut messages = vec![user("a"), user("b")];
875        let before_len = messages.len();
876        let dropped = terminal_truncate_messages(&mut messages, "", &[], 4);
877        assert_eq!(dropped, 0);
878        assert_eq!(messages.len(), before_len);
879    }
880
881    #[test]
882    fn terminal_truncate_drops_prefix_and_prepends_marker() {
883        let mut messages: Vec<Message> = (0..10).map(|i| user(&format!("msg-{i}"))).collect();
884        let _ = terminal_truncate_messages(&mut messages, "", &[], 3);
885
886        // 1 synthetic marker + 3 most-recent messages.
887        assert_eq!(messages.len(), 4);
888
889        // First message is the [CONTEXT TRUNCATED] assistant marker.
890        assert!(matches!(messages[0].role, Role::Assistant));
891        if let ContentPart::Text { text } = &messages[0].content[0] {
892            assert!(text.starts_with("[CONTEXT TRUNCATED]"));
893        } else {
894            panic!("expected text content on the synthetic marker");
895        }
896
897        // Remaining three messages are the most-recent user messages,
898        // preserved verbatim and in their original order.
899        let kept_texts: Vec<&str> = messages[1..]
900            .iter()
901            .map(|m| match &m.content[0] {
902                ContentPart::Text { text } => text.as_str(),
903                _ => panic!("expected text content"),
904            })
905            .collect();
906        assert_eq!(kept_texts, vec!["msg-7", "msg-8", "msg-9"]);
907    }
908
909    #[test]
910    fn compress_context_from_session_snapshot_is_independent() {
911        // Changing the session's metadata after snapshotting must not
912        // leak through the captured CompressContext. This lets the
913        // Phase B derive_context pipeline hold a CompressContext
914        // alongside a `&mut Vec<Message>` borrowed from the same
915        // Session without running into borrow conflicts.
916        let session = Session {
917            id: "session-42".to_string(),
918            title: None,
919            created_at: chrono::Utc::now(),
920            updated_at: chrono::Utc::now(),
921            metadata: Default::default(),
922            agent: "test".to_string(),
923            messages: Vec::new(),
924            tool_uses: Vec::new(),
925            usage: Default::default(),
926            max_steps: None,
927            bus: None,
928        };
929        let snapshot = CompressContext::from_session(&session);
930        assert_eq!(snapshot.session_id, "session-42");
931        assert!(snapshot.subcall_provider.is_none());
932        assert!(snapshot.subcall_model.is_none());
933    }
934}