use crate::compact::{CompactionContext, Compactor};
use crate::event::AgentEvent;
#[cfg(target_arch = "wasm32")]
use crate::tokio;
use crate::types::{AssistantBlock, Message, Usage};
use std::sync::Arc;
use tokio::sync::mpsc;
/// Errors that can occur while compacting conversation history.
#[derive(Debug, thiserror::Error)]
pub enum CompactionError {
    /// The summarization call to the LLM returned an error.
    #[error("compaction LLM call failed: {0}")]
    LlmFailed(#[from] crate::error::AgentError),
    /// The LLM call succeeded but produced no text blocks to use as a summary.
    #[error("LLM returned empty summary")]
    EmptySummary,
    /// Serializing the message history for token estimation failed.
    #[error("token estimation failed: {0}")]
    EstimationFailed(String),
}
/// Roughly estimate how many tokens `messages` would occupy.
///
/// Serializes the history to JSON and assumes ~4 bytes per token — a
/// deliberately cheap heuristic, not an exact tokenizer count.
///
/// # Errors
/// Returns [`CompactionError::EstimationFailed`] if the history cannot be
/// serialized to JSON.
pub fn estimate_tokens(messages: &[Message]) -> Result<u64, CompactionError> {
    match serde_json::to_string(messages) {
        Ok(serialized) => Ok(serialized.len() as u64 / 4),
        Err(err) => Err(CompactionError::EstimationFailed(err.to_string())),
    }
}
/// Snapshot the state a [`Compactor`] needs to decide whether to compact.
///
/// Token-estimation failures are non-fatal here: they are logged and the
/// estimate is reported as zero so a context can still be built.
pub fn build_compaction_context(
    messages: &[Message],
    last_input_tokens: u64,
    last_compaction_turn: Option<u32>,
    current_turn: u32,
) -> CompactionContext {
    let estimated_history_tokens = estimate_tokens(messages).unwrap_or_else(|err| {
        tracing::warn!("failed to estimate history tokens for compaction context: {err}");
        0
    });
    CompactionContext {
        last_input_tokens,
        message_count: messages.len(),
        estimated_history_tokens,
        last_compaction_turn,
        current_turn,
    }
}
/// Emit `event` through the tap, logging a warning if the receiver has been
/// dropped. Returns `true` while the event stream is still open (send
/// succeeded), `false` otherwise. `stage` names the event for the log line.
async fn emit_compaction_event(
    event_tap: &crate::event_tap::EventTap,
    event_tx: Option<&mpsc::Sender<AgentEvent>>,
    event: AgentEvent,
    stage: &str,
) -> bool {
    let delivered = crate::event_tap::tap_emit(event_tap, event_tx, event).await;
    if !delivered {
        tracing::warn!("compaction event stream receiver dropped before {stage}");
    }
    delivered
}

/// Run one compaction pass: ask the LLM to summarize `messages`, then rebuild
/// a shorter history through `compactor`.
///
/// Progress is reported via `CompactionStarted` / `CompactionFailed` /
/// `CompactionCompleted` events; a dropped event receiver is logged but never
/// aborts compaction.
///
/// # Errors
/// - [`CompactionError::EstimationFailed`] if the history cannot be serialized
///   for token estimation.
/// - [`CompactionError::LlmFailed`] if the summarization LLM call fails.
/// - [`CompactionError::EmptySummary`] if the LLM response contains no text.
pub async fn run_compaction<C>(
    client: &C,
    compactor: &Arc<dyn Compactor>,
    messages: &[Message],
    last_input_tokens: u64,
    current_turn: u32,
    event_tx: &Option<mpsc::Sender<AgentEvent>>,
    event_tap: &crate::event_tap::EventTap,
) -> Result<CompactionOutcome, CompactionError>
where
    C: crate::agent::AgentLlmClient + ?Sized,
{
    let estimated = estimate_tokens(messages)?;
    let message_count = messages.len();

    // Track whether anyone is still listening; later emits are skipped once
    // the receiver is gone.
    let event_stream_open = emit_compaction_event(
        event_tap,
        event_tx.as_ref(),
        AgentEvent::CompactionStarted {
            input_tokens: last_input_tokens,
            estimated_history_tokens: estimated,
            message_count,
        },
        "CompactionStarted",
    )
    .await;

    // Ask the LLM to summarize by appending the compaction prompt as a final
    // user message on a copy of the history.
    let compaction_prompt = compactor.compaction_prompt();
    let max_summary_tokens = compactor.max_summary_tokens();
    let mut compaction_messages = messages.to_vec();
    compaction_messages.push(Message::User(crate::types::UserMessage {
        content: compaction_prompt.to_string(),
    }));
    let llm_result = client
        .stream_response(&compaction_messages, &[], max_summary_tokens, None, None)
        .await;

    let (summary_text, summary_usage) = match llm_result {
        Ok(result) => {
            // Concatenate all text blocks; other block kinds (tool use, etc.)
            // contribute nothing to the summary.
            let mut summary = String::new();
            for block in result.blocks() {
                if let AssistantBlock::Text { text, .. } = block {
                    summary.push_str(text);
                }
            }
            if summary.is_empty() {
                if event_stream_open {
                    emit_compaction_event(
                        event_tap,
                        event_tx.as_ref(),
                        AgentEvent::CompactionFailed {
                            error: "LLM returned empty summary".to_string(),
                        },
                        "CompactionFailed",
                    )
                    .await;
                }
                return Err(CompactionError::EmptySummary);
            }
            (summary, result.usage().clone())
        }
        Err(e) => {
            if event_stream_open {
                emit_compaction_event(
                    event_tap,
                    event_tx.as_ref(),
                    AgentEvent::CompactionFailed {
                        error: e.to_string(),
                    },
                    "CompactionFailed",
                )
                .await;
            }
            return Err(CompactionError::LlmFailed(e));
        }
    };

    let result = compactor.rebuild_history(messages, &summary_text);
    let messages_after = result.messages.len();
    if event_stream_open {
        emit_compaction_event(
            event_tap,
            event_tx.as_ref(),
            AgentEvent::CompactionCompleted {
                summary_tokens: summary_usage.output_tokens,
                messages_before: message_count,
                messages_after,
            },
            "CompactionCompleted",
        )
        .await;
    }

    Ok(CompactionOutcome {
        new_messages: result.messages,
        discarded: result.discarded,
        summary_usage,
        current_turn,
    })
}
/// Result of a successful compaction pass.
pub struct CompactionOutcome {
    /// The rebuilt (compacted) conversation history.
    pub new_messages: Vec<Message>,
    /// Messages dropped from the history by the compactor's rebuild.
    pub discarded: Vec<Message>,
    /// Token usage reported by the summarization LLM call.
    pub summary_usage: Usage,
    /// The turn at which this compaction ran, echoed back from the caller.
    pub current_turn: u32,
}