codetether_agent/session/helper/compression.rs
//! Session history compression via the RLM router.
//!
//! This module contains the context-window enforcement logic that keeps
//! the prompt under the model's token budget. It is invoked automatically
//! at the start of every agent step by [`Session::run_loop`](crate::session::Session).
//!
//! ## Strategy
//!
//! 1. Estimate the current request token cost (system + messages + tools).
//! 2. If it exceeds 90% of the model's usable budget, compress the prefix
//!    of the conversation via [`RlmRouter::auto_process`], keeping the
//!    most recent `keep_last` messages verbatim.
//! 3. Progressively shrink `keep_last` (16 → 12 → 8 → 6) until the budget
//!    is met or nothing more can be compressed.
//!
//! The compressed prefix is replaced by a single synthetic assistant
//! message tagged `[AUTO CONTEXT COMPRESSION]` so the model sees a
//! coherent summary rather than a truncated tail.
//!
//! ## Fallback decision table
//!
//! ```text
//! ┌───────────────────────────────────────┬────────────────────────────────┬────────────────────────────────────┐
//! │ State after attempt                   │ Action                         │ Events emitted                     │
//! ├───────────────────────────────────────┼────────────────────────────────┼────────────────────────────────────┤
//! │ RLM keep_last ∈ {16,12,8,6} fits      │ Stop; request is ready         │ CompactionStarted → Completed(Rlm) │
//! │ RLM auto_process errors on prefix     │ Fall back to chunk compression │ (internal; logged via tracing)     │
//! │ All 4 keep_last values exhausted      │ Apply terminal truncation      │ Completed(Truncate) + Truncated    │
//! │ Terminal truncation still over budget │ Surface error to caller        │ Failed(fell_back_to = Truncate)    │
//! └───────────────────────────────────────┴────────────────────────────────┴────────────────────────────────────┘
//! ```
//!
//! Terminal truncation drops the oldest messages outright (no summary)
//! and is deliberately a distinct event from `CompactionCompleted` so
//! consumers can warn the user about silent context loss.
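//!
//! As a back-of-envelope sketch of the budget math, assuming a hypothetical
//! 200k-token window and an assumed 8192-token completion reserve (the real
//! values come from [`context_window_for_model`] and
//! [`session_completion_max_tokens`]):
//!
//! ```
//! let ctx_window = 200_000usize; // model's advertised context window (illustrative)
//! let reserve = 8_192 + 2_048; // completion max (assumed) + RESERVE_OVERHEAD_TOKENS
//! let budget = ctx_window - reserve; // usable prompt budget
//! let safety_budget = (budget as f64 * 0.90) as usize; // SAFETY_BUDGET_RATIO
//! assert_eq!(safety_budget, 170_784);
//! ```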

use std::sync::Arc;

use anyhow::Result;
use chrono::Utc;
use tokio::sync::mpsc;
use uuid::Uuid;

use crate::provider::{ContentPart, Message, Role, ToolDefinition};
use crate::rlm::router::AutoProcessContext;
use crate::rlm::{RlmChunker, RlmRouter};

use super::error::messages_to_rlm_context;
use super::token::{
    context_window_for_model, estimate_request_tokens, session_completion_max_tokens,
};
use crate::session::event_compaction::{
    CompactionFailure, CompactionOutcome, CompactionStart, ContextTruncation, FallbackStrategy,
};
use crate::session::{Session, SessionEvent};

/// Progressively smaller `keep_last` values tried by [`enforce_context_window`].
const KEEP_LAST_CANDIDATES: [usize; 4] = [16, 12, 8, 6];

/// Number of most-recent messages retained by the terminal truncation
/// fallback when every RLM compaction attempt has failed to bring the
/// request under budget. Must be small enough to leave room for the
/// active turn, but large enough to keep the model's immediate context
/// coherent.
const TRUNCATE_KEEP_LAST: usize = 4;

/// Fraction of the usable budget we target after compression.
const SAFETY_BUDGET_RATIO: f64 = 0.90;

/// Reserve (in tokens) added on top of `session_completion_max_tokens()` for
/// tool schemas, protocol framing, and provider-specific wrappers.
const RESERVE_OVERHEAD_TOKENS: usize = 2048;

/// Fallback chunk-compression target size as a fraction of the ctx window.
const FALLBACK_CHUNK_RATIO: f64 = 0.25;

/// Default cheap model used for RLM context compression when neither
/// [`crate::rlm::RlmConfig::root_model`] nor the
/// `CODETETHER_RLM_MODEL` environment variable is set.
///
/// RLM compaction can consume a surprisingly large share of session
/// cost because it runs on large prefixes of history at every step
/// past the threshold. Using the caller's main model (often Claude
/// Opus or GPT-5) for this work quietly multiplies spend. We default
/// to a cheap general-purpose model instead; users who want higher
/// fidelity summaries can override via config or env.
///
/// # Examples
///
/// ```rust
/// use codetether_agent::session::helper::compression::DEFAULT_RLM_MODEL;
/// assert!(DEFAULT_RLM_MODEL.contains('/'));
/// ```
pub const DEFAULT_RLM_MODEL: &str = "zai/glm-5.1";

/// Resolve the model string that should be used for RLM compaction.
///
/// Precedence (highest first):
///
/// 1. [`crate::rlm::RlmConfig::root_model`] on the session.
/// 2. The `CODETETHER_RLM_MODEL` environment variable.
/// 3. [`DEFAULT_RLM_MODEL`].
///
/// Always returns `Some(...)` — the caller's main model is only ever
/// used as a fallback inside [`compress_messages_keep_last`] when
/// resolution against the returned string fails at the provider
/// registry. The `Option` is retained to leave room for a future
/// "prefer caller's model" policy without a signature break.
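///
/// A minimal sketch of the precedence, assuming `RlmConfig` implements
/// `Default` and the env var is unset (the overriding model id is
/// illustrative):
///
/// ```ignore
/// let mut cfg = crate::rlm::RlmConfig::default();
/// assert_eq!(resolve_rlm_model(&cfg).as_deref(), Some(DEFAULT_RLM_MODEL));
///
/// cfg.root_model = Some("acme/tiny-summarizer".to_string()); // hypothetical model id
/// assert_eq!(resolve_rlm_model(&cfg).as_deref(), Some("acme/tiny-summarizer"));
/// ```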
fn resolve_rlm_model(rlm_config: &crate::rlm::RlmConfig) -> Option<String> {
    if let Some(m) = rlm_config.root_model.as_ref() {
        return Some(m.clone());
    }
    if let Ok(env_model) = std::env::var("CODETETHER_RLM_MODEL")
        && !env_model.trim().is_empty()
    {
        return Some(env_model);
    }
    Some(DEFAULT_RLM_MODEL.to_string())
}

/// Candidate model pool that [`resolve_rlm_model_bandit`] can rank
/// when the rule-based precedence returns no winner and delegation is
/// enabled.
///
/// Hand-picked per CADMAS-CTX Section 5.9 — a small set of cheap
/// general-purpose models with stable cost profiles so the LCB scoring
/// actually has a meaningful dynamic range. Add models here rather
/// than at call sites so the candidate list stays reviewable.
const RLM_MODEL_CANDIDATES: &[&str] = &[
    "zai/glm-5.1",
    "glm5/glm-5",
    "openrouter/openai/gpt-oss-120b:free",
];

/// CADMAS-CTX-aware variant of [`resolve_rlm_model`] (Phase C step 30).
///
/// When `state.config.enabled` is `true`, ranks [`RLM_MODEL_CANDIDATES`]
/// by the LCB score `μ − γ·√u` under the supplied `bucket` (skill
/// key: `"rlm_compact"`) and returns the highest-scoring candidate.
/// When delegation is disabled, this is exactly [`resolve_rlm_model`].
///
/// The *update* half of the bandit loop lives at the compaction call
/// site — a `session.state.update(model, "rlm_compact", bucket,
/// produced_under_budget)` after each pass. That wiring is deferred
/// to a follow-up commit to keep this one scoped to the selection
/// primitive.
///
/// # Arguments
///
/// * `rlm_config` — Session-level RLM configuration.
/// * `state` — CADMAS-CTX sidecar. Read-only here.
/// * `bucket` — Context bucket from the current turn's
///   [`RelevanceMeta`](crate::session::relevance::RelevanceMeta).
///
/// # Returns
///
/// Same shape as [`resolve_rlm_model`]: `Some(model)` to target a
/// dedicated cheap model, or `None` to signal the caller should reuse
/// its own model. Like [`resolve_rlm_model`], the current
/// implementation always returns `Some`.
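///
/// A worked toy example of the LCB ranking this function relies on
/// (γ and the per-candidate mean/uncertainty pairs are illustrative,
/// not real telemetry from `state.score`):
///
/// ```
/// let gamma = 1.0_f64;
/// // (model, μ mean reward, u uncertainty)
/// let candidates = [("a", 0.8_f64, 0.04_f64), ("b", 0.9, 0.36)];
/// let best = candidates
///     .iter()
///     .copied()
///     .map(|(m, mu, u)| (m, mu - gamma * u.sqrt()))
///     .max_by(|x, y| x.1.total_cmp(&y.1))
///     .map(|(m, _)| m);
/// // 0.8 − 1.0·√0.04 = 0.60 beats 0.9 − 1.0·√0.36 = 0.30:
/// // the higher raw mean loses because its uncertainty is larger.
/// assert_eq!(best, Some("a"));
/// ```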
pub(crate) fn resolve_rlm_model_bandit(
    rlm_config: &crate::rlm::RlmConfig,
    state: &crate::session::delegation::DelegationState,
    bucket: crate::session::relevance::Bucket,
) -> Option<String> {
    if !state.config.enabled {
        return resolve_rlm_model(rlm_config);
    }
    // Explicit configuration still wins — the bandit only disambiguates
    // the otherwise-static fallback list.
    if let Some(m) = rlm_config.root_model.as_ref() {
        return Some(m.clone());
    }
    if let Ok(env_model) = std::env::var("CODETETHER_RLM_MODEL")
        && !env_model.trim().is_empty()
    {
        return Some(env_model);
    }

    let mut best: Option<(&str, f64)> = None;
    for candidate in RLM_MODEL_CANDIDATES {
        let score = state
            .score(
                candidate,
                crate::session::delegation_skills::RLM_COMPACT,
                bucket,
            )
            .unwrap_or(0.0);
        match best {
            Some((_, current)) if current >= score => {}
            _ => best = Some((candidate, score)),
        }
    }
    best.map(|(m, _)| m.to_string())
        .or_else(|| Some(DEFAULT_RLM_MODEL.to_string()))
}

/// Non-buffer inputs for [`compress_messages_keep_last`] and
/// [`enforce_on_messages`].
///
/// Cloned from a [`Session`] by [`CompressContext::from_session`] so the
/// core compression functions can operate on `&mut Vec<Message>` without
/// holding any borrow of the owning [`Session`]. This is the pivot that
/// lets [`derive_context`](crate::session::context::derive_context) run
/// the compression pipeline on a *clone* of the history rather than
/// mutating it in place — Phase A of the history/context split.
///
/// All fields are owned (not borrowed) so the context can be constructed
/// up front and re-used across multiple compression passes without
/// holding an outer borrow of the session.
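///
/// A hedged usage sketch (the provider and model values are illustrative;
/// `compress_messages_keep_last` is the sibling defined below):
///
/// ```ignore
/// let ctx = CompressContext::from_session(&session);
/// // `ctx` owns its fields, so borrowing `session.messages` mutably
/// // does not conflict with a borrow of `session` itself.
/// compress_messages_keep_last(&mut session.messages, &ctx, provider, "model-id", 8, "manual")
///     .await?;
/// ```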
#[derive(Clone)]
pub(crate) struct CompressContext {
    /// RLM configuration for the session (thresholds, model selectors,
    /// iteration limits).
    pub rlm_config: crate::rlm::RlmConfig,
    /// UUID of the owning session, propagated to RLM traces.
    pub session_id: String,
    /// Optional pre-resolved subcall provider from
    /// [`SessionMetadata::subcall_provider`](crate::session::types::SessionMetadata).
    pub subcall_provider: Option<Arc<dyn crate::provider::Provider>>,
    /// Model name resolved alongside [`Self::subcall_provider`].
    pub subcall_model: Option<String>,
}

impl CompressContext {
    /// Snapshot the non-buffer fields of `session` into an owned context.
    pub(crate) fn from_session(session: &Session) -> Self {
        Self {
            rlm_config: session.metadata.rlm.clone(),
            session_id: session.id.clone(),
            subcall_provider: session.metadata.subcall_provider.clone(),
            subcall_model: session.metadata.subcall_model_name.clone(),
        }
    }
}

/// Compress everything older than the last `keep_last` messages in
/// `messages` into a single synthetic `[AUTO CONTEXT COMPRESSION]`
/// assistant message, in-place.
///
/// This is the `&mut Vec<Message>` core that powers both the legacy
/// [`compress_history_keep_last`] wrapper (which still takes
/// `&mut Session`) and the Phase B
/// [`derive_context`](crate::session::context::derive_context) pipeline
/// (which runs on a history clone).
///
/// # Arguments
///
/// * `messages` — The message buffer to rewrite in place.
/// * `ctx` — Session-level configuration snapshot (RLM config, session
///   id, subcall provider/model).
/// * `provider` — Caller's primary provider. Used as a fallback when the
///   dedicated RLM model cannot be resolved.
/// * `model` — Caller's primary model identifier (used for ctx-window
///   sizing and fallback routing).
/// * `keep_last` — Number of most-recent messages to keep verbatim. When
///   `messages.len() <= keep_last` the function returns `Ok(false)`
///   without touching the buffer.
/// * `reason` — Short diagnostic string recorded on the RLM trace.
///
/// # Returns
///
/// `Ok(true)` if the buffer was rewritten, `Ok(false)` if it was already
/// short enough to skip compression.
///
/// # Errors
///
/// Propagates any error from [`RlmRouter::auto_process`] that cannot be
/// recovered by the chunk-compression fallback.
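///
/// A before/after sketch of the rewrite (shapes only; message contents
/// are illustrative):
///
/// ```text
/// before: [m0, m1, m2, m3, m4, m5, m6, m7, m8, m9]        keep_last = 3
/// after:  [ "[AUTO CONTEXT COMPRESSION] …summary of m0..m6…", m7, m8, m9 ]
/// ```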
pub(crate) async fn compress_messages_keep_last(
    messages: &mut Vec<Message>,
    ctx: &CompressContext,
    provider: Arc<dyn crate::provider::Provider>,
    model: &str,
    keep_last: usize,
    reason: &str,
) -> Result<bool> {
    if messages.len() <= keep_last {
        return Ok(false);
    }

    let split_idx = messages.len().saturating_sub(keep_last);
    let tail = messages.split_off(split_idx);
    let prefix = std::mem::take(messages);

    let context = messages_to_rlm_context(&prefix);
    let ctx_window = context_window_for_model(model);

    // Prefer a cheap dedicated RLM model over the caller's main model
    // to keep compaction cost bounded. Falls back to the caller's
    // provider/model if the dedicated model cannot be resolved.
    let (rlm_provider, rlm_model) = match resolve_rlm_model(&ctx.rlm_config) {
        Some(target_model) if target_model != model => {
            match crate::provider::ProviderRegistry::shared_from_vault().await {
                Ok(registry) => match registry.resolve_model(&target_model) {
                    Ok((p, m)) => {
                        tracing::info!(
                            rlm_model = %m,
                            caller_model = %model,
                            "RLM: using cheap dedicated model for compression"
                        );
                        (p, m)
                    }
                    Err(err) => {
                        tracing::warn!(
                            rlm_model = %target_model,
                            error = %err,
                            "RLM: dedicated model resolution failed; falling back to caller model"
                        );
                        (Arc::clone(&provider), model.to_string())
                    }
                },
                Err(err) => {
                    tracing::warn!(
                        error = %err,
                        "RLM: provider registry unavailable; falling back to caller model"
                    );
                    (Arc::clone(&provider), model.to_string())
                }
            }
        }
        _ => (Arc::clone(&provider), model.to_string()),
    };

    let auto_ctx = AutoProcessContext {
        tool_id: "session_context",
        tool_args: serde_json::json!({"reason": reason}),
        session_id: &ctx.session_id,
        abort: None,
        on_progress: None,
        provider: rlm_provider,
        model: rlm_model.clone(),
        bus: None,
        trace_id: None,
        subcall_provider: ctx.subcall_provider.clone(),
        subcall_model: ctx.subcall_model.clone(),
    };

    let summary = match RlmRouter::auto_process(&context, auto_ctx, &ctx.rlm_config).await {
        Ok(result) => {
            tracing::info!(
                reason,
                rlm_model = %rlm_model,
                input_tokens = result.stats.input_tokens,
                output_tokens = result.stats.output_tokens,
                compression_ratio = result.stats.compression_ratio,
                "RLM: Compressed session history"
            );
            // Attribute RLM token spend against the RLM model so the
            // TUI cost badge reflects it instead of silently inflating
            // the caller's main model's reported usage.
            crate::telemetry::TOKEN_USAGE.record_model_usage(
                &rlm_model,
                result.stats.input_tokens as u64,
                result.stats.output_tokens as u64,
            );
            result.processed
        }
        Err(e) => {
            tracing::warn!(
                reason,
                error = %e,
                "RLM: Failed to compress session history; falling back to chunk compression"
            );
            RlmChunker::compress(
                &context,
                (ctx_window as f64 * FALLBACK_CHUNK_RATIO) as usize,
                None,
            )
        }
    };

    let summary_msg = Message {
        role: Role::Assistant,
        content: vec![ContentPart::Text {
            text: format!(
                "[AUTO CONTEXT COMPRESSION]\nOlder conversation + tool output was compressed \
                 to fit the model context window.\n\n{summary}\n\n\
                 [RECOVERY] If you need specific details that this summary \
                 dropped (exact file paths, prior tool output, earlier user \
                 instructions, numeric values), call the `session_recall` \
                 tool with a targeted query instead of guessing or asking \
                 the user to repeat themselves."
            ),
        }],
    };

    let mut new_messages = Vec::with_capacity(1 + tail.len());
    new_messages.push(summary_msg);
    new_messages.extend(tail);
    *messages = new_messages;

    Ok(true)
}

/// Ratio of the model's context window a single "last" message can occupy
/// before [`compress_last_message_if_oversized`] replaces it with an RLM
/// summary.
const OVERSIZED_LAST_MESSAGE_RATIO: f64 = 0.35;

/// Character budget retained verbatim from the original message as a
/// prefix, after RLM compression, so the model can see the literal
/// opening of the request even when the body was summarised.
const OVERSIZED_LAST_MESSAGE_PREFIX_CHARS: usize = 500;

/// Compress the **last** message in `messages` in-place when its text
/// content exceeds [`OVERSIZED_LAST_MESSAGE_RATIO`] of the model's
/// context window.
///
/// This is the buffer-taking core that replaces the former
/// `compress_user_message_if_oversized` helpers in `prompt.rs` and
/// `prompt_events.rs`. Those called `session.messages.last_mut()`
/// destructively on the canonical history; that was the last remaining
/// mutator that broke the Phase A invariant (*history stays pure*).
/// The derivation pipeline now calls this helper on a clone, so the
/// original user text remains in [`Session::messages`] while the LLM
/// sees the compressed projection.
///
/// Behaviour matches the legacy pair:
///
/// * When the last message contains ≤ threshold tokens, returns
///   `Ok(false)` without touching the buffer.
/// * On successful RLM compression, rewrites the last message's content
///   to a single text part with an `[Original message: N tokens,
///   compressed via RLM]` prefix, the compressed body, and the first
///   [`OVERSIZED_LAST_MESSAGE_PREFIX_CHARS`] characters of the original
///   as a literal suffix.
/// * On RLM failure, falls back to [`RlmChunker::compress`].
///
/// # Arguments
///
/// * `messages` — The message buffer to rewrite in place. A no-op when
///   empty.
/// * `ctx` — Session-level configuration snapshot.
/// * `provider` — Caller's primary provider (used as subcall fallback).
/// * `model` — Caller's primary model identifier (governs ctx window).
///
/// # Returns
///
/// `Ok(true)` when the last message was rewritten, `Ok(false)` when no
/// compression was needed.
///
/// # Errors
///
/// Never returns `Err` in the current implementation — RLM failures
/// fall back to chunk-based compression, which is infallible. The
/// `Result` return shape is preserved for forward-compatibility with
/// future pipeline wiring.
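///
/// For intuition, the trigger threshold under a hypothetical 128k-token
/// window (the real value comes from [`context_window_for_model`]):
///
/// ```
/// let ctx_window = 128_000usize; // illustrative window size
/// let threshold = (ctx_window as f64 * 0.35) as usize; // OVERSIZED_LAST_MESSAGE_RATIO
/// assert_eq!(threshold, 44_800);
/// ```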
pub(crate) async fn compress_last_message_if_oversized(
    messages: &mut [Message],
    ctx: &CompressContext,
    provider: Arc<dyn crate::provider::Provider>,
    model: &str,
) -> Result<bool> {
    let Some(last) = messages.last_mut() else {
        return Ok(false);
    };

    let original = extract_message_text(last);
    if original.is_empty() {
        return Ok(false);
    }

    let ctx_window = context_window_for_model(model);
    let msg_tokens = RlmChunker::estimate_tokens(&original);
    let threshold = (ctx_window as f64 * OVERSIZED_LAST_MESSAGE_RATIO) as usize;
    if msg_tokens <= threshold {
        return Ok(false);
    }

    tracing::info!(
        msg_tokens,
        threshold,
        ctx_window,
        "RLM: Last message exceeds context threshold, compressing"
    );

    let auto_ctx = AutoProcessContext {
        tool_id: "session_context",
        tool_args: serde_json::json!({}),
        session_id: &ctx.session_id,
        abort: None,
        on_progress: None,
        provider: Arc::clone(&provider),
        model: model.to_string(),
        bus: None,
        trace_id: None,
        subcall_provider: ctx.subcall_provider.clone(),
        subcall_model: ctx.subcall_model.clone(),
    };

    let replacement = match RlmRouter::auto_process(&original, auto_ctx, &ctx.rlm_config).await {
        Ok(result) => {
            tracing::info!(
                input_tokens = result.stats.input_tokens,
                output_tokens = result.stats.output_tokens,
                "RLM: Last message compressed"
            );
            format!(
                "[Original message: {msg_tokens} tokens, compressed via RLM]\n\n{}\n\n---\nOriginal request prefix:\n{}",
                result.processed,
                original
                    .chars()
                    .take(OVERSIZED_LAST_MESSAGE_PREFIX_CHARS)
                    .collect::<String>()
            )
        }
        Err(e) => {
            tracing::warn!(error = %e, "RLM: Failed to compress last message; falling back to chunk compression");
            // Target the threshold itself as the chunk-compression size
            // (in tokens), matching the trigger above.
            RlmChunker::compress(&original, threshold, None)
        }
    };

    last.content = vec![ContentPart::Text { text: replacement }];
    Ok(true)
}

/// Concatenate the textual content of `msg`, skipping non-text parts.
///
/// Used by [`compress_last_message_if_oversized`] to recover the string
/// form of a message without assuming the content vector has exactly one
/// text part.
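///
/// A minimal sketch of the behaviour (field shapes match this module's
/// own test helpers):
///
/// ```ignore
/// let msg = Message {
///     role: Role::User,
///     content: vec![
///         ContentPart::Text { text: "first".to_string() },
///         ContentPart::Text { text: "second".to_string() },
///     ],
/// };
/// assert_eq!(extract_message_text(&msg), "first\nsecond");
/// ```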
fn extract_message_text(msg: &Message) -> String {
    let mut buf = String::new();
    for part in &msg.content {
        if let ContentPart::Text { text } = part {
            if !buf.is_empty() {
                buf.push('\n');
            }
            buf.push_str(text);
        }
    }
    buf
}

/// Back-compat wrapper over [`compress_messages_keep_last`] that operates
/// on an owning [`Session`]. Bumps [`Session::updated_at`] when the
/// buffer was rewritten.
///
/// Prefer [`compress_messages_keep_last`] for new code — it lets you run
/// compression on a history clone without mutating the canonical
/// [`Session::messages`] buffer.
///
/// # Errors
///
/// Propagates any error from the underlying core.
pub(crate) async fn compress_history_keep_last(
    session: &mut Session,
    provider: Arc<dyn crate::provider::Provider>,
    model: &str,
    keep_last: usize,
    reason: &str,
) -> Result<bool> {
    let ctx = CompressContext::from_session(session);
    let did = compress_messages_keep_last(
        &mut session.messages,
        &ctx,
        provider,
        model,
        keep_last,
        reason,
    )
    .await?;
    if did {
        session.updated_at = Utc::now();
    }
    Ok(did)
}

/// Ensure the estimated request token cost of `messages` fits within the
/// model's safety budget, in-place.
///
/// This is the `&mut Vec<Message>` core that powers both the legacy
/// [`enforce_context_window`] wrapper (which takes `&mut Session`) and
/// the Phase B [`derive_context`](crate::session::context::derive_context)
/// pipeline (which runs on a history clone and never mutates the
/// canonical [`Session::messages`] buffer).
///
/// # Arguments
///
/// * `messages` — The message buffer to rewrite in place.
/// * `ctx` — Session-level configuration snapshot.
/// * `provider` — Caller's primary provider.
/// * `model` — Caller's primary model identifier (governs ctx window).
/// * `system_prompt` — Current system prompt, included in token estimates.
/// * `tools` — Tool definitions, included in token estimates.
/// * `event_tx` — Optional channel for emitting
///   [`SessionEvent::CompactionStarted`] /
///   [`SessionEvent::CompactionCompleted`] /
///   [`SessionEvent::ContextTruncated`] /
///   [`SessionEvent::CompactionFailed`]. When `None`, compaction runs
///   silently.
///
/// # Errors
///
/// Returns `Err` only if an underlying RLM pass errors in a way the
/// fallback cascade cannot recover from. Terminal truncation itself is
/// infallible.
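///
/// A hedged wiring sketch for the event channel (the channel capacity
/// and the surrounding bindings are illustrative):
///
/// ```ignore
/// let (tx, mut rx) = tokio::sync::mpsc::channel::<SessionEvent>(16);
/// enforce_on_messages(&mut msgs, &ctx, provider, model, system_prompt, &tools, Some(&tx))
///     .await?;
/// while let Ok(ev) = rx.try_recv() {
///     // Forward CompactionStarted/Completed/ContextTruncated/Failed to the UI.
/// }
/// ```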
pub(crate) async fn enforce_on_messages(
    messages: &mut Vec<Message>,
    ctx: &CompressContext,
    provider: Arc<dyn crate::provider::Provider>,
    model: &str,
    system_prompt: &str,
    tools: &[ToolDefinition],
    event_tx: Option<&mpsc::Sender<SessionEvent>>,
) -> Result<()> {
    let ctx_window = context_window_for_model(model);
    let reserve = session_completion_max_tokens().saturating_add(RESERVE_OVERHEAD_TOKENS);
    let budget = ctx_window.saturating_sub(reserve);
    let safety_budget = (budget as f64 * SAFETY_BUDGET_RATIO) as usize;

    let initial_est = estimate_request_tokens(system_prompt, messages, tools);
    let history_trigger = ctx.rlm_config.history_trigger_messages;
    let history_over = history_trigger > 0 && messages.len() >= history_trigger;
    if initial_est <= safety_budget && !history_over {
        return Ok(());
    }

    let trigger_reason = if history_over && initial_est <= safety_budget {
        "history_length"
    } else {
        "context_budget"
    };

    let trace_id = Uuid::new_v4();
    emit(
        event_tx,
        SessionEvent::CompactionStarted(CompactionStart {
            trace_id,
            reason: trigger_reason.to_string(),
            before_tokens: initial_est,
            budget: safety_budget,
        }),
    )
    .await;

    for keep_last in KEEP_LAST_CANDIDATES {
        let est = estimate_request_tokens(system_prompt, messages, tools);
        let still_long =
            history_trigger > 0 && messages.len() > history_trigger.saturating_sub(keep_last);
        if est <= safety_budget && !still_long {
            emit(
                event_tx,
                SessionEvent::CompactionCompleted(CompactionOutcome {
                    trace_id,
                    strategy: FallbackStrategy::Rlm,
                    before_tokens: initial_est,
                    after_tokens: est,
                    kept_messages: messages.len(),
                }),
            )
            .await;
            return Ok(());
        }

        tracing::info!(
            est_tokens = est,
            ctx_window,
            safety_budget,
            keep_last,
            "Context window approaching limit; compressing older session history"
        );

        let did = compress_messages_keep_last(
            messages,
            ctx,
            Arc::clone(&provider),
            model,
            keep_last,
            trigger_reason,
        )
        .await?;

        if !did {
            break;
        }
    }

    // Re-estimate one last time after the final RLM pass.
    let last_est = estimate_request_tokens(system_prompt, messages, tools);
    if last_est <= safety_budget {
        emit(
            event_tx,
            SessionEvent::CompactionCompleted(CompactionOutcome {
                trace_id,
                strategy: FallbackStrategy::Rlm,
                before_tokens: initial_est,
                after_tokens: last_est,
                kept_messages: messages.len(),
            }),
        )
        .await;
        return Ok(());
    }

    // Every RLM / chunk attempt still leaves us over budget.
    // Apply terminal truncation as the last-resort fallback.
    let dropped_tokens =
        terminal_truncate_messages(messages, system_prompt, tools, TRUNCATE_KEEP_LAST);
    let after_tokens = estimate_request_tokens(system_prompt, messages, tools);

    tracing::warn!(
        before_tokens = initial_est,
        after_tokens,
        dropped_tokens,
        kept_messages = messages.len(),
        safety_budget,
        "All RLM compaction attempts exhausted; applied terminal truncation fallback"
    );

    emit(
        event_tx,
        SessionEvent::ContextTruncated(ContextTruncation {
            trace_id,
            dropped_tokens,
            kept_messages: messages.len(),
            archive_ref: None,
        }),
    )
    .await;
    emit(
        event_tx,
        SessionEvent::CompactionCompleted(CompactionOutcome {
            trace_id,
            strategy: FallbackStrategy::Truncate,
            before_tokens: initial_est,
            after_tokens,
            kept_messages: messages.len(),
        }),
    )
    .await;

    if after_tokens > safety_budget {
        tracing::error!(
            after_tokens,
            safety_budget,
            "Terminal truncation still over budget; request will likely fail at the provider"
        );
        emit(
            event_tx,
            SessionEvent::CompactionFailed(CompactionFailure {
                trace_id,
                fell_back_to: Some(FallbackStrategy::Truncate),
                reason: "terminal truncation still exceeds safety budget".to_string(),
                after_tokens,
                budget: safety_budget,
            }),
        )
        .await;
    }

    Ok(())
}

/// Back-compat wrapper over [`enforce_on_messages`] that operates on an
/// owning [`Session`]. Bumps [`Session::updated_at`] when the buffer was
/// rewritten (detected as a change in message count).
///
/// Prefer [`enforce_on_messages`] for new code — it lets you run
/// context-window enforcement on a history clone without mutating the
/// canonical [`Session::messages`] buffer (the Phase A history/context
/// split).
///
/// # Errors
///
/// Propagates any error from the underlying core.
pub(crate) async fn enforce_context_window(
    session: &mut Session,
    provider: Arc<dyn crate::provider::Provider>,
    model: &str,
    system_prompt: &str,
    tools: &[ToolDefinition],
    event_tx: Option<&mpsc::Sender<SessionEvent>>,
) -> Result<()> {
    let ctx = CompressContext::from_session(session);
    let before_len = session.messages.len();
    enforce_on_messages(
        &mut session.messages,
        &ctx,
        provider,
        model,
        system_prompt,
        tools,
        event_tx,
    )
    .await?;
    if session.messages.len() != before_len {
        session.updated_at = Utc::now();
    }
    Ok(())
}

/// Drop everything older than the last `keep_last` messages in
/// `messages` and return an approximate count of the tokens removed.
///
/// Unlike [`compress_messages_keep_last`] this keeps **no** summary of
/// the dropped prefix — the caller is expected to have already attempted
/// RLM-based compaction and exhausted it. A `[CONTEXT TRUNCATED]`
/// assistant marker is prepended so the model is aware that older turns
/// were silently removed.
///
/// This is the `&mut Vec<Message>` core for terminal truncation. It is
/// called from [`enforce_on_messages`] as the last-resort fallback when
/// every RLM / chunk attempt has left the buffer over budget.
///
/// # Arguments
///
/// * `messages` — The message buffer to rewrite in place.
/// * `system_prompt` — Included in the before/after token estimates so
///   the caller sees the net effect on request size, not just message
///   count.
/// * `tools` — Tool definitions, included in token estimates.
/// * `keep_last` — Number of most-recent messages to retain.
///
/// # Returns
///
/// An approximate count of tokens removed (saturating subtraction).
/// Returns `0` when `messages.len() <= keep_last` and no work is done.
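///
/// Shape of the rewrite, mirroring the sketch on
/// [`compress_messages_keep_last`] (contents illustrative):
///
/// ```text
/// before: [m0, m1, m2, m3, m4, m5, m6, m7, m8, m9]      keep_last = 3
/// after:  [ "[CONTEXT TRUNCATED]", m7, m8, m9 ]          (m0..m6 dropped, no summary)
/// ```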
pub(crate) fn terminal_truncate_messages(
    messages: &mut Vec<Message>,
    system_prompt: &str,
    tools: &[ToolDefinition],
    keep_last: usize,
) -> usize {
    if messages.len() <= keep_last {
        return 0;
    }

    let before = estimate_request_tokens(system_prompt, messages, tools);
    let split_idx = messages.len().saturating_sub(keep_last);
    let tail = messages.split_off(split_idx);

    let marker = Message {
        role: Role::Assistant,
        content: vec![ContentPart::Text {
            text: "[CONTEXT TRUNCATED]\nOlder conversation was dropped to keep the request \
                   under the model's context window. Ask for details to recall anything \
                   specific."
                .to_string(),
        }],
    };

    let mut new_messages = Vec::with_capacity(1 + tail.len());
    new_messages.push(marker);
    new_messages.extend(tail);
    *messages = new_messages;

    let after = estimate_request_tokens(system_prompt, messages, tools);
    before.saturating_sub(after)
}

async fn emit(event_tx: Option<&mpsc::Sender<SessionEvent>>, ev: SessionEvent) {
    if let Some(tx) = event_tx {
        let _ = tx.send(ev).await;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn user(text: &str) -> Message {
        Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: text.to_string(),
            }],
        }
    }

    #[test]
    fn terminal_truncate_noop_when_short_enough() {
        let mut messages = vec![user("a"), user("b")];
        let before_len = messages.len();
        let dropped = terminal_truncate_messages(&mut messages, "", &[], 4);
        assert_eq!(dropped, 0);
        assert_eq!(messages.len(), before_len);
    }

    #[test]
    fn terminal_truncate_drops_prefix_and_prepends_marker() {
        let mut messages: Vec<Message> = (0..10).map(|i| user(&format!("msg-{i}"))).collect();
        let _ = terminal_truncate_messages(&mut messages, "", &[], 3);

        // 1 synthetic marker + 3 most-recent messages.
        assert_eq!(messages.len(), 4);

        // First message is the [CONTEXT TRUNCATED] assistant marker.
        assert!(matches!(messages[0].role, Role::Assistant));
        if let ContentPart::Text { text } = &messages[0].content[0] {
            assert!(text.starts_with("[CONTEXT TRUNCATED]"));
        } else {
            panic!("expected text content on the synthetic marker");
        }

        // Remaining three messages are the most-recent user messages,
        // preserved verbatim and in their original order.
        let kept_texts: Vec<&str> = messages[1..]
            .iter()
            .map(|m| match &m.content[0] {
                ContentPart::Text { text } => text.as_str(),
                _ => panic!("expected text content"),
            })
            .collect();
        assert_eq!(kept_texts, vec!["msg-7", "msg-8", "msg-9"]);
    }

    #[test]
    fn compress_context_from_session_snapshot_is_independent() {
        // The snapshot owns all of its fields, so it can be held
        // alongside a `&mut Vec<Message>` borrowed from the same
        // Session without borrow conflicts (the property the Phase B
        // derive_context pipeline relies on). Here we verify the
        // captured values.
        let session = Session {
            id: "session-42".to_string(),
            title: None,
            created_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
            metadata: Default::default(),
            agent: "test".to_string(),
            messages: Vec::new(),
            tool_uses: Vec::new(),
            usage: Default::default(),
            max_steps: None,
            bus: None,
        };
        let snapshot = CompressContext::from_session(&session);
        assert_eq!(snapshot.session_id, "session-42");
        assert!(snapshot.subcall_provider.is_none());
        assert!(snapshot.subcall_model.is_none());
    }
}
934}