use std::sync::Arc;
use chrono::Utc;
use crate::agent::context_monitor::{CompactionStrategy, ContextBreakdown};
use crate::agent::session::Thread;
use crate::error::Error;
use crate::llm::{ChatMessage, CompletionRequest, LlmProvider, Reasoning};
use crate::workspace::Workspace;
/// Outcome of a single [`ContextCompactor::compact`] call.
#[derive(Debug)]
pub struct CompactionResult {
/// Number of turns removed from the thread; 0 when nothing was compacted
/// or when a workspace write failed and the turns were preserved.
pub turns_removed: usize,
/// `total_tokens` reported by `ContextBreakdown::analyze` for the thread's
/// messages before compaction ran.
pub tokens_before: usize,
/// `total_tokens` reported by `ContextBreakdown::analyze` for the thread's
/// messages after compaction ran.
pub tokens_after: usize,
/// `true` only when a workspace write succeeded (the summary for the
/// summarize strategy, or the archived turns for the move-to-workspace
/// strategy).
pub summary_written: bool,
/// Summary text produced by the LLM; `Some` only for the summarize strategy
/// when there were turns to summarize, `None` otherwise.
pub summary: Option<String>,
}
/// Shrinks a conversation thread by summarizing, truncating, or archiving
/// its oldest turns.
pub struct ContextCompactor {
/// LLM provider used to generate summaries of the turns being removed.
llm: Arc<dyn LlmProvider>,
}
impl ContextCompactor {
    /// Number of most-recent turns kept by `CompactionStrategy::MoveToWorkspace`
    /// when a workspace is available to archive the older ones.
    const ARCHIVE_KEEP_RECENT: usize = 10;

    /// Number of most-recent turns kept when `CompactionStrategy::MoveToWorkspace`
    /// is requested without a workspace and compaction degrades to truncation.
    const FALLBACK_KEEP_RECENT: usize = 5;

    /// System prompt sent to the LLM when summarizing old turns.
    const SUMMARY_PROMPT: &'static str = r#"Summarize the following conversation concisely. Focus on:
- Key decisions made
- Important information exchanged
- Actions taken
- Outcomes achieved
Be brief but capture all important details. Use bullet points."#;

    /// Creates a compactor that uses `llm` to generate summaries.
    pub fn new(llm: Arc<dyn LlmProvider>) -> Self {
        Self { llm }
    }

    /// Compacts `thread` according to `strategy` and reports what changed.
    ///
    /// * `Summarize { keep_recent }` — summarizes the older turns with the LLM,
    ///   appends the summary to the workspace when one is given, then drops
    ///   those turns.
    /// * `Truncate { keep_recent }` — drops the older turns without recording them.
    /// * `MoveToWorkspace` — archives the older turns to the workspace and
    ///   drops them; without a workspace it falls back to truncation.
    ///
    /// # Errors
    ///
    /// Returns an error when summary generation fails. A failed workspace
    /// write is *not* an error: it is logged, the turns are left in place and
    /// the result reports `turns_removed == 0`.
    pub async fn compact(
        &self,
        thread: &mut Thread,
        strategy: CompactionStrategy,
        workspace: Option<&Workspace>,
    ) -> Result<CompactionResult, Error> {
        // Measure inline so the pre-compaction message snapshot is released
        // before the (potentially slow) LLM / workspace awaits below.
        let tokens_before = ContextBreakdown::analyze(&thread.messages()).total_tokens;

        let partial = match strategy {
            CompactionStrategy::Summarize { keep_recent } => {
                self.compact_with_summary(thread, keep_recent, workspace)
                    .await?
            }
            CompactionStrategy::Truncate { keep_recent } => {
                self.compact_truncate(thread, keep_recent)
            }
            CompactionStrategy::MoveToWorkspace => {
                self.compact_to_workspace(thread, workspace).await?
            }
        };

        let tokens_after = ContextBreakdown::analyze(&thread.messages()).total_tokens;

        Ok(CompactionResult {
            turns_removed: partial.turns_removed,
            tokens_before,
            tokens_after,
            summary_written: partial.summary_written,
            summary: partial.summary,
        })
    }

    /// Summarizes every turn except the last `keep_recent`, then removes them.
    ///
    /// With a workspace, the turns are only removed once the summary has been
    /// written successfully; without one, the summary is returned to the
    /// caller and the turns are removed unconditionally.
    async fn compact_with_summary(
        &self,
        thread: &mut Thread,
        keep_recent: usize,
        workspace: Option<&Workspace>,
    ) -> Result<CompactionPartial, Error> {
        let total_turns = thread.turns.len();
        if total_turns <= keep_recent {
            return Ok(CompactionPartial::empty());
        }
        let turns_to_remove = total_turns - keep_recent;

        // Each turn contributes its user input and, if completed, its response.
        let mut to_summarize = Vec::with_capacity(turns_to_remove * 2);
        for turn in &thread.turns[..turns_to_remove] {
            to_summarize.push(ChatMessage::user(&turn.user_input));
            if let Some(response) = &turn.response {
                to_summarize.push(ChatMessage::assistant(response));
            }
        }

        // An LLM failure propagates here, before the thread has been touched.
        let summary = self.generate_summary(&to_summarize).await?;

        let (summary_written, turns_removed) = match workspace {
            Some(ws) => match self.write_summary_to_workspace(ws, &summary).await {
                Ok(()) => {
                    thread.truncate_turns(keep_recent);
                    (true, turns_to_remove)
                }
                Err(e) => {
                    // Keep the turns: dropping them now would lose history
                    // that was never persisted.
                    tracing::warn!("Compaction summary write failed (turns preserved): {}", e);
                    (false, 0)
                }
            },
            None => {
                thread.truncate_turns(keep_recent);
                (false, turns_to_remove)
            }
        };

        Ok(CompactionPartial {
            turns_removed,
            summary_written,
            summary: Some(summary),
        })
    }

    /// Drops all but the last `keep_recent` turns and reports how many were removed.
    fn compact_truncate(&self, thread: &mut Thread, keep_recent: usize) -> CompactionPartial {
        let turns_before = thread.turns.len();
        thread.truncate_turns(keep_recent);
        let turns_removed = turns_before - thread.turns.len();
        CompactionPartial {
            turns_removed,
            summary_written: false,
            summary: None,
        }
    }

    /// Archives the older turns to the workspace and removes them from the thread.
    ///
    /// Without a workspace this degrades to truncation that keeps
    /// [`Self::FALLBACK_KEEP_RECENT`] turns. With one, the last
    /// [`Self::ARCHIVE_KEEP_RECENT`] turns are kept and the rest are removed
    /// only after the archive write succeeds.
    async fn compact_to_workspace(
        &self,
        thread: &mut Thread,
        workspace: Option<&Workspace>,
    ) -> Result<CompactionPartial, Error> {
        let Some(ws) = workspace else {
            return Ok(self.compact_truncate(thread, Self::FALLBACK_KEEP_RECENT));
        };

        let keep_recent = Self::ARCHIVE_KEEP_RECENT;
        let total_turns = thread.turns.len();
        if total_turns <= keep_recent {
            return Ok(CompactionPartial::empty());
        }
        let turns_to_remove = total_turns - keep_recent;

        let content = format_turns_for_storage(&thread.turns[..turns_to_remove]);
        let (written, turns_removed) = match self.write_context_to_workspace(ws, &content).await {
            Ok(()) => {
                thread.truncate_turns(keep_recent);
                (true, turns_to_remove)
            }
            Err(e) => {
                // Keep the turns: the archive was not stored.
                tracing::warn!("Compaction context write failed (turns preserved): {}", e);
                (false, 0)
            }
        };

        Ok(CompactionPartial {
            turns_removed,
            summary_written: written,
            summary: None,
        })
    }

    /// Asks the LLM for a concise bullet-point summary of `messages`.
    ///
    /// The messages are flattened into a single plain-text transcript that is
    /// sent as one user message after [`Self::SUMMARY_PROMPT`].
    async fn generate_summary(&self, messages: &[ChatMessage]) -> Result<String, Error> {
        let transcript = messages
            .iter()
            .map(Self::render_for_summary)
            .collect::<Vec<_>>()
            .join("\n\n");

        let request_messages = vec![
            ChatMessage::system(Self::SUMMARY_PROMPT),
            ChatMessage::user(format!(
                "Please summarize this conversation:\n\n{}",
                transcript
            )),
        ];
        let request = CompletionRequest::new(request_messages)
            .with_max_tokens(1024)
            .with_temperature(0.3);

        let reasoning =
            Reasoning::new(self.llm.clone()).with_model_name(self.llm.active_model_name());
        let (text, _) = reasoning.complete(request).await?;
        Ok(text)
    }

    /// Renders one message as a `Role: content` transcript line; tool messages
    /// also carry the tool name (`unknown` when absent).
    fn render_for_summary(message: &ChatMessage) -> String {
        let speaker = match message.role {
            crate::llm::Role::User => "User",
            crate::llm::Role::Assistant => "Assistant",
            crate::llm::Role::System => "System",
            crate::llm::Role::Tool => {
                return format!(
                    "Tool {}: {}",
                    message.name.as_deref().unwrap_or("unknown"),
                    message.content
                );
            }
        };
        format!("{}: {}", speaker, message.content)
    }

    /// Appends `summary` to today's daily log under a "Context Summary" heading.
    async fn write_summary_to_workspace(
        &self,
        workspace: &Workspace,
        summary: &str,
    ) -> Result<(), Error> {
        Self::append_daily_entry(workspace, "Context Summary", summary).await
    }

    /// Appends `content` to today's daily log under an "Archived Context" heading.
    async fn write_context_to_workspace(
        &self,
        workspace: &Workspace,
        content: &str,
    ) -> Result<(), Error> {
        Self::append_daily_entry(workspace, "Archived Context", content).await
    }

    /// Appends a `## <heading> (<HH:MM UTC>)` section containing `body` to
    /// `daily/<YYYY-MM-DD>.md`.
    ///
    /// The clock is read exactly once so the file date and the heading time
    /// always describe the same instant, even when the call straddles
    /// midnight UTC.
    async fn append_daily_entry(
        workspace: &Workspace,
        heading: &str,
        body: &str,
    ) -> Result<(), Error> {
        let now = Utc::now();
        let path = format!("daily/{}.md", now.format("%Y-%m-%d"));
        let entry = format!(
            "\n## {} ({})\n\n{}\n",
            heading,
            now.format("%H:%M UTC"),
            body
        );
        workspace.append(&path, &entry).await?;
        Ok(())
    }
}
/// Strategy-level outcome of a compaction, before token counts are attached.
struct CompactionPartial {
    /// How many turns the strategy removed from the thread.
    turns_removed: usize,
    /// Whether anything was successfully written to the workspace.
    summary_written: bool,
    /// Summary text, when the strategy generated one.
    summary: Option<String>,
}

impl CompactionPartial {
    /// The "nothing happened" outcome: no turns removed, nothing written,
    /// no summary.
    fn empty() -> Self {
        let summary = None;
        Self {
            summary,
            summary_written: false,
            turns_removed: 0,
        }
    }
}
/// Renders turns as human-readable text for archiving.
///
/// Each turn becomes a block of the form
///
/// ```text
/// **Turn <turn_number + 1>**
/// User: <user_input>
/// Agent: <response>        (only when the turn has a response)
/// Tools: <name>, <name>    (only when the turn recorded tool calls)
/// ```
///
/// and blocks are separated by a blank line. An empty slice yields an empty
/// string.
fn format_turns_for_storage(turns: &[crate::agent::session::Turn]) -> String {
    let mut blocks: Vec<String> = Vec::with_capacity(turns.len());

    for turn in turns {
        let mut block = format!(
            "**Turn {}**\nUser: {}\n",
            turn.turn_number + 1,
            turn.user_input
        );

        if let Some(response) = &turn.response {
            block += &format!("Agent: {}\n", response);
        }

        if !turn.tool_calls.is_empty() {
            let names: Vec<_> = turn
                .tool_calls
                .iter()
                .map(|call| call.name.as_str())
                .collect();
            block += &format!("Tools: {}\n", names.join(", "));
        }

        blocks.push(block);
    }

    // Every block already ends in '\n', so joining on '\n' leaves one blank
    // line between consecutive turns.
    blocks.join("\n")
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent::context_monitor::CompactionStrategy;
    use crate::agent::session::Thread;
    use crate::testing::StubLlm;
    use uuid::Uuid;

    // ---------------------------------------------------------------------
    // Fixtures
    // ---------------------------------------------------------------------

    /// Wraps a stub LLM in the compactor under test.
    fn make_compactor(llm: Arc<StubLlm>) -> ContextCompactor {
        ContextCompactor::new(llm)
    }

    /// Builds a thread of `n` completed turns: `msg-i` answered by `resp-i`.
    fn make_thread(n: usize) -> Thread {
        let mut thread = Thread::new(Uuid::new_v4());
        (0..n).for_each(|i| {
            thread.start_turn(format!("msg-{}", i));
            thread.complete_turn(format!("resp-{}", i));
        });
        thread
    }

    /// Snapshot of every turn's user input, oldest first.
    fn user_inputs(thread: &Thread) -> Vec<String> {
        thread
            .turns
            .iter()
            .map(|turn| turn.user_input.clone())
            .collect()
    }

    /// Workspace over a fresh in-memory libsql backend.
    ///
    /// NOTE(review): presumably no migrations are run on this backend, which
    /// is why the tests below expect workspace writes to fail — confirm.
    #[cfg(feature = "libsql")]
    async fn make_unmigrated_workspace() -> crate::workspace::Workspace {
        use crate::db::Database;
        use crate::db::libsql::LibSqlBackend;

        let db: Arc<dyn Database> = Arc::new(
            LibSqlBackend::new_memory()
                .await
                .expect("should create in-memory libsql backend"),
        );
        crate::workspace::Workspace::new_with_db("compaction-test", db)
    }

    // ---------------------------------------------------------------------
    // format_turns_for_storage / CompactionPartial
    // ---------------------------------------------------------------------

    /// Two completed turns are numbered from 1 and include the user input.
    #[test]
    fn test_format_turns() {
        let mut thread = Thread::new(Uuid::new_v4());
        thread.start_turn("Hello");
        thread.complete_turn("Hi there");
        thread.start_turn("How are you?");
        thread.complete_turn("I'm good!");

        let rendered = format_turns_for_storage(&thread.turns);

        assert!(rendered.contains("Turn 1"));
        assert!(rendered.contains("Hello"));
        assert!(rendered.contains("Turn 2"));
    }

    /// The empty partial reports no removed turns and no write.
    #[test]
    fn test_compaction_partial_empty() {
        let empty = CompactionPartial::empty();
        assert_eq!(empty.turns_removed, 0);
        assert!(!empty.summary_written);
    }

    /// Recorded tool calls show up on a `Tools:` line.
    #[test]
    fn test_format_turns_for_storage_with_tool_calls() {
        let mut thread = Thread::new(Uuid::new_v4());
        thread.start_turn("Search for X");
        if let Some(current) = thread.turns.last_mut() {
            current.record_tool_call("search", serde_json::json!({"query": "X"}));
        }
        thread.complete_turn("Found X");

        let rendered = format_turns_for_storage(&thread.turns);

        assert!(rendered.contains("Turn 1"));
        assert!(rendered.contains("Search for X"));
        assert!(rendered.contains("Found X"));
        assert!(rendered.contains("Tools: search"));
    }

    /// A turn without a response has no `Agent:` line.
    #[test]
    fn test_format_turns_for_storage_incomplete_turn() {
        let mut thread = Thread::new(Uuid::new_v4());
        thread.start_turn("In progress message");

        let rendered = format_turns_for_storage(&thread.turns);

        assert!(rendered.contains("Turn 1"));
        assert!(rendered.contains("In progress message"));
        assert!(!rendered.contains("Agent:"));
    }

    /// No turns render to an empty string.
    #[test]
    fn test_format_turns_for_storage_empty() {
        let rendered = format_turns_for_storage(&[]);
        assert!(rendered.is_empty());
    }

    // ---------------------------------------------------------------------
    // Truncate strategy
    // ---------------------------------------------------------------------

    /// Truncation keeps the newest turns, renumbers them, and lowers the token count.
    #[tokio::test]
    async fn test_compact_truncate_keeps_last_n() {
        let compactor = make_compactor(Arc::new(StubLlm::new("unused")));
        let mut thread = make_thread(10);
        assert_eq!(thread.turns.len(), 10);

        let strategy = CompactionStrategy::Truncate { keep_recent: 3 };
        let outcome = compactor
            .compact(&mut thread, strategy, None)
            .await
            .expect("compact should succeed");

        assert_eq!(thread.turns.len(), 3);

        assert_eq!(thread.turns[0].user_input, "msg-7");
        assert_eq!(thread.turns[1].user_input, "msg-8");
        assert_eq!(thread.turns[2].user_input, "msg-9");

        assert_eq!(thread.turns[0].turn_number, 0);
        assert_eq!(thread.turns[1].turn_number, 1);
        assert_eq!(thread.turns[2].turn_number, 2);

        assert_eq!(outcome.turns_removed, 7);
        assert!(!outcome.summary_written);
        assert!(outcome.summary.is_none());

        assert!(outcome.tokens_before > 0);
        assert!(outcome.tokens_after > 0);
        assert!(outcome.tokens_before > outcome.tokens_after);
    }

    /// Asking to keep more turns than exist leaves the thread untouched.
    #[tokio::test]
    async fn test_compact_truncate_with_fewer_turns_than_limit() {
        let compactor = make_compactor(Arc::new(StubLlm::new("unused")));
        let mut thread = make_thread(2);
        let inputs_before = user_inputs(&thread);

        let strategy = CompactionStrategy::Truncate { keep_recent: 5 };
        let outcome = compactor
            .compact(&mut thread, strategy, None)
            .await
            .expect("compact should succeed");

        assert_eq!(thread.turns.len(), 2);
        assert_eq!(user_inputs(&thread), inputs_before);
        assert_eq!(outcome.turns_removed, 0);
        assert!(!outcome.summary_written);
        assert!(outcome.summary.is_none());
    }

    /// Truncating an empty thread is a no-op with zero token counts.
    #[tokio::test]
    async fn test_compact_truncate_empty_turns() {
        let compactor = make_compactor(Arc::new(StubLlm::new("unused")));
        let mut thread = Thread::new(Uuid::new_v4());
        assert!(thread.turns.is_empty());

        let strategy = CompactionStrategy::Truncate { keep_recent: 3 };
        let outcome = compactor
            .compact(&mut thread, strategy, None)
            .await
            .expect("compact should succeed on empty turns");

        assert!(thread.turns.is_empty());
        assert_eq!(outcome.turns_removed, 0);
        assert_eq!(outcome.tokens_before, 0);
        assert_eq!(outcome.tokens_after, 0);
    }

    /// The reported token count strictly drops when turns are removed.
    #[tokio::test]
    async fn test_tokens_decrease_after_compaction() {
        let compactor = make_compactor(Arc::new(StubLlm::new("unused")));
        let mut thread = make_thread(20);

        let strategy = CompactionStrategy::Truncate { keep_recent: 5 };
        let outcome = compactor
            .compact(&mut thread, strategy, None)
            .await
            .expect("compact should succeed");

        assert!(
            outcome.tokens_after < outcome.tokens_before,
            "tokens_after ({}) should be less than tokens_before ({})",
            outcome.tokens_after,
            outcome.tokens_before
        );
    }

    /// `keep_recent: 0` removes every turn.
    #[tokio::test]
    async fn test_compact_truncate_keep_zero() {
        let compactor = make_compactor(Arc::new(StubLlm::new("unused")));
        let mut thread = make_thread(5);

        let strategy = CompactionStrategy::Truncate { keep_recent: 0 };
        let outcome = compactor
            .compact(&mut thread, strategy, None)
            .await
            .expect("compact should succeed");

        assert!(thread.turns.is_empty());
        assert_eq!(outcome.turns_removed, 5);
        assert_eq!(outcome.tokens_after, 0);
    }

    /// After truncation the message list still alternates user / assistant
    /// and holds exactly the surviving turns.
    #[tokio::test]
    async fn test_messages_coherent_after_compaction() {
        let compactor = make_compactor(Arc::new(StubLlm::new("unused")));
        let mut thread = make_thread(10);

        let strategy = CompactionStrategy::Truncate { keep_recent: 3 };
        compactor
            .compact(&mut thread, strategy, None)
            .await
            .expect("compact should succeed");

        let messages = thread.messages();
        assert_eq!(messages.len(), 6);

        for (position, message) in messages.iter().enumerate() {
            let expected_role = if position % 2 == 0 {
                crate::llm::Role::User
            } else {
                crate::llm::Role::Assistant
            };
            assert_eq!(message.role, expected_role);
        }

        assert_eq!(messages[0].content, "msg-7");
        assert_eq!(messages[1].content, "resp-7");
        assert_eq!(messages[4].content, "msg-9");
        assert_eq!(messages[5].content, "resp-9");
    }

    /// Two truncations in a row compose: 20 -> 10 -> 3 turns.
    #[tokio::test]
    async fn test_sequential_compactions() {
        let compactor = make_compactor(Arc::new(StubLlm::new("unused")));
        let mut thread = make_thread(20);

        let first = compactor
            .compact(
                &mut thread,
                CompactionStrategy::Truncate { keep_recent: 10 },
                None,
            )
            .await
            .expect("first compact");
        assert_eq!(thread.turns.len(), 10);
        assert_eq!(first.turns_removed, 10);

        let second = compactor
            .compact(
                &mut thread,
                CompactionStrategy::Truncate { keep_recent: 3 },
                None,
            )
            .await
            .expect("second compact");
        assert_eq!(thread.turns.len(), 3);
        assert_eq!(second.turns_removed, 7);

        assert_eq!(thread.turns[0].user_input, "msg-17");
        assert_eq!(thread.turns[1].user_input, "msg-18");
        assert_eq!(thread.turns[2].user_input, "msg-19");
    }

    // ---------------------------------------------------------------------
    // Summarize strategy
    // ---------------------------------------------------------------------

    /// Without a workspace the summary is returned in the result (not
    /// written anywhere) and the old turns are dropped after one LLM call.
    #[tokio::test]
    async fn test_compact_with_summary_produces_summary_turn() {
        let canned_summary =
            "- User greeted the agent\n- Agent responded warmly\n- Five exchanges completed";
        let llm = Arc::new(StubLlm::new(canned_summary));
        let compactor = make_compactor(llm.clone());
        let mut thread = make_thread(5);

        let strategy = CompactionStrategy::Summarize { keep_recent: 2 };
        let outcome = compactor
            .compact(&mut thread, strategy, None)
            .await
            .expect("compact with summary should succeed");

        assert_eq!(thread.turns.len(), 2);
        assert_eq!(thread.turns[0].user_input, "msg-3");
        assert_eq!(thread.turns[1].user_input, "msg-4");
        assert_eq!(outcome.turns_removed, 3);

        assert!(outcome.summary.is_some());
        let summary_text = outcome.summary.as_deref().unwrap();
        assert!(summary_text.contains("User greeted the agent"));
        assert!(summary_text.contains("Five exchanges completed"));

        assert!(!outcome.summary_written);
        assert_eq!(llm.calls(), 1);
    }

    /// An LLM failure surfaces as an error and leaves the thread intact.
    #[tokio::test]
    async fn test_compact_with_summary_llm_failure() {
        let llm = Arc::new(StubLlm::failing("broken-llm"));
        let compactor = make_compactor(llm.clone());
        let mut thread = make_thread(8);
        let turns_before = thread.turns.len();

        let strategy = CompactionStrategy::Summarize { keep_recent: 3 };
        let attempt = compactor.compact(&mut thread, strategy, None).await;

        assert!(attempt.is_err());
        assert_eq!(thread.turns.len(), turns_before);
    }

    /// With nothing to summarize the LLM is never called.
    #[tokio::test]
    async fn test_compact_with_summary_fewer_turns_than_keep() {
        let llm = Arc::new(StubLlm::new("should not be called"));
        let compactor = make_compactor(llm.clone());
        let mut thread = make_thread(3);

        let strategy = CompactionStrategy::Summarize { keep_recent: 5 };
        let outcome = compactor
            .compact(&mut thread, strategy, None)
            .await
            .expect("compact should succeed");

        assert_eq!(thread.turns.len(), 3);
        assert_eq!(outcome.turns_removed, 0);
        assert!(outcome.summary.is_none());
        assert_eq!(llm.calls(), 0);
    }

    /// A failed summary write keeps every turn but still counts the LLM call.
    #[cfg(feature = "libsql")]
    #[tokio::test]
    async fn test_compact_with_summary_preserves_turns_when_workspace_write_fails() {
        let llm = Arc::new(StubLlm::new("summary"));
        let compactor = make_compactor(llm.clone());
        let mut thread = make_thread(8);
        let inputs_before = user_inputs(&thread);
        let workspace = make_unmigrated_workspace().await;

        let strategy = CompactionStrategy::Summarize { keep_recent: 3 };
        let outcome = compactor
            .compact(&mut thread, strategy, Some(&workspace))
            .await
            .expect("compact should succeed even when workspace write fails");

        assert_eq!(thread.turns.len(), 8);
        assert_eq!(user_inputs(&thread), inputs_before);
        assert_eq!(outcome.turns_removed, 0);
        assert!(!outcome.summary_written);
        assert_eq!(llm.calls(), 1);
    }

    /// Summarizing with `keep_recent: 0` removes all turns and returns the summary.
    #[tokio::test]
    async fn test_compact_with_summary_keep_zero() {
        let llm = Arc::new(StubLlm::new("Summary of all turns"));
        let compactor = make_compactor(llm.clone());
        let mut thread = make_thread(5);

        let strategy = CompactionStrategy::Summarize { keep_recent: 0 };
        let outcome = compactor
            .compact(&mut thread, strategy, None)
            .await
            .expect("compact should succeed");

        assert!(thread.turns.is_empty());
        assert_eq!(outcome.turns_removed, 5);
        assert!(outcome.summary.is_some());
        assert_eq!(outcome.summary.as_deref(), Some("Summary of all turns"));
        assert_eq!(llm.calls(), 1);
    }

    // ---------------------------------------------------------------------
    // MoveToWorkspace strategy
    // ---------------------------------------------------------------------

    /// Without a workspace, MoveToWorkspace truncates down to the newest 5 turns.
    #[tokio::test]
    async fn test_compact_to_workspace_without_workspace_falls_back() {
        let compactor = make_compactor(Arc::new(StubLlm::new("unused")));
        let mut thread = make_thread(20);

        let outcome = compactor
            .compact(&mut thread, CompactionStrategy::MoveToWorkspace, None)
            .await
            .expect("compact should succeed");

        assert_eq!(thread.turns.len(), 5);
        assert_eq!(outcome.turns_removed, 15);
        assert_eq!(thread.turns[0].user_input, "msg-15");
        assert_eq!(thread.turns[4].user_input, "msg-19");
    }

    /// Four turns survive MoveToWorkspace untouched.
    ///
    /// No workspace is passed, so this goes through the truncation fallback
    /// (which keeps 5 turns) rather than the archive path.
    #[tokio::test]
    async fn test_compact_to_workspace_fewer_turns_noop() {
        let compactor = make_compactor(Arc::new(StubLlm::new("unused")));
        let mut thread = make_thread(4);

        let outcome = compactor
            .compact(&mut thread, CompactionStrategy::MoveToWorkspace, None)
            .await
            .expect("compact should succeed");

        assert_eq!(thread.turns.len(), 4);
        assert_eq!(outcome.turns_removed, 0);
    }

    /// A failed archive write keeps every turn and never touches the LLM.
    #[cfg(feature = "libsql")]
    #[tokio::test]
    async fn test_compact_to_workspace_preserves_turns_when_workspace_write_fails() {
        let llm = Arc::new(StubLlm::new("unused"));
        let compactor = make_compactor(llm.clone());
        let mut thread = make_thread(20);
        let inputs_before = user_inputs(&thread);
        let workspace = make_unmigrated_workspace().await;

        let outcome = compactor
            .compact(
                &mut thread,
                CompactionStrategy::MoveToWorkspace,
                Some(&workspace),
            )
            .await
            .expect("compact should succeed even when workspace write fails");

        assert_eq!(thread.turns.len(), 20);
        assert_eq!(user_inputs(&thread), inputs_before);
        assert_eq!(outcome.turns_removed, 0);
        assert!(!outcome.summary_written);
        assert_eq!(llm.calls(), 0);
    }
}