// codetether_agent/session/helper/compression.rs
use std::sync::Arc;
38
39use anyhow::Result;
40use chrono::Utc;
41use tokio::sync::mpsc;
42use uuid::Uuid;
43
44use crate::provider::{ContentPart, Message, Role, ToolDefinition};
45use crate::rlm::router::AutoProcessContext;
46use crate::rlm::{RlmChunker, RlmRouter};
47
48use super::error::messages_to_rlm_context;
49use super::token::{
50 context_window_for_model, estimate_request_tokens, session_completion_max_tokens,
51};
52use crate::session::event_compaction::{
53 CompactionFailure, CompactionOutcome, CompactionStart, ContextTruncation, FallbackStrategy,
54};
55use crate::session::{Session, SessionEvent};
56
57const KEEP_LAST_CANDIDATES: [usize; 4] = [16, 12, 8, 6];
59
60const TRUNCATE_KEEP_LAST: usize = 4;
66
67const SAFETY_BUDGET_RATIO: f64 = 0.90;
69
70const RESERVE_OVERHEAD_TOKENS: usize = 2048;
73
74const FALLBACK_CHUNK_RATIO: f64 = 0.25;
76
77pub(crate) async fn compress_history_keep_last(
83 session: &mut Session,
84 provider: Arc<dyn crate::provider::Provider>,
85 model: &str,
86 keep_last: usize,
87 reason: &str,
88) -> Result<bool> {
89 if session.messages.len() <= keep_last {
90 return Ok(false);
91 }
92
93 let split_idx = session.messages.len().saturating_sub(keep_last);
94 let tail = session.messages.split_off(split_idx);
95 let prefix = std::mem::take(&mut session.messages);
96
97 let context = messages_to_rlm_context(&prefix);
98 let ctx_window = context_window_for_model(model);
99
100 let rlm_config = session.metadata.rlm.clone();
101 let auto_ctx = AutoProcessContext {
102 tool_id: "session_context",
103 tool_args: serde_json::json!({"reason": reason}),
104 session_id: &session.id,
105 abort: None,
106 on_progress: None,
107 provider,
108 model: model.to_string(),
109 bus: None,
110 trace_id: None,
111 subcall_provider: session.metadata.subcall_provider.clone(),
112 subcall_model: session.metadata.subcall_model_name.clone(),
113 };
114
115 let summary = match RlmRouter::auto_process(&context, auto_ctx, &rlm_config).await {
116 Ok(result) => {
117 tracing::info!(
118 reason,
119 input_tokens = result.stats.input_tokens,
120 output_tokens = result.stats.output_tokens,
121 compression_ratio = result.stats.compression_ratio,
122 "RLM: Compressed session history"
123 );
124 result.processed
125 }
126 Err(e) => {
127 tracing::warn!(
128 reason,
129 error = %e,
130 "RLM: Failed to compress session history; falling back to chunk compression"
131 );
132 RlmChunker::compress(
133 &context,
134 (ctx_window as f64 * FALLBACK_CHUNK_RATIO) as usize,
135 None,
136 )
137 }
138 };
139
140 let summary_msg = Message {
141 role: Role::Assistant,
142 content: vec![ContentPart::Text {
143 text: format!(
144 "[AUTO CONTEXT COMPRESSION]\nOlder conversation + tool output was compressed \
145 to fit the model context window.\n\n{summary}"
146 ),
147 }],
148 };
149
150 let mut new_messages = Vec::with_capacity(1 + tail.len());
151 new_messages.push(summary_msg);
152 new_messages.extend(tail);
153 session.messages = new_messages;
154 session.updated_at = Utc::now();
155
156 Ok(true)
157}
158
/// Ensure the next request built from `session` fits the model's context
/// window, compacting history as needed.
///
/// Strategy, in order:
/// 1. If the estimated request already fits the safety budget, return early.
/// 2. Run RLM compression (`compress_history_keep_last`) with progressively
///    smaller `keep_last` values from `KEEP_LAST_CANDIDATES`, re-estimating
///    after each pass and stopping as soon as the estimate fits.
/// 3. If still over budget, apply `terminal_truncate_history` as a last
///    resort, and report a failure event if even that leaves the request
///    over budget.
///
/// Compaction lifecycle events are emitted on `event_tx` (when provided)
/// under a single freshly generated `trace_id` so consumers can correlate
/// start, outcome, truncation, and failure.
pub(crate) async fn enforce_context_window(
    session: &mut Session,
    provider: Arc<dyn crate::provider::Provider>,
    model: &str,
    system_prompt: &str,
    tools: &[ToolDefinition],
    event_tx: Option<&mpsc::Sender<SessionEvent>>,
) -> Result<()> {
    // Budget = context window minus the completion reserve (plus fixed
    // overhead), then shrunk by SAFETY_BUDGET_RATIO to absorb estimation error.
    let ctx_window = context_window_for_model(model);
    let reserve = session_completion_max_tokens().saturating_add(RESERVE_OVERHEAD_TOKENS);
    let budget = ctx_window.saturating_sub(reserve);
    let safety_budget = (budget as f64 * SAFETY_BUDGET_RATIO) as usize;

    let initial_est = estimate_request_tokens(system_prompt, &session.messages, tools);
    if initial_est <= safety_budget {
        return Ok(());
    }

    let trace_id = Uuid::new_v4();
    emit(
        event_tx,
        SessionEvent::CompactionStarted(CompactionStart {
            trace_id,
            reason: "context_budget".to_string(),
            before_tokens: initial_est,
            budget: safety_budget,
        }),
    )
    .await;

    // Try RLM compression with progressively smaller tails. Each iteration
    // re-estimates first, so a successful earlier pass short-circuits here.
    for keep_last in KEEP_LAST_CANDIDATES {
        let est = estimate_request_tokens(system_prompt, &session.messages, tools);
        if est <= safety_budget {
            emit(
                event_tx,
                SessionEvent::CompactionCompleted(CompactionOutcome {
                    trace_id,
                    strategy: FallbackStrategy::Rlm,
                    before_tokens: initial_est,
                    after_tokens: est,
                    kept_messages: session.messages.len(),
                }),
            )
            .await;
            return Ok(());
        }

        tracing::info!(
            est_tokens = est,
            ctx_window,
            safety_budget,
            keep_last,
            "Context window approaching limit; compressing older session history"
        );

        let did = compress_history_keep_last(
            session,
            Arc::clone(&provider),
            model,
            keep_last,
            "context_budget",
        )
        .await?;

        // `false` means the history is already <= keep_last messages; smaller
        // candidates cannot help either, so stop trying.
        if !did {
            break;
        }
    }

    // The final (smallest keep_last) pass has not been re-checked inside the
    // loop; give it a chance to succeed before truncating.
    let last_est = estimate_request_tokens(system_prompt, &session.messages, tools);
    if last_est <= safety_budget {
        emit(
            event_tx,
            SessionEvent::CompactionCompleted(CompactionOutcome {
                trace_id,
                strategy: FallbackStrategy::Rlm,
                before_tokens: initial_est,
                after_tokens: last_est,
                kept_messages: session.messages.len(),
            }),
        )
        .await;
        return Ok(());
    }

    // Last resort: drop old history outright, keeping TRUNCATE_KEEP_LAST
    // trailing messages.
    let dropped_tokens =
        terminal_truncate_history(session, system_prompt, tools, TRUNCATE_KEEP_LAST);
    let after_tokens = estimate_request_tokens(system_prompt, &session.messages, tools);

    tracing::warn!(
        before_tokens = initial_est,
        after_tokens,
        dropped_tokens,
        kept_messages = session.messages.len(),
        safety_budget,
        "All RLM compaction attempts exhausted; applied terminal truncation fallback"
    );

    emit(
        event_tx,
        SessionEvent::ContextTruncated(ContextTruncation {
            trace_id,
            dropped_tokens,
            kept_messages: session.messages.len(),
            archive_ref: None,
        }),
    )
    .await;
    emit(
        event_tx,
        SessionEvent::CompactionCompleted(CompactionOutcome {
            trace_id,
            strategy: FallbackStrategy::Truncate,
            before_tokens: initial_est,
            after_tokens,
            kept_messages: session.messages.len(),
        }),
    )
    .await;

    // Even truncation can fail to fit (e.g. huge tail messages); report it so
    // the caller/observers know the provider request will likely be rejected.
    if after_tokens > safety_budget {
        tracing::error!(
            after_tokens,
            safety_budget,
            "Terminal truncation still over budget; request will likely fail at the provider"
        );
        emit(
            event_tx,
            SessionEvent::CompactionFailed(CompactionFailure {
                trace_id,
                fell_back_to: Some(FallbackStrategy::Truncate),
                reason: "terminal truncation still exceeds safety budget".to_string(),
                after_tokens,
                budget: safety_budget,
            }),
        )
        .await;
    }

    Ok(())
}
327
328fn terminal_truncate_history(
337 session: &mut Session,
338 system_prompt: &str,
339 tools: &[ToolDefinition],
340 keep_last: usize,
341) -> usize {
342 if session.messages.len() <= keep_last {
343 return 0;
344 }
345
346 let before = estimate_request_tokens(system_prompt, &session.messages, tools);
347 let split_idx = session.messages.len().saturating_sub(keep_last);
348 let tail = session.messages.split_off(split_idx);
349
350 let marker = Message {
351 role: Role::Assistant,
352 content: vec![ContentPart::Text {
353 text: "[CONTEXT TRUNCATED]\nOlder conversation was dropped to keep the request \
354 under the model's context window. Ask for details to recall anything \
355 specific."
356 .to_string(),
357 }],
358 };
359
360 let mut new_messages = Vec::with_capacity(1 + tail.len());
361 new_messages.push(marker);
362 new_messages.extend(tail);
363 session.messages = new_messages;
364 session.updated_at = Utc::now();
365
366 let after = estimate_request_tokens(system_prompt, &session.messages, tools);
367 before.saturating_sub(after)
368}
369
370async fn emit(event_tx: Option<&mpsc::Sender<SessionEvent>>, ev: SessionEvent) {
371 if let Some(tx) = event_tx {
372 let _ = tx.send(ev).await;
373 }
374}