use std::sync::Arc;
use anyhow::Result;
use chrono::Utc;
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::provider::{ContentPart, Message, Role, ToolDefinition};
use crate::rlm::RlmChunker;
use super::error::messages_to_rlm_context;
use super::token::{
context_window_for_model, estimate_request_tokens, session_completion_max_tokens,
};
use crate::session::event_compaction::{
CompactionFailure, CompactionOutcome, CompactionStart, ContextTruncation, FallbackStrategy,
};
use crate::session::{Session, SessionEvent};
/// Tail sizes (`keep_last`) that `enforce_on_messages` tries, in this order,
/// when compressing older history.
const KEEP_LAST_CANDIDATES: [usize; 6] = [16, 12, 8, 6, 3, 1];
/// `keep_last` value `enforce_on_messages` passes to
/// `terminal_truncate_messages` for the terminal fallback.
const TRUNCATE_KEEP_LAST: usize = 4;
/// Per-payload byte caps that `shrink_retained_payloads_to_budget` applies in
/// this order (largest first), stopping as soon as the estimate fits the target.
const TERMINAL_PAYLOAD_CAPS: [usize; 8] = [
262_144, 131_072, 65_536, 32_768, 16_384, 8_192, 4_096, 2_048,
];
/// Final byte cap applied once every entry of `TERMINAL_PAYLOAD_CAPS` has been
/// tried; at this step the latest user message text is capped like any other.
const TERMINAL_EMERGENCY_CAP_BYTES: usize = 1_024;
/// Fraction of the post-reserve token budget that `enforce_on_messages` uses
/// as its effective limit.
const SAFETY_BUDGET_RATIO: f64 = 0.90;
/// Tokens added to `session_completion_max_tokens()` when computing the
/// reserve subtracted from the model's context window.
const RESERVE_OVERHEAD_TOKENS: usize = 2048;
/// Owned copy of the session fields the compression helpers read.
///
/// Holding clones (see `CompressContext::from_session`) lets callers borrow
/// `session.messages` mutably while still passing this context by reference.
#[derive(Clone)]
pub(crate) struct CompressContext {
/// RLM configuration cloned from `session.metadata.rlm`.
pub rlm_config: crate::rlm::RlmConfig,
/// Identifier cloned from `session.id`.
pub session_id: String,
}
impl CompressContext {
    /// Builds a context by cloning the session's id and RLM configuration.
    ///
    /// The result owns its data, so it does not keep `session` borrowed.
    pub(crate) fn from_session(session: &Session) -> Self {
        let session_id = session.id.clone();
        let rlm_config = session.metadata.rlm.clone();
        Self {
            rlm_config,
            session_id,
        }
    }
}
/// Replaces everything before the active tail of `messages` with a summary.
///
/// The history is split at the index returned by `active_tail_start` for
/// `keep_last`; the prefix is rendered with `messages_to_rlm_context` and
/// handed to `compression_defer::context_summary`. When a summary is
/// available, `compression_summary::install` rebuilds `messages` from the
/// summary and the retained tail.
///
/// # Returns
///
/// * `Ok(true)` when a summary was installed.
/// * `Ok(false)` when nothing was changed: the history is no longer than
///   `keep_last`, the prefix renders to blank text, or no summary is
///   available. In every `Ok(false)` case `messages` is left exactly as it was
///   passed in.
pub(crate) async fn compress_messages_keep_last(
    messages: &mut Vec<Message>,
    ctx: &CompressContext,
    provider: Arc<dyn crate::provider::Provider>,
    model: &str,
    keep_last: usize,
    reason: &str,
) -> Result<bool> {
    if messages.len() <= keep_last {
        return Ok(false);
    }

    // Take the history apart: `tail` is what must survive verbatim, `prefix`
    // is the candidate for summarisation. `messages` is empty from here until
    // it is either rebuilt by `install` or restored below.
    let split_idx = crate::session::context::active_tail::active_tail_start(messages, keep_last);
    let tail = messages.split_off(split_idx);
    let prefix = std::mem::take(messages);

    let context = messages_to_rlm_context(&prefix);
    let ctx_window = context_window_for_model(model);
    if context.trim().is_empty() {
        // Nothing worth summarising: put the history back untouched.
        *messages = prefix;
        messages.extend(tail);
        return Ok(false);
    }

    let Some(summary) = super::compression_defer::context_summary(
        &context,
        ctx_window,
        &ctx.session_id,
        reason,
        model,
        Arc::clone(&provider),
        &ctx.rlm_config,
    ) else {
        // No summary available. Without this restore the caller would be left
        // with an empty history, because both halves were moved out above.
        *messages = prefix;
        messages.extend(tail);
        return Ok(false);
    };

    super::compression_summary::install(messages, tail, summary);
    Ok(true)
}
/// Fraction of the model's context window above which the last message's text
/// is considered oversized by `compress_last_message_if_oversized`.
const OVERSIZED_LAST_MESSAGE_RATIO: f64 = 0.35;
/// Passed as the final argument to `compression_last_message::wrap_cached`.
// NOTE(review): presumably the number of leading characters of the original
// text kept alongside the cached summary — confirm against `wrap_cached`.
const OVERSIZED_LAST_MESSAGE_PREFIX_CHARS: usize = 500;
/// Shrinks the final message when its text alone is too large for the model.
///
/// The text parts of the last message are concatenated and their token count
/// is estimated with `RlmChunker::estimate_tokens`. If that exceeds
/// `OVERSIZED_LAST_MESSAGE_RATIO` of the model's context window, the message
/// content is replaced by a single text part holding either a wrapped
/// background summary (when `rlm_background::context_summary` has one) or the
/// output of `RlmChunker::compress`.
///
/// The content is replaced wholesale, so any non-text parts the message
/// carried are not retained.
///
/// # Returns
///
/// `Ok(true)` when the message was rewritten; `Ok(false)` when there is no
/// last message, it has no text, or it is within the threshold.
pub(crate) async fn compress_last_message_if_oversized(
    messages: &mut [Message],
    ctx: &CompressContext,
    provider: Arc<dyn crate::provider::Provider>,
    model: &str,
) -> Result<bool> {
    let last = match messages.last_mut() {
        Some(message) => message,
        None => return Ok(false),
    };

    let original = extract_message_text(last);
    if original.is_empty() {
        return Ok(false);
    }

    let ctx_window = context_window_for_model(model);
    let msg_tokens = RlmChunker::estimate_tokens(&original);
    let threshold = (ctx_window as f64 * OVERSIZED_LAST_MESSAGE_RATIO) as usize;
    if msg_tokens <= threshold {
        return Ok(false);
    }

    tracing::info!(
        msg_tokens,
        threshold,
        ctx_window,
        "RLM: Last message exceeds context threshold, compressing"
    );

    // Prefer an already-available summary; otherwise compress locally.
    let replacement = match super::rlm_background::context_summary(
        &original,
        "oversized_last_message",
        &ctx.session_id,
        model,
        provider,
        &ctx.rlm_config,
    ) {
        Some(summary) => super::compression_last_message::wrap_cached(
            summary,
            &original,
            msg_tokens,
            OVERSIZED_LAST_MESSAGE_PREFIX_CHARS,
        ),
        None => RlmChunker::compress(&original, threshold, None),
    };

    last.content = vec![ContentPart::Text { text: replacement }];
    Ok(true)
}
/// Concatenates the text parts of `msg`, ignoring every other part kind.
///
/// A `'\n'` separator is inserted before a text part only when something has
/// already been accumulated, so leading empty text parts add no blank line.
fn extract_message_text(msg: &Message) -> String {
    msg.content
        .iter()
        .filter_map(|part| match part {
            ContentPart::Text { text } => Some(text.as_str()),
            _ => None,
        })
        .fold(String::new(), |mut joined, text| {
            if !joined.is_empty() {
                joined.push('\n');
            }
            joined.push_str(text);
            joined
        })
}
/// Session-level wrapper around `compress_messages_keep_last`.
///
/// Runs the compression against `session.messages` and, when it reports that
/// the history was rewritten, stamps `session.updated_at` with the current
/// UTC time.
///
/// # Errors
///
/// Propagates any error returned by `compress_messages_keep_last`.
pub async fn compress_history_keep_last(
    session: &mut Session,
    provider: Arc<dyn crate::provider::Provider>,
    model: &str,
    keep_last: usize,
    reason: &str,
) -> Result<bool> {
    let snapshot = CompressContext::from_session(session);
    let compressed = compress_messages_keep_last(
        &mut session.messages,
        &snapshot,
        provider,
        model,
        keep_last,
        reason,
    )
    .await?;

    if !compressed {
        return Ok(false);
    }
    session.updated_at = Utc::now();
    Ok(true)
}
/// Brings `messages` under the model's token budget, compressing or truncating
/// history as needed, and reports progress on `event_tx`.
///
/// The safety budget is `SAFETY_BUDGET_RATIO` of the context window minus a
/// reserve (`session_completion_max_tokens()` + `RESERVE_OVERHEAD_TOKENS`).
/// Work starts when the estimated request exceeds that budget, or when
/// `rlm_config.history_trigger_messages` is non-zero and the message count has
/// reached it.
///
/// Strategy, in order:
/// 1. Try `compress_messages_keep_last` with each `KEEP_LAST_CANDIDATES` entry
///    until the estimate fits (and the history is no longer "still long").
/// 2. If still over budget, call `terminal_truncate_messages` with
///    `TRUNCATE_KEEP_LAST`.
///
/// Events: `CompactionStarted` once work begins; `CompactionCompleted` with the
/// strategy that finished; `ContextTruncated` when the fallback ran; and
/// `CompactionFailed` when even the fallback leaves the estimate over budget.
///
/// # Errors
///
/// Propagates errors from `compress_messages_keep_last`. Being over budget at
/// the end is reported via events and logs, not as an `Err`.
pub(crate) async fn enforce_on_messages(
messages: &mut Vec<Message>,
ctx: &CompressContext,
provider: Arc<dyn crate::provider::Provider>,
model: &str,
system_prompt: &str,
tools: &[ToolDefinition],
event_tx: Option<&mpsc::Sender<SessionEvent>>,
) -> Result<()> {
// Derive the effective limit: window minus completion reserve, scaled down
// by the safety ratio.
let ctx_window = context_window_for_model(model);
let reserve = session_completion_max_tokens().saturating_add(RESERVE_OVERHEAD_TOKENS);
let budget = ctx_window.saturating_sub(reserve);
let safety_budget = (budget as f64 * SAFETY_BUDGET_RATIO) as usize;
let initial_est = estimate_request_tokens(system_prompt, messages, tools);
// A trigger of 0 disables the message-count condition.
let history_trigger = ctx.rlm_config.history_trigger_messages;
let history_over = history_trigger > 0 && messages.len() >= history_trigger;
if initial_est <= safety_budget && !history_over {
return Ok(());
}
// "history_length" only when the count alone triggered; any token overrun
// is reported as "context_budget".
let trigger_reason = if history_over && initial_est <= safety_budget {
"history_length"
} else {
"context_budget"
};
// One trace id ties together every event emitted for this run.
let trace_id = Uuid::new_v4();
emit(
event_tx,
SessionEvent::CompactionStarted(CompactionStart {
trace_id,
reason: trigger_reason.to_string(),
before_tokens: initial_est,
budget: safety_budget,
}),
)
.await;
for keep_last in KEEP_LAST_CANDIDATES {
// Re-estimate at the top of each pass so a previous pass that already
// got under budget ends the loop with a success event.
let est = estimate_request_tokens(system_prompt, messages, tools);
let still_long =
history_trigger > 0 && messages.len() > history_trigger.saturating_sub(keep_last);
if est <= safety_budget && !still_long {
emit(
event_tx,
SessionEvent::CompactionCompleted(CompactionOutcome {
trace_id,
strategy: FallbackStrategy::Rlm,
before_tokens: initial_est,
after_tokens: est,
kept_messages: messages.len(),
}),
)
.await;
return Ok(());
}
tracing::info!(
est_tokens = est,
ctx_window,
safety_budget,
keep_last,
"Context window approaching limit; compressing older session history"
);
let did = compress_messages_keep_last(
messages,
ctx,
Arc::clone(&provider),
model,
keep_last,
trigger_reason,
)
.await?;
// NOTE(review): `compress_messages_keep_last` also returns false when
// `messages.len() <= keep_last`, so a history no longer than the first
// candidate skips every smaller candidate and goes straight to the
// checks below — confirm this is intended.
if !did {
break;
}
}
// The loop only checks the estimate before each attempt, so check once more
// after the final attempt. Only the token budget is considered here.
let last_est = estimate_request_tokens(system_prompt, messages, tools);
if last_est <= safety_budget {
emit(
event_tx,
SessionEvent::CompactionCompleted(CompactionOutcome {
trace_id,
strategy: FallbackStrategy::Rlm,
before_tokens: initial_est,
after_tokens: last_est,
kept_messages: messages.len(),
}),
)
.await;
return Ok(());
}
// Compression was not enough: fall back to dropping/shortening content.
let dropped_tokens = terminal_truncate_messages(
messages,
system_prompt,
tools,
TRUNCATE_KEEP_LAST,
safety_budget,
);
let after_tokens = estimate_request_tokens(system_prompt, messages, tools);
tracing::warn!(
before_tokens = initial_est,
after_tokens,
dropped_tokens,
kept_messages = messages.len(),
safety_budget,
"All RLM compaction attempts exhausted; applied terminal truncation fallback"
);
emit(
event_tx,
SessionEvent::ContextTruncated(ContextTruncation {
trace_id,
dropped_tokens,
kept_messages: messages.len(),
archive_ref: None,
}),
)
.await;
// Emitted unconditionally after truncation; if the result is still over
// budget a `CompactionFailed` event follows it.
emit(
event_tx,
SessionEvent::CompactionCompleted(CompactionOutcome {
trace_id,
strategy: FallbackStrategy::Truncate,
before_tokens: initial_est,
after_tokens,
kept_messages: messages.len(),
}),
)
.await;
if after_tokens > safety_budget {
tracing::error!(
after_tokens,
safety_budget,
"Terminal truncation still over budget; request will likely fail at the provider"
);
emit(
event_tx,
SessionEvent::CompactionFailed(CompactionFailure {
trace_id,
fell_back_to: Some(FallbackStrategy::Truncate),
reason: "terminal truncation still exceeds safety budget".to_string(),
after_tokens,
budget: safety_budget,
}),
)
.await;
}
Ok(())
}
/// Session-level wrapper around `enforce_on_messages`.
///
/// Runs budget enforcement against `session.messages` and stamps
/// `session.updated_at` with the current UTC time when the number of messages
/// changed as a result.
///
/// NOTE(review): the change detection compares lengths only, so a rewrite
/// that happens to leave the same number of messages would not refresh
/// `updated_at` — confirm whether that matters to callers.
///
/// # Errors
///
/// Propagates any error returned by `enforce_on_messages`.
pub async fn enforce_context_window(
    session: &mut Session,
    provider: Arc<dyn crate::provider::Provider>,
    model: &str,
    system_prompt: &str,
    tools: &[ToolDefinition],
    event_tx: Option<&mpsc::Sender<SessionEvent>>,
) -> Result<()> {
    let snapshot = CompressContext::from_session(session);
    let len_before = session.messages.len();

    enforce_on_messages(
        &mut session.messages,
        &snapshot,
        provider,
        model,
        system_prompt,
        tools,
        event_tx,
    )
    .await?;

    let len_after = session.messages.len();
    if len_after != len_before {
        session.updated_at = Utc::now();
    }
    Ok(())
}
/// Last-resort reduction of `messages` that needs no provider call.
///
/// When the history is longer than `keep_last`, everything before the index
/// returned by `active_tail_start` is dropped. A synthetic assistant message
/// starting with `[CONTEXT TRUNCATED]` is placed in front of what remains, and
/// `shrink_retained_payloads_to_budget` then shortens retained payloads until
/// the estimate fits `target_tokens` (or its caps are exhausted).
///
/// Nothing is changed, and 0 is returned, when the history is no longer than
/// `keep_last` and the estimate is already within `target_tokens`.
///
/// # Returns
///
/// The estimated number of tokens removed (estimate before minus estimate
/// after, saturating at 0).
pub(crate) fn terminal_truncate_messages(
    messages: &mut Vec<Message>,
    system_prompt: &str,
    tools: &[ToolDefinition],
    keep_last: usize,
    target_tokens: usize,
) -> usize {
    let tokens_before = estimate_request_tokens(system_prompt, messages, tools);
    let longer_than_tail = messages.len() > keep_last;
    if !longer_than_tail && tokens_before <= target_tokens {
        return 0;
    }

    // Only a history longer than `keep_last` has a prefix to drop.
    let split_idx = if longer_than_tail {
        crate::session::context::active_tail::active_tail_start(messages, keep_last)
    } else {
        0
    };
    let dropped_prefix = split_idx > 0;

    let retained = if dropped_prefix {
        messages.split_off(split_idx)
    } else {
        std::mem::take(messages)
    };

    // The notice wording tells the model which kind of loss occurred.
    let notice = if dropped_prefix {
        "[CONTEXT TRUNCATED]\nOlder conversation was dropped to keep the request \
         under the model's context window. Some retained tool output may also be \
         shortened with head/tail snippets."
            .to_string()
    } else {
        "[CONTEXT TRUNCATED]\nRecent tool output was too large for the model's \
         context window and was shortened with head/tail snippets."
            .to_string()
    };

    // Rebuild in place: notice first, then the retained tail.
    messages.clear();
    messages.reserve(retained.len() + 1);
    messages.push(Message {
        role: Role::Assistant,
        content: vec![ContentPart::Text { text: notice }],
    });
    messages.extend(retained);

    let shrunk_parts =
        shrink_retained_payloads_to_budget(messages, system_prompt, tools, target_tokens);
    if shrunk_parts > 0 {
        tracing::warn!(
            shrunk_parts,
            target_tokens,
            after_tokens = estimate_request_tokens(system_prompt, messages, tools),
            "Terminal truncation shortened retained message payloads"
        );
    }

    let tokens_after = estimate_request_tokens(system_prompt, messages, tools);
    tokens_before.saturating_sub(tokens_after)
}
/// Shortens message payloads with progressively smaller caps until the
/// estimated request fits `target_tokens`.
///
/// Each entry of `TERMINAL_PAYLOAD_CAPS` is applied in order, leaving the
/// latest user text with a higher floor; if none suffices,
/// `TERMINAL_EMERGENCY_CAP_BYTES` is applied to everything, including the
/// latest user text. A `target_tokens` of `usize::MAX` means "no target" and
/// skips all work.
///
/// # Returns
///
/// The total number of content parts that were shortened across all passes.
fn shrink_retained_payloads_to_budget(
    messages: &mut [Message],
    system_prompt: &str,
    tools: &[ToolDefinition],
    target_tokens: usize,
) -> usize {
    if target_tokens == usize::MAX {
        return 0;
    }
    if estimate_request_tokens(system_prompt, messages, tools) <= target_tokens {
        return 0;
    }

    let mut shortened = 0;
    for cap in TERMINAL_PAYLOAD_CAPS {
        shortened += shrink_payloads_over_cap(messages, cap, false);
        let fits = estimate_request_tokens(system_prompt, messages, tools) <= target_tokens;
        if fits {
            return shortened;
        }
    }

    // Every regular cap failed: apply the emergency cap without exemptions.
    shortened + shrink_payloads_over_cap(messages, TERMINAL_EMERGENCY_CAP_BYTES, true)
}
/// Applies one byte cap to every shrinkable content part in `messages`.
///
/// Rules per part:
/// * images and files are never touched;
/// * a text part of the first message that starts with `[CONTEXT TRUNCATED]`
///   is left alone;
/// * text in the final message, when that message has the user role, is
///   capped at no less than 16 384 bytes unless `include_latest_user` is set;
/// * everything else (tool results, tool calls, thinking, other text) uses
///   `cap_bytes`.
///
/// # Returns
///
/// The number of parts that were actually shortened.
fn shrink_payloads_over_cap(
    messages: &mut [Message],
    cap_bytes: usize,
    include_latest_user: bool,
) -> usize {
    let total = messages.len();
    let mut changed = 0;

    for (idx, message) in messages.iter_mut().enumerate() {
        let is_latest_user = idx + 1 == total && matches!(message.role, Role::User);
        // Cap used for plain text in this message.
        let text_cap = if is_latest_user && !include_latest_user {
            cap_bytes.max(16_384)
        } else {
            cap_bytes
        };

        for part in message.content.iter_mut() {
            let cap = match part {
                ContentPart::Image { .. } | ContentPart::File { .. } => continue,
                ContentPart::Text { text } => {
                    let is_truncation_notice =
                        idx == 0 && text.starts_with("[CONTEXT TRUNCATED]");
                    if is_truncation_notice {
                        continue;
                    }
                    text_cap
                }
                ContentPart::ToolResult { .. }
                | ContentPart::Thinking { .. }
                | ContentPart::ToolCall { .. } => cap_bytes,
            };
            if shrink_content_part(part, cap) {
                changed += 1;
            }
        }
    }

    changed
}
/// Shortens the string payload of `part` to at most `cap_bytes`, if it has one.
///
/// The label passed on to `shrink_string_payload` names the kind of payload
/// (`text`, `tool_result`, `tool_call_arguments`, `thinking`). Images and
/// files have no string payload and always yield `false`.
///
/// # Returns
///
/// `true` when the payload was replaced by a shortened version.
fn shrink_content_part(part: &mut ContentPart, cap_bytes: usize) -> bool {
    let (payload, label) = match part {
        ContentPart::Image { .. } | ContentPart::File { .. } => return false,
        ContentPart::Text { text } => (text, "text"),
        ContentPart::ToolResult { content, .. } => (content, "tool_result"),
        ContentPart::ToolCall { arguments, .. } => (arguments, "tool_call_arguments"),
        ContentPart::Thinking { text } => (text, "thinking"),
    };
    shrink_string_payload(payload, cap_bytes, label)
}
/// Replaces `value` with its head/tail form when it is longer than `cap_bytes`.
///
/// # Returns
///
/// `true` when `value` was replaced, `false` when it already fit.
fn shrink_string_payload(value: &mut String, cap_bytes: usize, label: &str) -> bool {
    let oversized = value.len() > cap_bytes;
    if oversized {
        let shortened = terminal_head_tail(value, cap_bytes, label);
        *value = shortened;
    }
    oversized
}
/// Builds a shortened form of `value`: a head snippet, a truncation marker
/// that records `label` and the original byte length, and a tail snippet.
///
/// The bytes left after the marker are split between head and tail, with the
/// tail getting the extra byte when the amount is odd. When `cap_bytes` leaves
/// 32 bytes or fewer for the snippets, only a bare marker without surrounding
/// blank lines is returned.
fn terminal_head_tail(value: &str, cap_bytes: usize, label: &str) -> String {
    let marker = format!(
        "\n\n[terminal context fallback truncated {label}; original_bytes={}]\n\n",
        value.len()
    );

    // Room left for the snippets; anything at or below 32 bytes is too little
    // to be useful.
    let Some(snippet_room) = cap_bytes
        .checked_sub(marker.len())
        .filter(|room| *room > 32)
    else {
        return format!(
            "[terminal context fallback truncated {label}; original_bytes={}]",
            value.len()
        );
    };

    let head_room = snippet_room / 2;
    let tail_room = snippet_room - head_room;
    let head = crate::util::truncate_bytes_safe(value, head_room);
    let tail = suffix_bytes_safe(value, tail_room);
    format!("{head}{marker}{tail}")
}
/// Returns the longest suffix of `s` that is at most `max_bytes` long and
/// starts on a UTF-8 character boundary.
///
/// When the cut point falls inside a multi-byte character, the suffix starts
/// at the next boundary, so the result may be shorter than `max_bytes` (and
/// empty if no boundary fits).
fn suffix_bytes_safe(s: &str, max_bytes: usize) -> &str {
    let len = s.len();
    if len <= max_bytes {
        return s;
    }
    // First boundary at or after the earliest allowed start; `len` itself is
    // always a boundary and yields the empty suffix.
    let earliest = len - max_bytes;
    let start = (earliest..len)
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(len);
    &s[start..]
}
/// Sends `ev` on `event_tx` when a sender was supplied.
///
/// Delivery is best-effort: a send error is deliberately discarded so that
/// event reporting cannot fail the caller.
async fn emit(event_tx: Option<&mpsc::Sender<SessionEvent>>, ev: SessionEvent) {
    let Some(sender) = event_tx else {
        return;
    };
    let _ = sender.send(ev).await;
}
// Unit tests for the terminal truncation fallback and `CompressContext`.
#[cfg(test)]
mod tests {
use super::*;
// Builds a user message holding a single text part.
fn user(text: &str) -> Message {
Message {
role: Role::User,
content: vec![ContentPart::Text {
text: text.to_string(),
}],
}
}
// Builds a tool message holding a single tool result with a fixed call id.
fn tool_result(content: String) -> Message {
Message {
role: Role::Tool,
content: vec![ContentPart::ToolResult {
tool_call_id: "call_1".to_string(),
content,
}],
}
}
// With fewer messages than `keep_last` and no token target, nothing changes.
#[test]
fn terminal_truncate_noop_when_short_enough() {
let mut messages = vec![user("a"), user("b")];
let before_len = messages.len();
let dropped = terminal_truncate_messages(&mut messages, "", &[], 4, usize::MAX);
assert_eq!(dropped, 0);
assert_eq!(messages.len(), before_len);
}
// A long history keeps only the last `keep_last` messages, preceded by the
// synthetic assistant marker.
#[test]
fn terminal_truncate_drops_prefix_and_prepends_marker() {
let mut messages: Vec<Message> = (0..10).map(|i| user(&format!("msg-{i}"))).collect();
let _ = terminal_truncate_messages(&mut messages, "", &[], 3, usize::MAX);
assert_eq!(messages.len(), 4);
assert!(matches!(messages[0].role, Role::Assistant));
if let ContentPart::Text { text } = &messages[0].content[0] {
assert!(text.starts_with("[CONTEXT TRUNCATED]"));
} else {
panic!("expected text content on the synthetic marker");
}
let kept_texts: Vec<&str> = messages[1..]
.iter()
.map(|m| match &m.content[0] {
ContentPart::Text { text } => text.as_str(),
_ => panic!("expected text content"),
})
.collect();
assert_eq!(kept_texts, vec!["msg-7", "msg-8", "msg-9"]);
}
// A short history that is over the token target keeps every message but
// shortens the oversized tool result to head/tail snippets.
#[test]
fn terminal_truncate_shrinks_oversized_tail_when_message_count_is_short() {
let huge_output = format!("head\n{}\ntail", "x".repeat(200_000));
let mut messages = vec![user("inspect the logs"), tool_result(huge_output)];
let before = estimate_request_tokens("", &messages, &[]);
let dropped = terminal_truncate_messages(&mut messages, "", &[], 4, 6_000);
let after = estimate_request_tokens("", &messages, &[]);
assert!(dropped > 0);
assert!(after < before);
assert!(after <= 6_000, "after={after}");
// Marker + the two original messages.
assert_eq!(messages.len(), 3);
assert!(matches!(messages[0].role, Role::Assistant));
let ContentPart::ToolResult { content, .. } = &messages[2].content[0] else {
panic!("expected tool result");
};
assert!(content.contains("terminal context fallback truncated tool_result"));
assert!(content.contains("head"));
assert!(content.contains("tail"));
}
// `from_session` copies the session id into the context.
// NOTE(review): despite the name, only the copied id is asserted here;
// independence from later session changes is not exercised.
#[test]
fn compress_context_from_session_snapshot_is_independent() {
let session = Session {
id: "session-42".to_string(),
title: None,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
metadata: Default::default(),
agent: "test".to_string(),
messages: Vec::new(),
pages: Vec::new(),
summary_index: crate::session::index::SummaryIndex::new(),
tool_uses: Vec::new(),
usage: Default::default(),
max_steps: None,
bus: None,
};
let snapshot = CompressContext::from_session(&session);
assert_eq!(snapshot.session_id, "session-42");
}
}