Skip to main content

codetether_agent/session/helper/
compression.rs

1//! Session history compression via the RLM router.
2//!
3//! This module contains the context-window enforcement logic that keeps
4//! the prompt under the model's token budget. It is invoked automatically
5//! at the start of every agent step by [`Session::run_loop`](crate::session::Session).
6//!
7//! ## Strategy
8//!
9//! 1. Estimate the current request token cost (system + messages + tools).
10//! 2. If it exceeds 90% of the model's usable budget, compress the prefix
11//!    of the conversation via [`RlmRouter::auto_process`], keeping the
12//!    most recent `keep_last` messages verbatim.
13//! 3. Progressively shrink `keep_last` (16 → 12 → 8 → 6) until the budget
14//!    is met or nothing more can be compressed.
15//!
16//! The compressed prefix is replaced by a single synthetic assistant
17//! message tagged `[AUTO CONTEXT COMPRESSION]` so the model sees a
18//! coherent summary rather than a truncated tail.
19//!
20//! ## Fallback decision table
21//!
22//! ```text
23//! ┌────────────────────────────────────┬─────────────────────────────────────┬────────────────────────────────────┐
24//! │ State after attempt                │ Action                              │ Events emitted                     │
25//! ├────────────────────────────────────┼─────────────────────────────────────┼────────────────────────────────────┤
26//! │ RLM keep_last ∈ {16,12,8,6} fits   │ Stop; request is ready              │ CompactionStarted → Completed(Rlm) │
27//! │ RLM auto_process errors on prefix  │ Fall back to chunk compression      │ (internal; logged via tracing)     │
28//! │ All 4 keep_last values exhausted   │ Apply terminal truncation           │ Completed(Truncate) + Truncated    │
29//! │ Terminal truncation still over bud │ Surface error to caller             │ Failed(fell_back_to = Truncate)    │
30//! └────────────────────────────────────┴─────────────────────────────────────┴────────────────────────────────────┘
31//! ```
32//!
33//! Terminal truncation drops the oldest messages outright (no summary)
34//! and is deliberately a distinct event from `CompactionCompleted` so
35//! consumers can warn the user about silent context loss.
36
37use std::sync::Arc;
38
39use anyhow::Result;
40use chrono::Utc;
41use tokio::sync::mpsc;
42use uuid::Uuid;
43
44use crate::provider::{ContentPart, Message, Role, ToolDefinition};
45use crate::rlm::router::AutoProcessContext;
46use crate::rlm::{RlmChunker, RlmRouter};
47
48use super::error::messages_to_rlm_context;
49use super::token::{
50    context_window_for_model, estimate_request_tokens, session_completion_max_tokens,
51};
52use crate::session::event_compaction::{
53    CompactionFailure, CompactionOutcome, CompactionStart, ContextTruncation, FallbackStrategy,
54};
55use crate::session::{Session, SessionEvent};
56
/// Descending sequence of `keep_last` sizes that [`enforce_context_window`]
/// walks through, largest first, while compressing session history.
const KEEP_LAST_CANDIDATES: [usize; 4] = [16, 12, 8, 6];

/// How many of the newest messages the terminal truncation fallback keeps
/// once every RLM compaction attempt has left the request over budget.
/// The value is a trade-off: small enough to leave room for the active
/// turn, large enough that the model's immediate context stays coherent.
const TRUNCATE_KEEP_LAST: usize = 4;

/// Share of the usable token budget a request is allowed to occupy;
/// anything above this triggers compression.
const SAFETY_BUDGET_RATIO: f64 = 0.9;

/// Extra tokens reserved on top of `session_completion_max_tokens()` to
/// cover tool schemas, protocol framing, and provider-specific wrappers.
const RESERVE_OVERHEAD_TOKENS: usize = 2_048;

/// Target size of the fallback chunk compression, expressed as a fraction
/// of the model's context window.
const FALLBACK_CHUNK_RATIO: f64 = 0.25;
76
77/// Compress all messages older than the last `keep_last` into a single
78/// synthetic `[AUTO CONTEXT COMPRESSION]` assistant message.
79///
80/// Returns `Ok(true)` if compression ran, `Ok(false)` if the session was
81/// already short enough to skip.
82pub(crate) async fn compress_history_keep_last(
83    session: &mut Session,
84    provider: Arc<dyn crate::provider::Provider>,
85    model: &str,
86    keep_last: usize,
87    reason: &str,
88) -> Result<bool> {
89    if session.messages.len() <= keep_last {
90        return Ok(false);
91    }
92
93    let split_idx = session.messages.len().saturating_sub(keep_last);
94    let tail = session.messages.split_off(split_idx);
95    let prefix = std::mem::take(&mut session.messages);
96
97    let context = messages_to_rlm_context(&prefix);
98    let ctx_window = context_window_for_model(model);
99
100    let rlm_config = session.metadata.rlm.clone();
101    let auto_ctx = AutoProcessContext {
102        tool_id: "session_context",
103        tool_args: serde_json::json!({"reason": reason}),
104        session_id: &session.id,
105        abort: None,
106        on_progress: None,
107        provider,
108        model: model.to_string(),
109        bus: None,
110        trace_id: None,
111        subcall_provider: session.metadata.subcall_provider.clone(),
112        subcall_model: session.metadata.subcall_model_name.clone(),
113    };
114
115    let summary = match RlmRouter::auto_process(&context, auto_ctx, &rlm_config).await {
116        Ok(result) => {
117            tracing::info!(
118                reason,
119                input_tokens = result.stats.input_tokens,
120                output_tokens = result.stats.output_tokens,
121                compression_ratio = result.stats.compression_ratio,
122                "RLM: Compressed session history"
123            );
124            result.processed
125        }
126        Err(e) => {
127            tracing::warn!(
128                reason,
129                error = %e,
130                "RLM: Failed to compress session history; falling back to chunk compression"
131            );
132            RlmChunker::compress(
133                &context,
134                (ctx_window as f64 * FALLBACK_CHUNK_RATIO) as usize,
135                None,
136            )
137        }
138    };
139
140    let summary_msg = Message {
141        role: Role::Assistant,
142        content: vec![ContentPart::Text {
143            text: format!(
144                "[AUTO CONTEXT COMPRESSION]\nOlder conversation + tool output was compressed \
145                 to fit the model context window.\n\n{summary}"
146            ),
147        }],
148    };
149
150    let mut new_messages = Vec::with_capacity(1 + tail.len());
151    new_messages.push(summary_msg);
152    new_messages.extend(tail);
153    session.messages = new_messages;
154    session.updated_at = Utc::now();
155
156    Ok(true)
157}
158
159/// Ensure the estimated request token cost fits within the model's safety budget.
160///
161/// Invokes [`compress_history_keep_last`] with progressively smaller
162/// `keep_last` values until the estimate is under budget. When every
163/// RLM attempt leaves the request over budget, falls back to
164/// [`terminal_truncate_history`] which drops the oldest messages
165/// outright.
166///
167/// When `event_tx` is `Some`, the full compaction lifecycle is emitted
168/// as [`SessionEvent`] records ([`SessionEvent::CompactionStarted`],
169/// [`SessionEvent::CompactionCompleted`],
170/// [`SessionEvent::ContextTruncated`], [`SessionEvent::CompactionFailed`]).
171/// When `None`, compaction runs silently — preserving the historical
172/// behaviour of the non-event prompt path.
173///
174/// # Errors
175///
176/// Returns `Err` only if an underlying RLM pass errors in a way the
177/// fallback cascade cannot recover from; the terminal truncation path
178/// itself is infallible (it cannot produce an over-budget request
179/// unless `TRUNCATE_KEEP_LAST` messages alone exceed the budget, in
180/// which case a [`CompactionFailure`] event is emitted and `Ok(())` is
181/// still returned so the caller can surface a clean error from the
182/// provider round-trip).
183pub(crate) async fn enforce_context_window(
184    session: &mut Session,
185    provider: Arc<dyn crate::provider::Provider>,
186    model: &str,
187    system_prompt: &str,
188    tools: &[ToolDefinition],
189    event_tx: Option<&mpsc::Sender<SessionEvent>>,
190) -> Result<()> {
191    let ctx_window = context_window_for_model(model);
192    let reserve = session_completion_max_tokens().saturating_add(RESERVE_OVERHEAD_TOKENS);
193    let budget = ctx_window.saturating_sub(reserve);
194    let safety_budget = (budget as f64 * SAFETY_BUDGET_RATIO) as usize;
195
196    let initial_est = estimate_request_tokens(system_prompt, &session.messages, tools);
197    if initial_est <= safety_budget {
198        return Ok(());
199    }
200
201    let trace_id = Uuid::new_v4();
202    emit(
203        event_tx,
204        SessionEvent::CompactionStarted(CompactionStart {
205            trace_id,
206            reason: "context_budget".to_string(),
207            before_tokens: initial_est,
208            budget: safety_budget,
209        }),
210    )
211    .await;
212
213    for keep_last in KEEP_LAST_CANDIDATES {
214        let est = estimate_request_tokens(system_prompt, &session.messages, tools);
215        if est <= safety_budget {
216            emit(
217                event_tx,
218                SessionEvent::CompactionCompleted(CompactionOutcome {
219                    trace_id,
220                    strategy: FallbackStrategy::Rlm,
221                    before_tokens: initial_est,
222                    after_tokens: est,
223                    kept_messages: session.messages.len(),
224                }),
225            )
226            .await;
227            return Ok(());
228        }
229
230        tracing::info!(
231            est_tokens = est,
232            ctx_window,
233            safety_budget,
234            keep_last,
235            "Context window approaching limit; compressing older session history"
236        );
237
238        let did = compress_history_keep_last(
239            session,
240            Arc::clone(&provider),
241            model,
242            keep_last,
243            "context_budget",
244        )
245        .await?;
246
247        if !did {
248            break;
249        }
250    }
251
252    // Re-estimate one last time after the final RLM pass.
253    let last_est = estimate_request_tokens(system_prompt, &session.messages, tools);
254    if last_est <= safety_budget {
255        emit(
256            event_tx,
257            SessionEvent::CompactionCompleted(CompactionOutcome {
258                trace_id,
259                strategy: FallbackStrategy::Rlm,
260                before_tokens: initial_est,
261                after_tokens: last_est,
262                kept_messages: session.messages.len(),
263            }),
264        )
265        .await;
266        return Ok(());
267    }
268
269    // Every RLM / chunk attempt still leaves us over budget.
270    // Apply terminal truncation as the last-resort fallback.
271    let dropped_tokens =
272        terminal_truncate_history(session, system_prompt, tools, TRUNCATE_KEEP_LAST);
273    let after_tokens = estimate_request_tokens(system_prompt, &session.messages, tools);
274
275    tracing::warn!(
276        before_tokens = initial_est,
277        after_tokens,
278        dropped_tokens,
279        kept_messages = session.messages.len(),
280        safety_budget,
281        "All RLM compaction attempts exhausted; applied terminal truncation fallback"
282    );
283
284    emit(
285        event_tx,
286        SessionEvent::ContextTruncated(ContextTruncation {
287            trace_id,
288            dropped_tokens,
289            kept_messages: session.messages.len(),
290            archive_ref: None,
291        }),
292    )
293    .await;
294    emit(
295        event_tx,
296        SessionEvent::CompactionCompleted(CompactionOutcome {
297            trace_id,
298            strategy: FallbackStrategy::Truncate,
299            before_tokens: initial_est,
300            after_tokens,
301            kept_messages: session.messages.len(),
302        }),
303    )
304    .await;
305
306    if after_tokens > safety_budget {
307        tracing::error!(
308            after_tokens,
309            safety_budget,
310            "Terminal truncation still over budget; request will likely fail at the provider"
311        );
312        emit(
313            event_tx,
314            SessionEvent::CompactionFailed(CompactionFailure {
315                trace_id,
316                fell_back_to: Some(FallbackStrategy::Truncate),
317                reason: "terminal truncation still exceeds safety budget".to_string(),
318                after_tokens,
319                budget: safety_budget,
320            }),
321        )
322        .await;
323    }
324
325    Ok(())
326}
327
328/// Drop everything older than the last `keep_last` messages and return an
329/// approximate count of the tokens removed.
330///
331/// Unlike [`compress_history_keep_last`] this keeps **no** summary of the
332/// dropped prefix — the caller is expected to have already attempted
333/// RLM-based compaction and exhausted it. A `[CONTEXT TRUNCATED]`
334/// assistant marker is prepended so the model is aware that older turns
335/// were silently removed.
336fn terminal_truncate_history(
337    session: &mut Session,
338    system_prompt: &str,
339    tools: &[ToolDefinition],
340    keep_last: usize,
341) -> usize {
342    if session.messages.len() <= keep_last {
343        return 0;
344    }
345
346    let before = estimate_request_tokens(system_prompt, &session.messages, tools);
347    let split_idx = session.messages.len().saturating_sub(keep_last);
348    let tail = session.messages.split_off(split_idx);
349
350    let marker = Message {
351        role: Role::Assistant,
352        content: vec![ContentPart::Text {
353            text: "[CONTEXT TRUNCATED]\nOlder conversation was dropped to keep the request \
354                   under the model's context window. Ask for details to recall anything \
355                   specific."
356                .to_string(),
357        }],
358    };
359
360    let mut new_messages = Vec::with_capacity(1 + tail.len());
361    new_messages.push(marker);
362    new_messages.extend(tail);
363    session.messages = new_messages;
364    session.updated_at = Utc::now();
365
366    let after = estimate_request_tokens(system_prompt, &session.messages, tools);
367    before.saturating_sub(after)
368}
369
370async fn emit(event_tx: Option<&mpsc::Sender<SessionEvent>>, ev: SessionEvent) {
371    if let Some(tx) = event_tx {
372        let _ = tx.send(ev).await;
373    }
374}