// meerkat_core/agent/compact.rs
1//! Agent compaction — runs context compaction during the agent loop.
2//!
3//! Called from the agent state machine when a compactor is configured and
4//! the threshold is met.
5
6use crate::Session;
7use crate::compact::{
8    CompactionContext, Compactor, SESSION_COMPACTION_CADENCE_KEY, SessionCompactionCadence,
9};
10use crate::event::AgentEvent;
11#[cfg(target_arch = "wasm32")]
12use crate::tokio;
13use crate::types::{AssistantBlock, Message, Usage};
14use std::sync::Arc;
15use tokio::sync::mpsc;
16
/// Errors that can occur during compaction.
///
/// Returned by [`estimate_tokens`] and [`run_compaction`]; callers should
/// treat all variants as non-fatal to the session (the session is never
/// mutated before a compaction error is returned).
#[derive(Debug, thiserror::Error)]
pub enum CompactionError {
    /// The LLM call for summarization failed.
    ///
    /// Wraps the underlying agent error via `#[from]`.
    #[error("compaction LLM call failed: {0}")]
    LlmFailed(#[from] crate::error::AgentError),

    /// The LLM returned an empty summary (no text blocks in the response).
    #[error("LLM returned empty summary")]
    EmptySummary,

    /// Failed to estimate token count (JSON serialization of a message failed).
    #[error("token estimation failed: {0}")]
    EstimationFailed(String),
}
32
/// Approximate token cost per image block.
///
/// Anthropic charges ~1600 tokens for a standard image regardless of
/// resolution. Using a fixed estimate avoids counting raw base64 bytes
/// (which inflate the estimate by ~200x).
const IMAGE_TOKEN_ESTIMATE: u64 = 1_600;
39
/// Byte-based token estimate for inline (base64) video data: roughly one
/// token per four bytes, with a floor of one token for non-empty payloads.
fn estimate_inline_video_tokens(data: &str) -> u64 {
    match data.len() as u64 {
        0 => 0,
        bytes => (bytes / 4).max(1),
    }
}
44
/// Duration-based token estimate for video.
///
/// Rough default-resolution Gemini heuristic (~300 tokens per second),
/// scaled by the caller-provided duration and rounded up. Multiplication
/// saturates rather than overflowing for absurd durations.
fn estimate_video_duration_tokens(duration_ms: u64) -> u64 {
    let scaled = duration_ms.saturating_mul(300);
    scaled.div_ceil(1000)
}
49
50/// Estimate token count from message history.
51///
52/// Text content uses `json_bytes / 4` as a rough heuristic.
53/// Image blocks use a fixed per-image estimate instead of serializing
54/// the base64 payload (which would massively overcount).
55pub fn estimate_tokens(messages: &[Message]) -> Result<u64, CompactionError> {
56    let mut tokens: u64 = 0;
57    for msg in messages {
58        match msg {
59            Message::User(u) => {
60                for block in &u.content {
61                    match block {
62                        crate::types::ContentBlock::Image { .. } => {
63                            tokens += IMAGE_TOKEN_ESTIMATE;
64                        }
65                        crate::types::ContentBlock::Video {
66                            duration_ms,
67                            data: crate::types::VideoData::Inline { data },
68                            ..
69                        } => {
70                            tokens += estimate_inline_video_tokens(data)
71                                .max(estimate_video_duration_tokens(*duration_ms));
72                        }
73                        _ => {
74                            let len = block.text_projection().len() as u64;
75                            tokens += if len > 0 { (len / 4).max(1) } else { 0 };
76                        }
77                    }
78                }
79            }
80            Message::ToolResults { results } => {
81                for r in results {
82                    for block in &r.content {
83                        match block {
84                            crate::types::ContentBlock::Image { .. } => {
85                                tokens += IMAGE_TOKEN_ESTIMATE;
86                            }
87                            crate::types::ContentBlock::Video {
88                                duration_ms,
89                                data: crate::types::VideoData::Inline { data },
90                                ..
91                            } => {
92                                tokens += estimate_inline_video_tokens(data)
93                                    .max(estimate_video_duration_tokens(*duration_ms));
94                            }
95                            _ => {
96                                let len = block.text_projection().len() as u64;
97                                tokens += if len > 0 { (len / 4).max(1) } else { 0 };
98                            }
99                        }
100                    }
101                }
102            }
103            // For assistant/system messages, serialize to JSON (no image blocks).
104            other => {
105                let json = serde_json::to_string(other)
106                    .map_err(|e| CompactionError::EstimationFailed(e.to_string()))?;
107                tokens += json.len() as u64 / 4;
108            }
109        }
110    }
111    Ok(tokens)
112}
113
114/// Build a `CompactionContext` from current agent state.
115///
116/// Falls back to 0 estimated tokens if serialization fails (non-fatal for
117/// context building — the should_compact check will use last_input_tokens).
118pub fn build_compaction_context(
119    messages: &[Message],
120    last_input_tokens: u64,
121    last_compaction_boundary_index: Option<u64>,
122    session_boundary_index: u64,
123) -> CompactionContext {
124    let estimated_history_tokens = match estimate_tokens(messages) {
125        Ok(tokens) => tokens,
126        Err(err) => {
127            tracing::warn!("failed to estimate history tokens for compaction context: {err}");
128            0
129        }
130    };
131
132    CompactionContext {
133        last_input_tokens,
134        message_count: messages.len(),
135        estimated_history_tokens,
136        last_compaction_boundary_index,
137        session_boundary_index,
138    }
139}
140
141/// Best-effort count of prior LLM boundaries for older sessions that do not
142/// yet carry explicit cadence metadata.
143fn infer_session_boundary_index(messages: &[Message]) -> u64 {
144    messages
145        .iter()
146        .filter(|message| matches!(message, Message::BlockAssistant(_) | Message::Assistant(_)))
147        .count() as u64
148}
149
150/// Load persisted compaction cadence from session metadata, falling back to
151/// transcript-derived history for pre-migration sessions.
152pub fn load_compaction_cadence(session: &Session) -> SessionCompactionCadence {
153    session
154        .metadata()
155        .get(SESSION_COMPACTION_CADENCE_KEY)
156        .and_then(|value| serde_json::from_value::<SessionCompactionCadence>(value.clone()).ok())
157        .unwrap_or_else(|| SessionCompactionCadence {
158            session_boundary_index: infer_session_boundary_index(session.messages()),
159            last_compaction_boundary_index: None,
160        })
161}
162
163/// Persist compaction cadence so reused/resumed sessions preserve their
164/// session-scoped compaction behavior.
165pub fn persist_compaction_cadence(
166    session: &mut Session,
167    cadence: &SessionCompactionCadence,
168) -> Result<(), serde_json::Error> {
169    let value = serde_json::to_value(cadence)?;
170    session.set_metadata(SESSION_COMPACTION_CADENCE_KEY, value);
171    Ok(())
172}
173
/// Run the compaction flow.
///
/// 1. Emit CompactionStarted
/// 2. Call LLM with compaction prompt
/// 3. On failure: emit CompactionFailed, return error without mutating session
/// 4. Rebuild history via compactor
/// 5. Emit CompactionCompleted
///
/// Event emission is best-effort: once the event stream receiver is observed
/// dropped, subsequent events are skipped (tracked via `event_stream_open`)
/// and the compaction itself still proceeds.
///
/// # Errors
///
/// - [`CompactionError::EstimationFailed`] if the pre-compaction token
///   estimate cannot be computed.
/// - [`CompactionError::LlmFailed`] if the summarization LLM call fails.
/// - [`CompactionError::EmptySummary`] if the LLM response contains no text.
pub async fn run_compaction<C>(
    client: &C,
    compactor: &Arc<dyn Compactor>,
    messages: &[Message],
    last_input_tokens: u64,
    session_boundary_index: u64,
    event_tx: &Option<mpsc::Sender<AgentEvent>>,
    event_tap: &crate::event_tap::EventTap,
) -> Result<CompactionOutcome, CompactionError>
where
    C: crate::agent::AgentLlmClient + ?Sized,
{
    let estimated = estimate_tokens(messages)?;
    let message_count = messages.len();
    // Tracks whether the event receiver is still attached; flipped to false
    // the first time tap_emit reports a dropped receiver.
    let mut event_stream_open = true;

    // 1. Emit CompactionStarted
    if event_stream_open
        && !crate::event_tap::tap_emit(
            event_tap,
            event_tx.as_ref(),
            AgentEvent::CompactionStarted {
                input_tokens: last_input_tokens,
                estimated_history_tokens: estimated,
                message_count,
            },
        )
        .await
    {
        event_stream_open = false;
        tracing::warn!("compaction event stream receiver dropped before CompactionStarted");
    }

    // 2. Build the compaction prompt messages
    let compaction_prompt = compactor.compaction_prompt();
    let max_summary_tokens = compactor.max_summary_tokens();

    // The compactor pre-processes the history, then the summarization
    // instruction is appended as a final user message.
    let mut compaction_messages = compactor.prepare_for_summarization(messages);
    compaction_messages.push(Message::User(crate::types::UserMessage::text(
        compaction_prompt.to_string(),
    )));

    // 3. Call LLM with empty tools, max_summary_tokens
    let llm_result = client
        .stream_response(&compaction_messages, &[], max_summary_tokens, None, None)
        .await;

    let (summary_text, summary_usage) = match llm_result {
        Ok(result) => {
            // Extract summary text from response blocks; non-text blocks are
            // ignored.
            let mut summary = String::new();
            for block in result.blocks() {
                if let AssistantBlock::Text { text, .. } = block {
                    summary.push_str(text);
                }
            }
            if summary.is_empty() {
                // Failure path: emit CompactionFailed (best-effort) and bail
                // without touching the session.
                if event_stream_open
                    && !crate::event_tap::tap_emit(
                        event_tap,
                        event_tx.as_ref(),
                        AgentEvent::CompactionFailed {
                            error: "LLM returned empty summary".to_string(),
                        },
                    )
                    .await
                {
                    tracing::warn!(
                        "compaction event stream receiver dropped before CompactionFailed"
                    );
                }
                return Err(CompactionError::EmptySummary);
            }
            (summary, result.usage().clone())
        }
        Err(e) => {
            // LLM call failed: emit CompactionFailed (best-effort) and
            // propagate the error without mutating the session.
            if event_stream_open
                && !crate::event_tap::tap_emit(
                    event_tap,
                    event_tx.as_ref(),
                    AgentEvent::CompactionFailed {
                        error: e.to_string(),
                    },
                )
                .await
            {
                tracing::warn!("compaction event stream receiver dropped before CompactionFailed");
            }
            return Err(CompactionError::LlmFailed(e));
        }
    };

    // 4. Rebuild history — extract system prompt from messages directly
    let result = compactor.rebuild_history(messages, &summary_text);
    let messages_after = result.messages.len();

    // 5. Emit CompactionCompleted
    if event_stream_open
        && !crate::event_tap::tap_emit(
            event_tap,
            event_tx.as_ref(),
            AgentEvent::CompactionCompleted {
                summary_tokens: summary_usage.output_tokens,
                messages_before: message_count,
                messages_after,
            },
        )
        .await
    {
        tracing::warn!("compaction event stream receiver dropped before CompactionCompleted");
    }

    Ok(CompactionOutcome {
        new_messages: result.messages,
        discarded: result.discarded,
        summary_usage,
        session_boundary_index,
    })
}
300
/// Result of a successful compaction.
///
/// Produced by [`run_compaction`]; the caller is responsible for swapping
/// `new_messages` into the session and handling `discarded`.
pub struct CompactionOutcome {
    /// New session messages to replace current history.
    pub new_messages: Vec<Message>,
    /// Messages that were discarded (for future memory indexing).
    pub discarded: Vec<Message>,
    /// Usage from the summary LLM call.
    pub summary_usage: Usage,
    /// Session boundary index at which compaction occurred.
    pub session_boundary_index: u64,
}
312
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::types::{
        AssistantBlock, BlockAssistantMessage, ContentBlock, StopReason, ToolResult, Usage,
        UserMessage, VideoData,
    };

    // Pre-migration sessions carry no cadence metadata, so the boundary
    // index must be inferred from the number of assistant messages.
    #[test]
    fn load_compaction_cadence_infers_boundary_count_from_existing_history() {
        let mut session = Session::new();
        session.push(Message::User(UserMessage::text("first")));
        session.push(Message::BlockAssistant(BlockAssistantMessage {
            blocks: vec![AssistantBlock::Text {
                text: "ok".to_string(),
                meta: None,
            }],
            stop_reason: StopReason::EndTurn,
        }));
        session.push(Message::User(UserMessage::text("second")));
        session.push(Message::BlockAssistant(BlockAssistantMessage {
            blocks: vec![AssistantBlock::Text {
                text: "done".to_string(),
                meta: None,
            }],
            stop_reason: StopReason::EndTurn,
        }));
        session.record_usage(Usage::default());

        // Two assistant messages => two inferred LLM boundaries.
        let cadence = load_compaction_cadence(&session);
        assert_eq!(cadence.session_boundary_index, 2);
        assert_eq!(cadence.last_compaction_boundary_index, None);
    }

    // persist + load must round-trip exactly through session metadata.
    #[test]
    fn persisted_compaction_cadence_round_trips_through_session_metadata() {
        let mut session = Session::new();
        let cadence = SessionCompactionCadence {
            session_boundary_index: 7,
            last_compaction_boundary_index: Some(4),
        };
        persist_compaction_cadence(&mut session, &cadence).unwrap();

        assert_eq!(load_compaction_cadence(&session), cadence);
    }

    // Expected: user video = max(8000/4, 8s * 300/s) = 2400 tokens;
    // tool-result video = max(4000/4, 4s * 300/s) = 1200 tokens; total 3600.
    #[test]
    fn estimate_tokens_uses_video_heuristic_for_user_and_tool_results() {
        let messages = vec![
            Message::User(UserMessage::with_blocks(vec![ContentBlock::Video {
                media_type: "video/mp4".to_string(),
                duration_ms: 8_000,
                data: VideoData::Inline {
                    data: "A".repeat(8_000),
                },
            }])),
            Message::ToolResults {
                results: vec![ToolResult::with_blocks(
                    "tool-1".to_string(),
                    vec![ContentBlock::Video {
                        media_type: "video/webm".to_string(),
                        duration_ms: 4_000,
                        data: VideoData::Inline {
                            data: "B".repeat(4_000),
                        },
                    }],
                    false,
                )],
            },
        ];

        assert_eq!(estimate_tokens(&messages).unwrap(), 3_600);
    }
}