use std::sync::Arc;

use anyhow::Result;
use chrono::Utc;
use tokio::sync::mpsc;
use uuid::Uuid;

use super::error::messages_to_rlm_context;
use super::token::{
    context_window_for_model, estimate_request_tokens, session_completion_max_tokens,
};
use crate::provider::{ContentPart, Message, Role, ToolDefinition};
use crate::rlm::router::AutoProcessContext;
use crate::rlm::{RlmChunker, RlmRouter};
use crate::session::event_compaction::{
    CompactionFailure, CompactionOutcome, CompactionStart, ContextTruncation, FallbackStrategy,
};
use crate::session::{Session, SessionEvent};
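
/// Tail sizes to try, largest first, when compressing older history.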
const KEEP_LAST_CANDIDATES: [usize; 4] = [16, 12, 8, 6];
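/// Newest messages kept verbatim by the terminal truncation fallback.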
const TRUNCATE_KEEP_LAST: usize = 4;
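/// Fraction of the usable budget targeted, leaving headroom for estimation error.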
const SAFETY_BUDGET_RATIO: f64 = 0.90;
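/// Tokens reserved on top of the completion limit for request overhead.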
const RESERVE_OVERHEAD_TOKENS: usize = 2048;
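/// Fraction of the context window used to size the chunk-compression fallback.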
const FALLBACK_CHUNK_RATIO: f64 = 0.25;
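
/// Compresses everything but the last `keep_last` messages into a single
/// summary message via [`RlmRouter::auto_process`], falling back to
/// [`RlmChunker::compress`] if the router fails.
///
/// Returns `Ok(false)` when the session already has `keep_last` or fewer
/// messages and nothing was compressed.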
pub(crate) async fn compress_history_keep_last(
session: &mut Session,
provider: Arc<dyn crate::provider::Provider>,
model: &str,
keep_last: usize,
reason: &str,
) -> Result<bool> {
if session.messages.len() <= keep_last {
return Ok(false);
}
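    // Keep the newest `keep_last` messages verbatim; everything older becomes
    // the prefix to be summarized.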
let split_idx = session.messages.len().saturating_sub(keep_last);
let tail = session.messages.split_off(split_idx);
let prefix = std::mem::take(&mut session.messages);
let context = messages_to_rlm_context(&prefix);
let ctx_window = context_window_for_model(model);
let rlm_config = session.metadata.rlm.clone();
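    // Summarization runs as an internal subcall: no abort handle, progress
    // callback, or event bus is wired up for this pass.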
let auto_ctx = AutoProcessContext {
tool_id: "session_context",
tool_args: serde_json::json!({"reason": reason}),
session_id: &session.id,
abort: None,
on_progress: None,
provider,
model: model.to_string(),
bus: None,
trace_id: None,
subcall_provider: session.metadata.subcall_provider.clone(),
subcall_model: session.metadata.subcall_model_name.clone(),
};
let summary = match RlmRouter::auto_process(&context, auto_ctx, &rlm_config).await {
Ok(result) => {
tracing::info!(
reason,
input_tokens = result.stats.input_tokens,
output_tokens = result.stats.output_tokens,
compression_ratio = result.stats.compression_ratio,
"RLM: Compressed session history"
);
result.processed
}
Err(e) => {
tracing::warn!(
reason,
error = %e,
"RLM: Failed to compress session history; falling back to chunk compression"
);
RlmChunker::compress(
&context,
(ctx_window as f64 * FALLBACK_CHUNK_RATIO) as usize,
None,
)
}
};
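    // Swap the compressed prefix for a single assistant message carrying the
    // summary, then re-attach the preserved tail.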
let summary_msg = Message {
role: Role::Assistant,
content: vec![ContentPart::Text {
text: format!(
"[AUTO CONTEXT COMPRESSION]\nOlder conversation + tool output was compressed \
to fit the model context window.\n\n{summary}"
),
}],
};
let mut new_messages = Vec::with_capacity(1 + tail.len());
new_messages.push(summary_msg);
new_messages.extend(tail);
session.messages = new_messages;
session.updated_at = Utc::now();
Ok(true)
}
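
/// Shrinks the session history until the estimated request fits the model's
/// safety budget, emitting compaction lifecycle events along the way.
///
/// Strategy: RLM compression with progressively smaller kept tails
/// ([`KEEP_LAST_CANDIDATES`]), then terminal truncation down to
/// [`TRUNCATE_KEEP_LAST`] messages as a last resort.
///
/// Illustrative call (hypothetical wiring; the actual call site is wherever
/// the session loop builds a request):
///
/// ```ignore
/// enforce_context_window(&mut session, provider, model, &system_prompt, &tools, Some(&event_tx))
///     .await?;
/// ```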
pub(crate) async fn enforce_context_window(
session: &mut Session,
provider: Arc<dyn crate::provider::Provider>,
model: &str,
system_prompt: &str,
tools: &[ToolDefinition],
event_tx: Option<&mpsc::Sender<SessionEvent>>,
) -> Result<()> {
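    // Usable budget: context window minus completion headroom and fixed
    // overhead, scaled down by the safety ratio to absorb estimator error.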
let ctx_window = context_window_for_model(model);
let reserve = session_completion_max_tokens().saturating_add(RESERVE_OVERHEAD_TOKENS);
let budget = ctx_window.saturating_sub(reserve);
let safety_budget = (budget as f64 * SAFETY_BUDGET_RATIO) as usize;
let initial_est = estimate_request_tokens(system_prompt, &session.messages, tools);
if initial_est <= safety_budget {
return Ok(());
}
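    // A single trace id ties together all compaction events for this pass.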
let trace_id = Uuid::new_v4();
emit(
event_tx,
SessionEvent::CompactionStarted(CompactionStart {
trace_id,
reason: "context_budget".to_string(),
before_tokens: initial_est,
budget: safety_budget,
}),
)
.await;
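    // Try progressively smaller kept tails, re-estimating before each attempt
    // so the loop exits as soon as the request fits the budget.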
for keep_last in KEEP_LAST_CANDIDATES {
let est = estimate_request_tokens(system_prompt, &session.messages, tools);
if est <= safety_budget {
emit(
event_tx,
SessionEvent::CompactionCompleted(CompactionOutcome {
trace_id,
strategy: FallbackStrategy::Rlm,
before_tokens: initial_est,
after_tokens: est,
kept_messages: session.messages.len(),
}),
)
.await;
return Ok(());
}
tracing::info!(
est_tokens = est,
ctx_window,
safety_budget,
keep_last,
"Context window approaching limit; compressing older session history"
);
let did = compress_history_keep_last(
session,
Arc::clone(&provider),
model,
keep_last,
"context_budget",
)
.await?;
        if !did {
            // Nothing was compressed at this tail size; a smaller `keep_last`
            // may still apply, so try the next candidate.
            continue;
        }
}
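    // Re-estimate once more: the final candidate may have landed under budget.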
let last_est = estimate_request_tokens(system_prompt, &session.messages, tools);
if last_est <= safety_budget {
emit(
event_tx,
SessionEvent::CompactionCompleted(CompactionOutcome {
trace_id,
strategy: FallbackStrategy::Rlm,
before_tokens: initial_est,
after_tokens: last_est,
kept_messages: session.messages.len(),
}),
)
.await;
return Ok(());
}
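    // RLM compression was not enough: fall back to dropping older history
    // outright, keeping only the newest messages behind a marker.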
let dropped_tokens =
terminal_truncate_history(session, system_prompt, tools, TRUNCATE_KEEP_LAST);
let after_tokens = estimate_request_tokens(system_prompt, &session.messages, tools);
tracing::warn!(
before_tokens = initial_est,
after_tokens,
dropped_tokens,
kept_messages = session.messages.len(),
safety_budget,
"All RLM compaction attempts exhausted; applied terminal truncation fallback"
);
emit(
event_tx,
SessionEvent::ContextTruncated(ContextTruncation {
trace_id,
dropped_tokens,
kept_messages: session.messages.len(),
archive_ref: None,
}),
)
.await;
emit(
event_tx,
SessionEvent::CompactionCompleted(CompactionOutcome {
trace_id,
strategy: FallbackStrategy::Truncate,
before_tokens: initial_est,
after_tokens,
kept_messages: session.messages.len(),
}),
)
.await;
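    // Even terminal truncation may leave the request over budget; surface
    // that as an explicit failure event.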
if after_tokens > safety_budget {
tracing::error!(
after_tokens,
safety_budget,
"Terminal truncation still over budget; request will likely fail at the provider"
);
emit(
event_tx,
SessionEvent::CompactionFailed(CompactionFailure {
trace_id,
fell_back_to: Some(FallbackStrategy::Truncate),
reason: "terminal truncation still exceeds safety budget".to_string(),
after_tokens,
budget: safety_budget,
}),
)
.await;
}
Ok(())
}
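
/// Drops all but the newest `keep_last` messages, leaving a marker message in
/// their place, and returns the estimated number of tokens freed.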
fn terminal_truncate_history(
session: &mut Session,
system_prompt: &str,
tools: &[ToolDefinition],
keep_last: usize,
) -> usize {
if session.messages.len() <= keep_last {
return 0;
}
let before = estimate_request_tokens(system_prompt, &session.messages, tools);
let split_idx = session.messages.len().saturating_sub(keep_last);
let tail = session.messages.split_off(split_idx);
let marker = Message {
role: Role::Assistant,
content: vec![ContentPart::Text {
text: "[CONTEXT TRUNCATED]\nOlder conversation was dropped to keep the request \
under the model's context window. Ask for details to recall anything \
specific."
.to_string(),
}],
};
let mut new_messages = Vec::with_capacity(1 + tail.len());
new_messages.push(marker);
new_messages.extend(tail);
session.messages = new_messages;
session.updated_at = Utc::now();
let after = estimate_request_tokens(system_prompt, &session.messages, tools);
before.saturating_sub(after)
}
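
/// Best-effort event emission; send failures (e.g. a dropped receiver) are
/// intentionally ignored.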
async fn emit(event_tx: Option<&mpsc::Sender<SessionEvent>>, ev: SessionEvent) {
if let Some(tx) = event_tx {
let _ = tx.send(ev).await;
}
}