1use crate::Session;
7use crate::compact::{
8 CompactionContext, Compactor, SESSION_COMPACTION_CADENCE_KEY, SessionCompactionCadence,
9};
10use crate::event::AgentEvent;
11#[cfg(target_arch = "wasm32")]
12use crate::tokio;
13use crate::types::{AssistantBlock, Message, Usage};
14use std::sync::Arc;
15use tokio::sync::mpsc;
16
/// Errors that can abort a compaction run.
#[derive(Debug, thiserror::Error)]
pub enum CompactionError {
    /// The summarization request to the LLM client returned an error.
    #[error("compaction LLM call failed: {0}")]
    LlmFailed(#[from] crate::error::AgentError),

    /// The LLM responded, but the concatenated text of its blocks was empty.
    #[error("LLM returned empty summary")]
    EmptySummary,

    /// Token estimation failed; the payload is the underlying error rendered as text.
    #[error("token estimation failed: {0}")]
    EstimationFailed(String),
}
32
/// Flat token estimate charged for every image content block, independent of the image's size.
const IMAGE_TOKEN_ESTIMATE: u64 = 1_600;
39
/// Estimates the token cost of an inline video payload from its length in bytes.
///
/// An empty payload costs nothing; otherwise the estimate is one token per four
/// bytes (rounded down), with a floor of one token.
fn estimate_inline_video_tokens(data: &str) -> u64 {
    match data.len() as u64 {
        0 => 0,
        byte_len => (byte_len / 4).max(1),
    }
}
44
/// Estimates the token cost of a video from its duration in milliseconds.
///
/// The rate is 300 tokens per 1000 ms, rounded up. The multiplication saturates
/// instead of overflowing for extremely large durations.
fn estimate_video_duration_tokens(duration_ms: u64) -> u64 {
    const TOKENS_PER_SECOND: u64 = 300;
    const MILLIS_PER_SECOND: u64 = 1000;

    let scaled = duration_ms.saturating_mul(TOKENS_PER_SECOND);
    scaled.div_ceil(MILLIS_PER_SECOND)
}
49
50pub fn estimate_tokens(messages: &[Message]) -> Result<u64, CompactionError> {
56 let mut tokens: u64 = 0;
57 for msg in messages {
58 match msg {
59 Message::User(u) => {
60 for block in &u.content {
61 match block {
62 crate::types::ContentBlock::Image { .. } => {
63 tokens += IMAGE_TOKEN_ESTIMATE;
64 }
65 crate::types::ContentBlock::Video {
66 duration_ms,
67 data: crate::types::VideoData::Inline { data },
68 ..
69 } => {
70 tokens += estimate_inline_video_tokens(data)
71 .max(estimate_video_duration_tokens(*duration_ms));
72 }
73 _ => {
74 let len = block.text_projection().len() as u64;
75 tokens += if len > 0 { (len / 4).max(1) } else { 0 };
76 }
77 }
78 }
79 }
80 Message::ToolResults { results } => {
81 for r in results {
82 for block in &r.content {
83 match block {
84 crate::types::ContentBlock::Image { .. } => {
85 tokens += IMAGE_TOKEN_ESTIMATE;
86 }
87 crate::types::ContentBlock::Video {
88 duration_ms,
89 data: crate::types::VideoData::Inline { data },
90 ..
91 } => {
92 tokens += estimate_inline_video_tokens(data)
93 .max(estimate_video_duration_tokens(*duration_ms));
94 }
95 _ => {
96 let len = block.text_projection().len() as u64;
97 tokens += if len > 0 { (len / 4).max(1) } else { 0 };
98 }
99 }
100 }
101 }
102 }
103 other => {
105 let json = serde_json::to_string(other)
106 .map_err(|e| CompactionError::EstimationFailed(e.to_string()))?;
107 tokens += json.len() as u64 / 4;
108 }
109 }
110 }
111 Ok(tokens)
112}
113
114pub fn build_compaction_context(
119 messages: &[Message],
120 last_input_tokens: u64,
121 last_compaction_boundary_index: Option<u64>,
122 session_boundary_index: u64,
123) -> CompactionContext {
124 let estimated_history_tokens = match estimate_tokens(messages) {
125 Ok(tokens) => tokens,
126 Err(err) => {
127 tracing::warn!("failed to estimate history tokens for compaction context: {err}");
128 0
129 }
130 };
131
132 CompactionContext {
133 last_input_tokens,
134 message_count: messages.len(),
135 estimated_history_tokens,
136 last_compaction_boundary_index,
137 session_boundary_index,
138 }
139}
140
141fn infer_session_boundary_index(messages: &[Message]) -> u64 {
144 messages
145 .iter()
146 .filter(|message| matches!(message, Message::BlockAssistant(_) | Message::Assistant(_)))
147 .count() as u64
148}
149
150pub fn load_compaction_cadence(session: &Session) -> SessionCompactionCadence {
153 session
154 .metadata()
155 .get(SESSION_COMPACTION_CADENCE_KEY)
156 .and_then(|value| serde_json::from_value::<SessionCompactionCadence>(value.clone()).ok())
157 .unwrap_or_else(|| SessionCompactionCadence {
158 session_boundary_index: infer_session_boundary_index(session.messages()),
159 last_compaction_boundary_index: None,
160 })
161}
162
163pub fn persist_compaction_cadence(
166 session: &mut Session,
167 cadence: &SessionCompactionCadence,
168) -> Result<(), serde_json::Error> {
169 let value = serde_json::to_value(cadence)?;
170 session.set_metadata(SESSION_COMPACTION_CADENCE_KEY, value);
171 Ok(())
172}
173
174pub async fn run_compaction<C>(
182 client: &C,
183 compactor: &Arc<dyn Compactor>,
184 messages: &[Message],
185 last_input_tokens: u64,
186 session_boundary_index: u64,
187 event_tx: &Option<mpsc::Sender<AgentEvent>>,
188 event_tap: &crate::event_tap::EventTap,
189) -> Result<CompactionOutcome, CompactionError>
190where
191 C: crate::agent::AgentLlmClient + ?Sized,
192{
193 let estimated = estimate_tokens(messages)?;
194 let message_count = messages.len();
195 let mut event_stream_open = true;
196
197 if event_stream_open
199 && !crate::event_tap::tap_emit(
200 event_tap,
201 event_tx.as_ref(),
202 AgentEvent::CompactionStarted {
203 input_tokens: last_input_tokens,
204 estimated_history_tokens: estimated,
205 message_count,
206 },
207 )
208 .await
209 {
210 event_stream_open = false;
211 tracing::warn!("compaction event stream receiver dropped before CompactionStarted");
212 }
213
214 let compaction_prompt = compactor.compaction_prompt();
216 let max_summary_tokens = compactor.max_summary_tokens();
217
218 let mut compaction_messages = compactor.prepare_for_summarization(messages);
219 compaction_messages.push(Message::User(crate::types::UserMessage::text(
220 compaction_prompt.to_string(),
221 )));
222
223 let llm_result = client
225 .stream_response(&compaction_messages, &[], max_summary_tokens, None, None)
226 .await;
227
228 let (summary_text, summary_usage) = match llm_result {
229 Ok(result) => {
230 let mut summary = String::new();
232 for block in result.blocks() {
233 if let AssistantBlock::Text { text, .. } = block {
234 summary.push_str(text);
235 }
236 }
237 if summary.is_empty() {
238 if event_stream_open
239 && !crate::event_tap::tap_emit(
240 event_tap,
241 event_tx.as_ref(),
242 AgentEvent::CompactionFailed {
243 error: "LLM returned empty summary".to_string(),
244 },
245 )
246 .await
247 {
248 tracing::warn!(
249 "compaction event stream receiver dropped before CompactionFailed"
250 );
251 }
252 return Err(CompactionError::EmptySummary);
253 }
254 (summary, result.usage().clone())
255 }
256 Err(e) => {
257 if event_stream_open
258 && !crate::event_tap::tap_emit(
259 event_tap,
260 event_tx.as_ref(),
261 AgentEvent::CompactionFailed {
262 error: e.to_string(),
263 },
264 )
265 .await
266 {
267 tracing::warn!("compaction event stream receiver dropped before CompactionFailed");
268 }
269 return Err(CompactionError::LlmFailed(e));
270 }
271 };
272
273 let result = compactor.rebuild_history(messages, &summary_text);
275 let messages_after = result.messages.len();
276
277 if event_stream_open
279 && !crate::event_tap::tap_emit(
280 event_tap,
281 event_tx.as_ref(),
282 AgentEvent::CompactionCompleted {
283 summary_tokens: summary_usage.output_tokens,
284 messages_before: message_count,
285 messages_after,
286 },
287 )
288 .await
289 {
290 tracing::warn!("compaction event stream receiver dropped before CompactionCompleted");
291 }
292
293 Ok(CompactionOutcome {
294 new_messages: result.messages,
295 discarded: result.discarded,
296 summary_usage,
297 session_boundary_index,
298 })
299}
300
/// Result of a successful `run_compaction` call.
pub struct CompactionOutcome {
    /// Replacement history produced by the compactor's `rebuild_history`.
    pub new_messages: Vec<Message>,
    /// Messages the compactor reported as discarded while rebuilding.
    pub discarded: Vec<Message>,
    /// Usage reported by the LLM for the summarization request.
    pub summary_usage: Usage,
    /// The `session_boundary_index` passed to `run_compaction`, returned unchanged.
    pub session_boundary_index: u64,
}
312
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::types::{
        AssistantBlock, BlockAssistantMessage, ContentBlock, StopReason, ToolResult, Usage,
        UserMessage, VideoData,
    };

    /// Builds an assistant message holding one text block that ended its turn.
    fn assistant_reply(text: &str) -> Message {
        Message::BlockAssistant(BlockAssistantMessage {
            blocks: vec![AssistantBlock::Text {
                text: text.to_string(),
                meta: None,
            }],
            stop_reason: StopReason::EndTurn,
        })
    }

    /// Builds an inline video block whose payload is `fill` repeated `payload_len` times.
    fn inline_video(
        media_type: &str,
        duration_ms: u64,
        fill: &str,
        payload_len: usize,
    ) -> ContentBlock {
        ContentBlock::Video {
            media_type: media_type.to_string(),
            duration_ms,
            data: VideoData::Inline {
                data: fill.repeat(payload_len),
            },
        }
    }

    /// Without stored cadence metadata, the boundary index is the number of
    /// assistant messages already in the history.
    #[test]
    fn load_compaction_cadence_infers_boundary_count_from_existing_history() {
        let mut session = Session::new();
        session.push(Message::User(UserMessage::text("first")));
        session.push(assistant_reply("ok"));
        session.push(Message::User(UserMessage::text("second")));
        session.push(assistant_reply("done"));
        session.record_usage(Usage::default());

        let loaded = load_compaction_cadence(&session);
        assert_eq!(loaded.session_boundary_index, 2);
        assert_eq!(loaded.last_compaction_boundary_index, None);
    }

    /// A persisted cadence is read back unchanged.
    #[test]
    fn persisted_compaction_cadence_round_trips_through_session_metadata() {
        let expected = SessionCompactionCadence {
            session_boundary_index: 7,
            last_compaction_boundary_index: Some(4),
        };

        let mut session = Session::new();
        persist_compaction_cadence(&mut session, &expected).unwrap();

        assert_eq!(load_compaction_cadence(&session), expected);
    }

    /// Inline videos are costed the same way in user messages and tool results.
    /// In both fixtures the duration estimate (2_400 and 1_200) exceeds the
    /// payload-size estimate (2_000 and 1_000), so the total is 3_600.
    #[test]
    fn estimate_tokens_uses_video_heuristic_for_user_and_tool_results() {
        let user_message = Message::User(UserMessage::with_blocks(vec![inline_video(
            "video/mp4",
            8_000,
            "A",
            8_000,
        )]));
        let tool_results = Message::ToolResults {
            results: vec![ToolResult::with_blocks(
                "tool-1".to_string(),
                vec![inline_video("video/webm", 4_000, "B", 4_000)],
                false,
            )],
        };
        let messages = vec![user_message, tool_results];

        assert_eq!(estimate_tokens(&messages).unwrap(), 3_600);
    }
}
387}