use std::sync::Arc;
use anyhow::Result;
use chrono::Utc;
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::provider::{ContentPart, Message, Role, ToolDefinition};
use crate::rlm::router::AutoProcessContext;
use crate::rlm::{RlmChunker, RlmRouter};
use super::error::messages_to_rlm_context;
use super::token::{
context_window_for_model, estimate_request_tokens, session_completion_max_tokens,
};
use crate::session::event_compaction::{
CompactionFailure, CompactionOutcome, CompactionStart, ContextTruncation, FallbackStrategy,
};
use crate::session::{Session, SessionEvent};
/// Descending `keep_last` sizes tried in order by `enforce_on_messages`;
/// each compaction pass keeps fewer recent messages than the previous one.
const KEEP_LAST_CANDIDATES: [usize; 4] = [16, 12, 8, 6];
/// Messages kept verbatim by the terminal-truncation fallback.
const TRUNCATE_KEEP_LAST: usize = 4;
/// Compact down to this fraction of the usable budget to leave headroom
/// for estimation error.
const SAFETY_BUDGET_RATIO: f64 = 0.90;
/// Extra tokens reserved on top of the completion budget for per-request
/// overhead (tool schemas, formatting, etc.).
const RESERVE_OVERHEAD_TOKENS: usize = 2048;
/// Target size for the `RlmChunker` fallback, as a fraction of the
/// model's context window.
const FALLBACK_CHUNK_RATIO: f64 = 0.25;
/// Model used for RLM compression when neither the config's `root_model`
/// nor the `CODETETHER_RLM_MODEL` environment variable names one.
pub const DEFAULT_RLM_MODEL: &str = "zai/glm-5.1";
/// Pick the model used for RLM compression.
///
/// Precedence: explicit `root_model` from the config, then the
/// `CODETETHER_RLM_MODEL` environment variable (ignored when blank or
/// whitespace-only), then [`DEFAULT_RLM_MODEL`]. Always returns `Some`.
fn resolve_rlm_model(rlm_config: &crate::rlm::RlmConfig) -> Option<String> {
    let from_env = || {
        std::env::var("CODETETHER_RLM_MODEL")
            .ok()
            .filter(|candidate| !candidate.trim().is_empty())
    };
    rlm_config
        .root_model
        .clone()
        .or_else(from_env)
        .or_else(|| Some(DEFAULT_RLM_MODEL.to_string()))
}
/// Candidate models the delegation bandit scores in
/// `resolve_rlm_model_bandit`; on score ties, earlier entries win.
const RLM_MODEL_CANDIDATES: &[&str] = &[
    "zai/glm-5.1",
    "glm5/glm-5",
    "openrouter/openai/gpt-oss-120b:free",
];
/// Bandit-aware variant of [`resolve_rlm_model`].
///
/// When delegation is disabled this simply defers to
/// [`resolve_rlm_model`]. Otherwise an explicit config `root_model` or
/// the `CODETETHER_RLM_MODEL` environment variable still wins; failing
/// those, the candidate in [`RLM_MODEL_CANDIDATES`] with the highest
/// delegation score is chosen (earlier candidates win ties), defaulting
/// to [`DEFAULT_RLM_MODEL`].
pub(crate) fn resolve_rlm_model_bandit(
    rlm_config: &crate::rlm::RlmConfig,
    state: &crate::session::delegation::DelegationState,
    bucket: crate::session::relevance::Bucket,
) -> Option<String> {
    if !state.config.enabled {
        return resolve_rlm_model(rlm_config);
    }
    if let Some(explicit) = rlm_config.root_model.as_ref() {
        return Some(explicit.clone());
    }
    if let Ok(env_model) = std::env::var("CODETETHER_RLM_MODEL")
        && !env_model.trim().is_empty()
    {
        return Some(env_model);
    }
    // Score every candidate; a missing score counts as 0.0.
    let scored = RLM_MODEL_CANDIDATES.iter().map(|candidate| {
        let score = state
            .score(
                candidate,
                crate::session::delegation_skills::RLM_COMPACT,
                bucket,
            )
            .unwrap_or(0.0);
        (*candidate, score)
    });
    // Keep the first candidate with the strictly highest score.
    let mut winner: Option<(&str, f64)> = None;
    for (candidate, score) in scored {
        let replace = match winner {
            Some((_, top)) => score > top,
            None => true,
        };
        if replace {
            winner = Some((candidate, score));
        }
    }
    winner
        .map(|(model, _)| model.to_string())
        .or_else(|| Some(DEFAULT_RLM_MODEL.to_string()))
}
/// Immutable snapshot of the session fields needed to run RLM
/// compression without holding a borrow of the whole [`Session`].
#[derive(Clone)]
pub(crate) struct CompressContext {
    // RLM configuration copied out of the session metadata.
    pub rlm_config: crate::rlm::RlmConfig,
    // Session identifier, forwarded to the RLM router for attribution.
    pub session_id: String,
    // Optional dedicated provider for sub-calls made by the router.
    pub subcall_provider: Option<Arc<dyn crate::provider::Provider>>,
    // Optional model name paired with `subcall_provider`.
    pub subcall_model: Option<String>,
}
impl CompressContext {
    /// Clone the relevant metadata out of `session`. The snapshot is
    /// independent: later mutations of the session do not affect it.
    pub(crate) fn from_session(session: &Session) -> Self {
        Self {
            rlm_config: session.metadata.rlm.clone(),
            session_id: session.id.clone(),
            subcall_provider: session.metadata.subcall_provider.clone(),
            subcall_model: session.metadata.subcall_model_name.clone(),
        }
    }
}
/// Compress everything except the last `keep_last` messages into a
/// single synthetic assistant summary message.
///
/// The prefix is summarized via the RLM router (preferring a dedicated
/// cheap model when one resolves); on router failure it falls back to
/// `RlmChunker::compress`, so the prefix is always replaced by *some*
/// summary. Returns `Ok(false)` when there are not more than
/// `keep_last` messages (nothing to do), `Ok(true)` after compression.
pub(crate) async fn compress_messages_keep_last(
    messages: &mut Vec<Message>,
    ctx: &CompressContext,
    provider: Arc<dyn crate::provider::Provider>,
    model: &str,
    keep_last: usize,
    reason: &str,
) -> Result<bool> {
    if messages.len() <= keep_last {
        return Ok(false);
    }
    // Keep the tail verbatim; the prefix becomes RLM input. `messages`
    // is emptied by `mem::take` here and is unconditionally reassigned
    // before the single `Ok(true)` return below — there is no fallible
    // early exit in between, so callers never observe an empty history.
    let split_idx = messages.len().saturating_sub(keep_last);
    let tail = messages.split_off(split_idx);
    let prefix = std::mem::take(messages);
    let context = messages_to_rlm_context(&prefix);
    let ctx_window = context_window_for_model(model);
    // Prefer a dedicated RLM model when one resolves and differs from
    // the caller's model; on any resolution failure fall back to the
    // caller's provider/model so compression still proceeds.
    let (rlm_provider, rlm_model) = match resolve_rlm_model(&ctx.rlm_config) {
        Some(target_model) if target_model != model => {
            match crate::provider::ProviderRegistry::shared_from_vault().await {
                Ok(registry) => match registry.resolve_model(&target_model) {
                    Ok((p, m)) => {
                        tracing::info!(
                            rlm_model = %m,
                            caller_model = %model,
                            "RLM: using cheap dedicated model for compression"
                        );
                        (p, m)
                    }
                    Err(err) => {
                        tracing::warn!(
                            rlm_model = %target_model,
                            error = %err,
                            "RLM: dedicated model resolution failed; falling back to caller model"
                        );
                        (Arc::clone(&provider), model.to_string())
                    }
                },
                Err(err) => {
                    tracing::warn!(
                        error = %err,
                        "RLM: provider registry unavailable; falling back to caller model"
                    );
                    (Arc::clone(&provider), model.to_string())
                }
            }
        }
        // Either no dedicated model resolved or it matches the caller's.
        _ => (Arc::clone(&provider), model.to_string()),
    };
    let auto_ctx = AutoProcessContext {
        tool_id: "session_context",
        tool_args: serde_json::json!({"reason": reason}),
        session_id: &ctx.session_id,
        abort: None,
        on_progress: None,
        provider: rlm_provider,
        model: rlm_model.clone(),
        bus: None,
        trace_id: None,
        subcall_provider: ctx.subcall_provider.clone(),
        subcall_model: ctx.subcall_model.clone(),
    };
    let summary = match RlmRouter::auto_process(&context, auto_ctx, &ctx.rlm_config).await {
        Ok(result) => {
            tracing::info!(
                reason,
                rlm_model = %rlm_model,
                input_tokens = result.stats.input_tokens,
                output_tokens = result.stats.output_tokens,
                compression_ratio = result.stats.compression_ratio,
                "RLM: Compressed session history"
            );
            // Account the RLM call's token usage against the RLM model.
            crate::telemetry::TOKEN_USAGE.record_model_usage(
                &rlm_model,
                result.stats.input_tokens as u64,
                result.stats.output_tokens as u64,
            );
            result.processed
        }
        Err(e) => {
            tracing::warn!(
                reason,
                error = %e,
                "RLM: Failed to compress session history; falling back to chunk compression"
            );
            // Best-effort local fallback: chunk-compress the prefix to a
            // quarter of the context window (FALLBACK_CHUNK_RATIO).
            RlmChunker::compress(
                &context,
                (ctx_window as f64 * FALLBACK_CHUNK_RATIO) as usize,
                None,
            )
        }
    };
    // The summary replaces the prefix as a single assistant message and
    // tells the model how to recover dropped details via `session_recall`.
    let summary_msg = Message {
        role: Role::Assistant,
        content: vec![ContentPart::Text {
            text: format!(
                "[AUTO CONTEXT COMPRESSION]\nOlder conversation + tool output was compressed \
                 to fit the model context window.\n\n{summary}\n\n\
                 [RECOVERY] If you need specific details that this summary \
                 dropped (exact file paths, prior tool output, earlier user \
                 instructions, numeric values), call the `session_recall` \
                 tool with a targeted query instead of guessing or asking \
                 the user to repeat themselves."
            ),
        }],
    };
    let mut new_messages = Vec::with_capacity(1 + tail.len());
    new_messages.push(summary_msg);
    new_messages.extend(tail);
    *messages = new_messages;
    Ok(true)
}
/// Fraction of the context window beyond which the final message is
/// compressed on its own by `compress_last_message_if_oversized`.
const OVERSIZED_LAST_MESSAGE_RATIO: f64 = 0.35;
/// Characters of the original message preserved verbatim alongside the
/// compressed form, so exact opening details are not lost.
const OVERSIZED_LAST_MESSAGE_PREFIX_CHARS: usize = 500;
/// Compress the final message in place when it alone exceeds
/// [`OVERSIZED_LAST_MESSAGE_RATIO`] of the model's context window.
///
/// The message's text is summarized via the RLM router (keeping a short
/// verbatim prefix of the original for exact details); on router failure
/// it falls back to `RlmChunker::compress` at the same token threshold.
/// Returns `Ok(false)` when there is no last message, it has no text
/// content, or it is under the threshold; `Ok(true)` after replacement.
pub(crate) async fn compress_last_message_if_oversized(
    messages: &mut [Message],
    ctx: &CompressContext,
    provider: Arc<dyn crate::provider::Provider>,
    model: &str,
) -> Result<bool> {
    let Some(last) = messages.last_mut() else {
        return Ok(false);
    };
    let original = extract_message_text(last);
    if original.is_empty() {
        return Ok(false);
    }
    let ctx_window = context_window_for_model(model);
    let msg_tokens = RlmChunker::estimate_tokens(&original);
    let threshold = (ctx_window as f64 * OVERSIZED_LAST_MESSAGE_RATIO) as usize;
    if msg_tokens <= threshold {
        return Ok(false);
    }
    tracing::info!(
        msg_tokens,
        threshold,
        ctx_window,
        "RLM: Last message exceeds context threshold, compressing"
    );
    let auto_ctx = AutoProcessContext {
        tool_id: "session_context",
        tool_args: serde_json::json!({}),
        session_id: &ctx.session_id,
        abort: None,
        on_progress: None,
        provider: Arc::clone(&provider),
        model: model.to_string(),
        bus: None,
        trace_id: None,
        subcall_provider: ctx.subcall_provider.clone(),
        subcall_model: ctx.subcall_model.clone(),
    };
    let replacement = match RlmRouter::auto_process(&original, auto_ctx, &ctx.rlm_config).await {
        Ok(result) => {
            tracing::info!(
                input_tokens = result.stats.input_tokens,
                output_tokens = result.stats.output_tokens,
                "RLM: Last message compressed"
            );
            format!(
                "[Original message: {msg_tokens} tokens, compressed via RLM]\n\n{}\n\n---\nOriginal request prefix:\n{}",
                result.processed,
                original
                    .chars()
                    .take(OVERSIZED_LAST_MESSAGE_PREFIX_CHARS)
                    .collect::<String>()
            )
        }
        Err(e) => {
            tracing::warn!(error = %e, "RLM: Failed to compress last message, using truncation");
            // Fallback: chunk-compress to the same token threshold that
            // triggered compression. (The previous `threshold * 4 / 4`
            // chars-then-tokens round-trip was an exact no-op.)
            RlmChunker::compress(&original, threshold, None)
        }
    };
    // Replace the whole content with a single text part; any non-text
    // parts of the original message are dropped here, as before.
    last.content = vec![ContentPart::Text { text: replacement }];
    Ok(true)
}
/// Concatenate all `Text` parts of `msg`, separated by newlines.
///
/// Non-text parts are skipped, and a separator is only inserted once
/// some non-empty text has already accumulated (so leading empty text
/// parts do not produce a stray leading newline).
fn extract_message_text(msg: &Message) -> String {
    msg.content.iter().fold(String::new(), |mut acc, part| {
        if let ContentPart::Text { text } = part {
            if !acc.is_empty() {
                acc.push('\n');
            }
            acc.push_str(text);
        }
        acc
    })
}
/// Session-level wrapper around [`compress_messages_keep_last`].
///
/// Snapshots the session's RLM settings, compresses its message history
/// in place, and bumps `updated_at` only when something actually changed.
pub(crate) async fn compress_history_keep_last(
    session: &mut Session,
    provider: Arc<dyn crate::provider::Provider>,
    model: &str,
    keep_last: usize,
    reason: &str,
) -> Result<bool> {
    let snapshot = CompressContext::from_session(session);
    let compressed = compress_messages_keep_last(
        &mut session.messages,
        &snapshot,
        provider,
        model,
        keep_last,
        reason,
    )
    .await?;
    if compressed {
        session.updated_at = Utc::now();
    }
    Ok(compressed)
}
/// Ensure the full request (system prompt + messages + tools) fits the
/// model's context budget, compressing and finally truncating history.
///
/// Fast path: when the token estimate is within the safety budget and
/// the history is under the configured message-count trigger, returns
/// immediately without emitting events. Otherwise it emits
/// `CompactionStarted`, runs RLM compression with progressively smaller
/// `keep_last` values ([`KEEP_LAST_CANDIDATES`]), and — if still over
/// budget afterwards — applies `terminal_truncate_messages` and emits
/// `ContextTruncated` + `CompactionCompleted` (plus `CompactionFailed`
/// when even truncation leaves the estimate over budget).
pub(crate) async fn enforce_on_messages(
    messages: &mut Vec<Message>,
    ctx: &CompressContext,
    provider: Arc<dyn crate::provider::Provider>,
    model: &str,
    system_prompt: &str,
    tools: &[ToolDefinition],
    event_tx: Option<&mpsc::Sender<SessionEvent>>,
) -> Result<()> {
    // Usable budget = context window minus the completion reserve and a
    // fixed overhead, then scaled down by the safety ratio (90%).
    let ctx_window = context_window_for_model(model);
    let reserve = session_completion_max_tokens().saturating_add(RESERVE_OVERHEAD_TOKENS);
    let budget = ctx_window.saturating_sub(reserve);
    let safety_budget = (budget as f64 * SAFETY_BUDGET_RATIO) as usize;
    let initial_est = estimate_request_tokens(system_prompt, messages, tools);
    // A zero trigger disables the message-count check entirely.
    let history_trigger = ctx.rlm_config.history_trigger_messages;
    let history_over = history_trigger > 0 && messages.len() >= history_trigger;
    if initial_est <= safety_budget && !history_over {
        return Ok(());
    }
    // Attribute the compaction to message count only when tokens alone
    // would not have triggered it.
    let trigger_reason = if history_over && initial_est <= safety_budget {
        "history_length"
    } else {
        "context_budget"
    };
    // One trace id ties together all events emitted by this invocation.
    let trace_id = Uuid::new_v4();
    emit(
        event_tx,
        SessionEvent::CompactionStarted(CompactionStart {
            trace_id,
            reason: trigger_reason.to_string(),
            before_tokens: initial_est,
            budget: safety_budget,
        }),
    )
    .await;
    for keep_last in KEEP_LAST_CANDIDATES {
        let est = estimate_request_tokens(system_prompt, messages, tools);
        // NOTE(review): this treats the pass as incomplete while the
        // history is still longer than `trigger - keep_last` — presumably
        // so a history_length-triggered compaction actually shortens the
        // list well below the trigger; confirm intent before changing.
        let still_long =
            history_trigger > 0 && messages.len() > history_trigger.saturating_sub(keep_last);
        if est <= safety_budget && !still_long {
            // Goal reached by a previous (or zeroth) pass: report success.
            emit(
                event_tx,
                SessionEvent::CompactionCompleted(CompactionOutcome {
                    trace_id,
                    strategy: FallbackStrategy::Rlm,
                    before_tokens: initial_est,
                    after_tokens: est,
                    kept_messages: messages.len(),
                }),
            )
            .await;
            return Ok(());
        }
        tracing::info!(
            est_tokens = est,
            ctx_window,
            safety_budget,
            keep_last,
            "Context window approaching limit; compressing older session history"
        );
        let did = compress_messages_keep_last(
            messages,
            ctx,
            Arc::clone(&provider),
            model,
            keep_last,
            trigger_reason,
        )
        .await?;
        // `false` means the history is already <= keep_last messages, so
        // smaller candidates cannot help either — stop trying.
        if !did {
            break;
        }
    }
    // Re-check after the final pass, which the loop itself never checks.
    let last_est = estimate_request_tokens(system_prompt, messages, tools);
    if last_est <= safety_budget {
        emit(
            event_tx,
            SessionEvent::CompactionCompleted(CompactionOutcome {
                trace_id,
                strategy: FallbackStrategy::Rlm,
                before_tokens: initial_est,
                after_tokens: last_est,
                kept_messages: messages.len(),
            }),
        )
        .await;
        return Ok(());
    }
    // Last resort: drop everything but the last few messages outright.
    let dropped_tokens =
        terminal_truncate_messages(messages, system_prompt, tools, TRUNCATE_KEEP_LAST);
    let after_tokens = estimate_request_tokens(system_prompt, messages, tools);
    tracing::warn!(
        before_tokens = initial_est,
        after_tokens,
        dropped_tokens,
        kept_messages = messages.len(),
        safety_budget,
        "All RLM compaction attempts exhausted; applied terminal truncation fallback"
    );
    emit(
        event_tx,
        SessionEvent::ContextTruncated(ContextTruncation {
            trace_id,
            dropped_tokens,
            kept_messages: messages.len(),
            archive_ref: None,
        }),
    )
    .await;
    emit(
        event_tx,
        SessionEvent::CompactionCompleted(CompactionOutcome {
            trace_id,
            strategy: FallbackStrategy::Truncate,
            before_tokens: initial_est,
            after_tokens,
            kept_messages: messages.len(),
        }),
    )
    .await;
    // Still over budget: surface a failure event but return Ok — the
    // provider call downstream is allowed to fail on its own.
    if after_tokens > safety_budget {
        tracing::error!(
            after_tokens,
            safety_budget,
            "Terminal truncation still over budget; request will likely fail at the provider"
        );
        emit(
            event_tx,
            SessionEvent::CompactionFailed(CompactionFailure {
                trace_id,
                fell_back_to: Some(FallbackStrategy::Truncate),
                reason: "terminal truncation still exceeds safety budget".to_string(),
                after_tokens,
                budget: safety_budget,
            }),
        )
        .await;
    }
    Ok(())
}
/// Session-level wrapper around [`enforce_on_messages`].
///
/// Snapshots the session's RLM settings, enforces the context budget on
/// its message history, and bumps `updated_at` when the message count
/// changed (the proxy used here for "compaction happened").
pub(crate) async fn enforce_context_window(
    session: &mut Session,
    provider: Arc<dyn crate::provider::Provider>,
    model: &str,
    system_prompt: &str,
    tools: &[ToolDefinition],
    event_tx: Option<&mpsc::Sender<SessionEvent>>,
) -> Result<()> {
    let snapshot = CompressContext::from_session(session);
    let original_len = session.messages.len();
    enforce_on_messages(
        &mut session.messages,
        &snapshot,
        provider,
        model,
        system_prompt,
        tools,
        event_tx,
    )
    .await?;
    let changed = session.messages.len() != original_len;
    if changed {
        session.updated_at = Utc::now();
    }
    Ok(())
}
/// Last-resort fallback: drop all but the last `keep_last` messages and
/// prepend a synthetic assistant marker explaining the truncation.
///
/// Returns the estimated number of tokens dropped (before minus after,
/// saturating at zero). No-op returning 0 when the history is already
/// short enough.
pub(crate) fn terminal_truncate_messages(
    messages: &mut Vec<Message>,
    system_prompt: &str,
    tools: &[ToolDefinition],
    keep_last: usize,
) -> usize {
    if messages.len() <= keep_last {
        return 0;
    }
    let tokens_before = estimate_request_tokens(system_prompt, messages, tools);
    let cut_at = messages.len().saturating_sub(keep_last);
    let kept_tail = messages.split_off(cut_at);
    // Rebuild in place: marker first, then the preserved tail.
    messages.clear();
    messages.push(Message {
        role: Role::Assistant,
        content: vec![ContentPart::Text {
            text: "[CONTEXT TRUNCATED]\nOlder conversation was dropped to keep the request \
                under the model's context window. Ask for details to recall anything \
                specific."
                .to_string(),
        }],
    });
    messages.extend(kept_tail);
    let tokens_after = estimate_request_tokens(system_prompt, messages, tools);
    tokens_before.saturating_sub(tokens_after)
}
/// Forward `ev` to the session event channel when one is attached.
/// Send failures (receiver dropped) are deliberately ignored.
async fn emit(event_tx: Option<&mpsc::Sender<SessionEvent>>, ev: SessionEvent) {
    let Some(tx) = event_tx else {
        return;
    };
    let _ = tx.send(ev).await;
}
#[cfg(test)]
mod tests {
    use super::*;
    /// Build a plain user message with a single text part.
    fn user(text: &str) -> Message {
        Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: text.to_owned(),
            }],
        }
    }
    #[test]
    fn terminal_truncate_noop_when_short_enough() {
        // History shorter than keep_last must be left untouched.
        let mut messages = vec![user("a"), user("b")];
        let dropped = terminal_truncate_messages(&mut messages, "", &[], 4);
        assert_eq!(dropped, 0);
        assert_eq!(messages.len(), 2);
    }
    #[test]
    fn terminal_truncate_drops_prefix_and_prepends_marker() {
        let mut messages: Vec<Message> = (0..10).map(|i| user(&format!("msg-{i}"))).collect();
        let _ = terminal_truncate_messages(&mut messages, "", &[], 3);
        // Three kept messages plus the synthetic marker.
        assert_eq!(messages.len(), 4);
        assert!(matches!(messages[0].role, Role::Assistant));
        match &messages[0].content[0] {
            ContentPart::Text { text } => {
                assert!(text.starts_with("[CONTEXT TRUNCATED]"));
            }
            _ => panic!("expected text content on the synthetic marker"),
        }
        // The tail must be the last three originals, in order.
        for (offset, msg) in messages[1..].iter().enumerate() {
            match &msg.content[0] {
                ContentPart::Text { text } => {
                    assert_eq!(text, &format!("msg-{}", offset + 7));
                }
                _ => panic!("expected text content"),
            }
        }
    }
    #[test]
    fn compress_context_from_session_snapshot_is_independent() {
        let now = chrono::Utc::now();
        let session = Session {
            id: "session-42".to_string(),
            title: None,
            created_at: now,
            updated_at: now,
            metadata: Default::default(),
            agent: "test".to_string(),
            messages: Vec::new(),
            tool_uses: Vec::new(),
            usage: Default::default(),
            max_steps: None,
            bus: None,
        };
        let snapshot = CompressContext::from_session(&session);
        assert_eq!(snapshot.session_id, "session-42");
        assert!(snapshot.subcall_provider.is_none());
        assert!(snapshot.subcall_model.is_none());
    }
}