1use crate::Session;
7use crate::compact::{
8 CompactionContext, Compactor, SESSION_COMPACTION_CADENCE_KEY, SessionCompactionCadence,
9};
10use crate::event::AgentEvent;
11#[cfg(target_arch = "wasm32")]
12use crate::tokio;
13use crate::types::{AssistantBlock, Message, Usage};
14use std::sync::Arc;
15use tokio::sync::mpsc;
16
17#[derive(Debug, thiserror::Error)]
19pub enum CompactionError {
20 #[error("compaction LLM call failed: {0}")]
22 LlmFailed(#[from] crate::error::AgentError),
23
24 #[error("LLM returned empty summary")]
26 EmptySummary,
27
28 #[error("token estimation failed: {0}")]
30 EstimationFailed(String),
31}
32
/// Flat token estimate charged for every image content block, independent of its size.
const IMAGE_TOKEN_ESTIMATE: u64 = 1_600;
39
/// Estimates the token cost of an inline video payload from its encoded length.
///
/// Uses one token per four bytes of `data`, rounded down, but never less than one
/// token for a non-empty payload. An empty payload costs zero tokens.
fn estimate_inline_video_tokens(data: &str) -> u64 {
    match data.len() as u64 {
        0 => 0,
        byte_len => (byte_len / 4).max(1),
    }
}
44
/// Estimates the token cost of a video from its duration in milliseconds.
///
/// Charges 300 tokens per 1000 ms, rounding any partial token up. The
/// multiplication saturates, so extreme durations cannot overflow.
fn estimate_video_duration_tokens(duration_ms: u64) -> u64 {
    const TOKENS_PER_SECOND: u64 = 300;
    const MILLIS_PER_SECOND: u64 = 1000;

    let scaled = duration_ms.saturating_mul(TOKENS_PER_SECOND);
    scaled.div_ceil(MILLIS_PER_SECOND)
}
49
50pub fn estimate_tokens(messages: &[Message]) -> Result<u64, CompactionError> {
56 let mut tokens: u64 = 0;
57 for msg in messages {
58 match msg {
59 Message::User(u) => {
60 for block in &u.content {
61 match block {
62 crate::types::ContentBlock::Image { .. } => {
63 tokens += IMAGE_TOKEN_ESTIMATE;
64 }
65 crate::types::ContentBlock::Video {
66 duration_ms,
67 data: crate::types::VideoData::Inline { data },
68 ..
69 } => {
70 tokens += estimate_inline_video_tokens(data)
71 .max(estimate_video_duration_tokens(*duration_ms));
72 }
73 _ => {
74 let len = block.text_projection().len() as u64;
75 tokens += if len > 0 { (len / 4).max(1) } else { 0 };
76 }
77 }
78 }
79 }
80 Message::ToolResults { results, .. } => {
81 for r in results {
82 for block in &r.content {
83 match block {
84 crate::types::ContentBlock::Image { .. } => {
85 tokens += IMAGE_TOKEN_ESTIMATE;
86 }
87 crate::types::ContentBlock::Video {
88 duration_ms,
89 data: crate::types::VideoData::Inline { data },
90 ..
91 } => {
92 tokens += estimate_inline_video_tokens(data)
93 .max(estimate_video_duration_tokens(*duration_ms));
94 }
95 _ => {
96 let len = block.text_projection().len() as u64;
97 tokens += if len > 0 { (len / 4).max(1) } else { 0 };
98 }
99 }
100 }
101 }
102 }
103 other => {
105 let json = serde_json::to_string(other)
106 .map_err(|e| CompactionError::EstimationFailed(e.to_string()))?;
107 tokens += json.len() as u64 / 4;
108 }
109 }
110 }
111 Ok(tokens)
112}
113
114pub fn build_compaction_context(
119 messages: &[Message],
120 last_input_tokens: u64,
121 last_compaction_boundary_index: Option<u64>,
122 session_boundary_index: u64,
123) -> CompactionContext {
124 let estimated_history_tokens = match estimate_tokens(messages) {
125 Ok(tokens) => tokens,
126 Err(err) => {
127 tracing::warn!("failed to estimate history tokens for compaction context: {err}");
128 0
129 }
130 };
131
132 CompactionContext {
133 last_input_tokens,
134 message_count: messages.len(),
135 estimated_history_tokens,
136 last_compaction_boundary_index,
137 session_boundary_index,
138 }
139}
140
141fn infer_session_boundary_index(messages: &[Message]) -> u64 {
144 messages
145 .iter()
146 .filter(|message| matches!(message, Message::BlockAssistant(_) | Message::Assistant(_)))
147 .count() as u64
148}
149
150pub fn load_compaction_cadence(session: &Session) -> SessionCompactionCadence {
153 session
154 .metadata()
155 .get(SESSION_COMPACTION_CADENCE_KEY)
156 .and_then(|value| serde_json::from_value::<SessionCompactionCadence>(value.clone()).ok())
157 .unwrap_or_else(|| SessionCompactionCadence {
158 session_boundary_index: infer_session_boundary_index(session.messages()),
159 last_compaction_boundary_index: None,
160 })
161}
162
163pub fn persist_compaction_cadence(
166 session: &mut Session,
167 cadence: &SessionCompactionCadence,
168) -> Result<(), serde_json::Error> {
169 let value = serde_json::to_value(cadence)?;
170 session.set_metadata(SESSION_COMPACTION_CADENCE_KEY, value);
171 Ok(())
172}
173
174pub async fn run_compaction<C>(
182 client: &C,
183 compactor: &Arc<dyn Compactor>,
184 messages: &[Message],
185 last_input_tokens: u64,
186 session_boundary_index: u64,
187 event_tx: &Option<mpsc::Sender<AgentEvent>>,
188 event_tap: &crate::event_tap::EventTap,
189) -> Result<CompactionOutcome, CompactionError>
190where
191 C: crate::agent::AgentLlmClient + ?Sized,
192{
193 let estimated = estimate_tokens(messages)?;
194 let message_count = messages.len();
195 let mut event_stream_open = true;
196
197 if event_stream_open
199 && !crate::event_tap::tap_emit(
200 event_tap,
201 event_tx.as_ref(),
202 AgentEvent::CompactionStarted {
203 input_tokens: last_input_tokens,
204 estimated_history_tokens: estimated,
205 message_count,
206 },
207 )
208 .await
209 {
210 event_stream_open = false;
211 tracing::warn!("compaction event stream receiver dropped before CompactionStarted");
212 }
213
214 let compaction_prompt = compactor.compaction_prompt();
216 let max_summary_tokens = compactor.max_summary_tokens();
217
218 let mut compaction_messages = compactor.prepare_for_summarization(messages);
219 compaction_messages.push(Message::User(crate::types::UserMessage::text(
220 compaction_prompt.to_string(),
221 )));
222
223 let llm_result = client
225 .stream_response(&compaction_messages, &[], max_summary_tokens, None, None)
226 .await;
227
228 let (summary_text, summary_usage) = match llm_result {
229 Ok(result) => {
230 let mut summary = String::new();
232 for block in result.blocks() {
233 if let AssistantBlock::Text { text, .. } = block {
234 summary.push_str(text);
235 }
236 }
237 if summary.is_empty() {
238 if event_stream_open
239 && !crate::event_tap::tap_emit(
240 event_tap,
241 event_tx.as_ref(),
242 AgentEvent::CompactionFailed {
243 error: "LLM returned empty summary".to_string(),
244 },
245 )
246 .await
247 {
248 tracing::warn!(
249 "compaction event stream receiver dropped before CompactionFailed"
250 );
251 }
252 return Err(CompactionError::EmptySummary);
253 }
254 (summary, result.usage().clone())
255 }
256 Err(e) => {
257 if event_stream_open
258 && !crate::event_tap::tap_emit(
259 event_tap,
260 event_tx.as_ref(),
261 AgentEvent::CompactionFailed {
262 error: e.to_string(),
263 },
264 )
265 .await
266 {
267 tracing::warn!("compaction event stream receiver dropped before CompactionFailed");
268 }
269 return Err(CompactionError::LlmFailed(e));
270 }
271 };
272
273 let result = compactor.rebuild_history(messages, &summary_text);
275 let messages_after = result.messages.len();
276
277 Ok(CompactionOutcome {
278 new_messages: result.messages,
279 discarded: result.discarded,
280 summary_usage,
281 session_boundary_index,
282 messages_before: message_count,
283 messages_after,
284 })
285}
286
287pub struct CompactionOutcome {
289 pub new_messages: Vec<Message>,
291 pub discarded: Vec<Message>,
293 pub summary_usage: Usage,
295 pub session_boundary_index: u64,
297 pub messages_before: usize,
299 pub messages_after: usize,
301}
302
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::types::{
        AssistantBlock, BlockAssistantMessage, ContentBlock, StopReason, ToolResult, Usage,
        UserMessage, VideoData,
    };

    /// Wraps `text` in a single-block assistant message that stopped with `EndTurn`.
    fn assistant_reply(text: &str) -> Message {
        let blocks = vec![AssistantBlock::Text {
            text: text.to_string(),
            meta: None,
        }];
        Message::BlockAssistant(BlockAssistantMessage::new(blocks, StopReason::EndTurn))
    }

    /// Builds an inline video block whose payload is `fill` repeated `payload_len` times.
    fn inline_video(
        media_type: &str,
        duration_ms: u64,
        fill: &str,
        payload_len: usize,
    ) -> ContentBlock {
        ContentBlock::Video {
            media_type: media_type.to_string(),
            duration_ms,
            data: VideoData::Inline {
                data: fill.repeat(payload_len),
            },
        }
    }

    #[test]
    fn load_compaction_cadence_infers_boundary_count_from_existing_history() {
        let mut session = Session::new();
        let history = [
            Message::User(UserMessage::text("first")),
            assistant_reply("ok"),
            Message::User(UserMessage::text("second")),
            assistant_reply("done"),
        ];
        for message in history {
            session.push(message);
        }
        session.record_usage(Usage::default());

        // No cadence was persisted, so it is inferred from the two assistant messages.
        let cadence = load_compaction_cadence(&session);
        assert_eq!(cadence.session_boundary_index, 2);
        assert_eq!(cadence.last_compaction_boundary_index, None);
    }

    #[test]
    fn persisted_compaction_cadence_round_trips_through_session_metadata() {
        let cadence = SessionCompactionCadence {
            session_boundary_index: 7,
            last_compaction_boundary_index: Some(4),
        };
        let mut session = Session::new();
        persist_compaction_cadence(&mut session, &cadence).unwrap();

        let reloaded = load_compaction_cadence(&session);
        assert_eq!(reloaded, cadence);
    }

    #[test]
    fn estimate_tokens_uses_video_heuristic_for_user_and_tool_results() {
        let user_video = inline_video("video/mp4", 8_000, "A", 8_000);
        let tool_video = inline_video("video/webm", 4_000, "B", 4_000);
        let messages = vec![
            Message::User(UserMessage::with_blocks(vec![user_video])),
            Message::tool_results(vec![ToolResult::with_blocks(
                "tool-1".to_string(),
                vec![tool_video],
                false,
            )]),
        ];

        // The duration estimate beats the payload estimate for both videos:
        // max(2_000, 2_400) + max(1_000, 1_200) = 3_600.
        let estimated = estimate_tokens(&messages).unwrap();
        assert_eq!(estimated, 3_600);
    }
}