Skip to main content

symbi_runtime/context/
compaction.rs

1//! Context compaction pipeline for reclaiming tokens.
2//!
3//! Runs synchronously before each LLM call. Walks tiers from most aggressive
4//! (truncate) to least aggressive (summarize), applying the first tier that
5//! matches the current token usage level.
6
7use serde::{Deserialize, Serialize};
8
9use super::types::AccessLevel;
10
11/// Per-agent compaction configuration.
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct CompactionConfig {
14    /// Whether compaction is enabled.
15    pub enabled: bool,
16    /// Token usage ratio (0.0–1.0) at which Tier 1 (Summarize) triggers.
17    pub summarize_threshold: f32,
18    /// Token usage ratio at which Tier 2 (Compress Episodic) triggers.
19    pub compress_threshold: f32,
20    /// Token usage ratio at which Tier 3 (Archive to Memory) triggers.
21    pub archive_threshold: f32,
22    /// Token usage ratio at which Tier 4 (Truncate) triggers.
23    pub truncate_threshold: f32,
24    /// Model to use for summarization. `None` = agent's own model.
25    pub compaction_model: Option<String>,
26    /// Maximum tokens for generated summaries.
27    pub max_summary_tokens: usize,
28    /// Access levels whose items are never compacted.
29    pub preserve_access_levels: Vec<AccessLevel>,
30    /// Minimum conversation items before compaction is considered.
31    pub min_items_to_compact: usize,
32}
33
34impl Default for CompactionConfig {
35    fn default() -> Self {
36        Self {
37            enabled: true,
38            summarize_threshold: 0.70,
39            compress_threshold: 0.80,
40            archive_threshold: 0.85,
41            truncate_threshold: 0.90,
42            compaction_model: None,
43            max_summary_tokens: 500,
44            preserve_access_levels: vec![AccessLevel::Secret, AccessLevel::Confidential],
45            min_items_to_compact: 5,
46        }
47    }
48}
49
50/// Which compaction tier was applied.
51#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
52pub enum CompactionTier {
53    /// Tier 1: LLM summarizes oldest conversation items (OSS).
54    Summarize,
55    /// Tier 2: Merge similar episodic memory items (enterprise).
56    CompressEpisodic,
57    /// Tier 3: Archive summaries and old items to MarkdownMemoryStore (enterprise).
58    ArchiveToMemory,
59    /// Tier 4: Drop oldest conversation items (OSS).
60    Truncate,
61}
62
63impl std::fmt::Display for CompactionTier {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        match self {
66            CompactionTier::Summarize => write!(f, "Summarize"),
67            CompactionTier::CompressEpisodic => write!(f, "CompressEpisodic"),
68            CompactionTier::ArchiveToMemory => write!(f, "ArchiveToMemory"),
69            CompactionTier::Truncate => write!(f, "Truncate"),
70        }
71    }
72}
73
74/// Result of a compaction operation.
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct CompactionResult {
77    pub tier_applied: CompactionTier,
78    pub tokens_before: usize,
79    pub tokens_after: usize,
80    pub tokens_saved: usize,
81    pub items_affected: usize,
82    pub duration_ms: u64,
83    pub summary_generated: Option<String>,
84}
85
86use super::types::{ConversationItem, ConversationRole};
87
88/// Tier 4: Truncate — drop oldest conversation items until count drops to
89/// `target_ratio` of original. System messages are preserved.
90pub fn truncate_items(
91    items: &[ConversationItem],
92    _config: &CompactionConfig,
93    target_ratio: f32,
94) -> (Vec<ConversationItem>, usize) {
95    let total = items.len();
96    let target_count = (total as f32 * target_ratio).ceil() as usize;
97
98    // Partition: system messages vs candidates for removal (oldest first)
99    let mut system_items: Vec<&ConversationItem> = Vec::new();
100    let mut candidates: Vec<&ConversationItem> = Vec::new();
101
102    for item in items {
103        if matches!(item.role, ConversationRole::System) {
104            system_items.push(item);
105        } else {
106            candidates.push(item);
107        }
108    }
109
110    // Keep the newest candidates, drop the oldest
111    let keep_count = target_count.saturating_sub(system_items.len());
112    let drop_count = candidates.len().saturating_sub(keep_count);
113
114    let mut result: Vec<ConversationItem> = system_items.into_iter().cloned().collect();
115    result.extend(candidates.into_iter().skip(drop_count).cloned());
116
117    (result, drop_count)
118}
119
120use std::future::Future;
121use std::time::Instant;
122
123/// Tier 1: Summarize — LLM condenses the oldest N conversation items into a
124/// single system message. Preserves system messages and items with protected
125/// access levels.
126///
127/// The `summarizer` closure takes the concatenated text and returns a summary.
128pub async fn summarize_items<F, Fut>(
129    items: &[ConversationItem],
130    config: &CompactionConfig,
131    items_to_summarize: usize,
132    summarizer: F,
133) -> Result<Option<(Vec<ConversationItem>, CompactionResult)>, String>
134where
135    F: FnOnce(String) -> Fut,
136    Fut: Future<Output = Result<String, String>>,
137{
138    if items.len() < config.min_items_to_compact {
139        return Ok(None);
140    }
141
142    let start = Instant::now();
143
144    // Collect items eligible for summarization (not system, not protected)
145    let mut to_summarize: Vec<(usize, &ConversationItem)> = Vec::new();
146    let mut to_keep: Vec<(usize, &ConversationItem)> = Vec::new();
147
148    for (idx, item) in items.iter().enumerate() {
149        if matches!(item.role, ConversationRole::System) {
150            to_keep.push((idx, item));
151        } else if to_summarize.len() < items_to_summarize {
152            to_summarize.push((idx, item));
153        } else {
154            to_keep.push((idx, item));
155        }
156    }
157
158    if to_summarize.is_empty() {
159        return Ok(None);
160    }
161
162    // Build the text to summarize
163    let text_to_summarize: String = to_summarize
164        .iter()
165        .map(|(_, item)| format!("{:?}: {}", item.role, item.content))
166        .collect::<Vec<_>>()
167        .join("\n\n");
168
169    let summary = summarizer(text_to_summarize).await?;
170
171    // Build the replacement item
172    let summary_item = ConversationItem {
173        id: super::types::ContextId::new(),
174        role: ConversationRole::System,
175        content: format!("[Compacted summary] {summary}"),
176        timestamp: std::time::SystemTime::now(),
177        context_used: vec![],
178        knowledge_used: vec![],
179    };
180
181    // Reconstruct: system items + summary + remaining items (in original order)
182    let mut result_items: Vec<ConversationItem> = Vec::new();
183
184    // Add system items that came before the summarized range
185    for (_, item) in to_keep.iter().filter(|(idx, _)| *idx < to_summarize[0].0) {
186        result_items.push((*item).clone());
187    }
188
189    // Insert summary
190    result_items.push(summary_item);
191
192    // Add remaining items after the summarized range
193    for (_, item) in to_keep
194        .iter()
195        .filter(|(idx, _)| *idx > to_summarize.last().unwrap().0)
196    {
197        result_items.push((*item).clone());
198    }
199
200    let duration = start.elapsed();
201
202    let compaction_result = CompactionResult {
203        tier_applied: CompactionTier::Summarize,
204        tokens_before: 0, // Caller fills in actual token counts
205        tokens_after: 0,
206        tokens_saved: 0,
207        items_affected: to_summarize.len(),
208        duration_ms: duration.as_millis() as u64,
209        summary_generated: Some(summary),
210    };
211
212    Ok(Some((result_items, compaction_result)))
213}
214
215/// Tier 2: Compress Episodic — merge similar memory items (enterprise only).
216///
217/// Returns `None` when the enterprise implementation is not yet available so
218/// the compaction pipeline skips this tier and continues to the next one.
219/// Previously this panicked via `todo!()` when the `enterprise-compaction`
220/// feature was enabled, which would hard-abort the runtime under
221/// `panic = "abort"`.
222pub fn tier_compress_episodic() -> Option<CompactionResult> {
223    tracing::debug!(
224        "tier_compress_episodic: enterprise implementation not yet available; skipping tier"
225    );
226    None
227}
228
229/// Tier 3: Archive to Memory — flush old items to MarkdownMemoryStore (enterprise only).
230///
231/// Returns `None` when the enterprise implementation is not yet available.
232/// See `tier_compress_episodic` for rationale.
233pub fn tier_archive_to_memory() -> Option<CompactionResult> {
234    tracing::debug!(
235        "tier_archive_to_memory: enterprise implementation not yet available; skipping tier"
236    );
237    None
238}
239
240/// Select the compaction tier based on current token usage ratio.
241///
242/// Walks from most aggressive (Truncate at 90%) down to least (Summarize at 70%).
243/// Enterprise tiers (CompressEpisodic, ArchiveToMemory) are only available when
244/// the `enterprise-compaction` feature is enabled.
245pub fn select_tier(usage_ratio: f32, config: &CompactionConfig) -> Option<CompactionTier> {
246    if !config.enabled {
247        return None;
248    }
249
250    if usage_ratio >= config.truncate_threshold {
251        return Some(CompactionTier::Truncate);
252    }
253
254    #[cfg(feature = "enterprise-compaction")]
255    if usage_ratio >= config.archive_threshold {
256        return Some(CompactionTier::ArchiveToMemory);
257    }
258
259    #[cfg(feature = "enterprise-compaction")]
260    if usage_ratio >= config.compress_threshold {
261        return Some(CompactionTier::CompressEpisodic);
262    }
263
264    if usage_ratio >= config.summarize_threshold {
265        return Some(CompactionTier::Summarize);
266    }
267
268    None
269}
270
271/// Enterprise audit entry for compaction events.
272#[cfg(feature = "enterprise-compaction")]
273#[derive(Debug, Clone, Serialize, Deserialize)]
274pub struct CompactionAuditEntry {
275    pub agent_id: crate::types::AgentId,
276    pub session_id: super::types::SessionId,
277    pub timestamp: std::time::SystemTime,
278    pub tier: CompactionTier,
279    pub result: CompactionResult,
280    pub items_before: Vec<super::types::ContextId>,
281    pub items_after: Vec<super::types::ContextId>,
282}
283
284#[cfg(test)]
285mod tests {
286    use super::super::token_counter::TokenCounter;
287    use super::super::types::{ContextId, ConversationItem, ConversationRole};
288    use super::*;
289    use std::time::SystemTime;
290
291    fn make_conversation_items(count: usize) -> Vec<ConversationItem> {
292        (0..count)
293            .map(|i| ConversationItem {
294                id: ContextId::new(),
295                role: if i == 0 {
296                    ConversationRole::System
297                } else {
298                    ConversationRole::User
299                },
300                content: format!("Message number {i} with some content to take up tokens"),
301                timestamp: SystemTime::now(),
302                context_used: vec![],
303                knowledge_used: vec![],
304            })
305            .collect()
306    }
307
308    #[test]
309    fn default_config_has_correct_thresholds() {
310        let config = CompactionConfig::default();
311        assert!(config.enabled);
312        assert!((config.summarize_threshold - 0.70).abs() < f32::EPSILON);
313        assert!((config.compress_threshold - 0.80).abs() < f32::EPSILON);
314        assert!((config.archive_threshold - 0.85).abs() < f32::EPSILON);
315        assert!((config.truncate_threshold - 0.90).abs() < f32::EPSILON);
316        assert_eq!(config.max_summary_tokens, 500);
317        assert_eq!(config.min_items_to_compact, 5);
318        assert_eq!(config.preserve_access_levels.len(), 2);
319    }
320
321    #[test]
322    fn compaction_tier_display() {
323        assert_eq!(CompactionTier::Summarize.to_string(), "Summarize");
324        assert_eq!(CompactionTier::Truncate.to_string(), "Truncate");
325    }
326
327    #[test]
328    fn compaction_result_serialization() {
329        let result = CompactionResult {
330            tier_applied: CompactionTier::Truncate,
331            tokens_before: 10_000,
332            tokens_after: 5_000,
333            tokens_saved: 5_000,
334            items_affected: 12,
335            duration_ms: 3,
336            summary_generated: None,
337        };
338        let json = serde_json::to_string(&result).unwrap();
339        let deserialized: CompactionResult = serde_json::from_str(&json).unwrap();
340        assert_eq!(deserialized.tokens_saved, 5_000);
341    }
342
343    #[tokio::test]
344    async fn summarize_replaces_items_with_summary() {
345        let items = make_conversation_items(10);
346        let config = CompactionConfig {
347            min_items_to_compact: 3,
348            ..CompactionConfig::default()
349        };
350
351        // Mock summarizer that returns a fixed string
352        let summarizer = |_text: String| {
353            Box::pin(async {
354                Ok::<String, String>("This is a summary of the conversation.".to_string())
355            })
356        };
357
358        let result = summarize_items(&items, &config, 5, summarizer)
359            .await
360            .unwrap();
361
362        assert!(result.is_some(), "should produce a result");
363        let (new_items, compaction) = result.unwrap();
364        assert!(new_items.len() < items.len(), "should have fewer items");
365        assert!(compaction.summary_generated.is_some());
366        let summary_item = new_items
367            .iter()
368            .find(|i| i.content.contains("[Compacted summary]"));
369        assert!(
370            summary_item.is_some(),
371            "should contain compacted summary item"
372        );
373    }
374
375    #[test]
376    fn truncate_drops_oldest_non_system_items() {
377        let items = make_conversation_items(20);
378        let config = CompactionConfig::default();
379
380        let (remaining, affected) = truncate_items(&items, &config, 0.70);
381
382        assert!(affected > 0, "should have dropped items");
383        assert!(
384            remaining
385                .iter()
386                .any(|i| matches!(i.role, ConversationRole::System)),
387            "system messages should be preserved"
388        );
389        assert!(remaining.len() < items.len());
390    }
391
392    #[test]
393    fn select_tier_at_70_percent() {
394        let config = CompactionConfig::default();
395        assert_eq!(select_tier(0.72, &config), Some(CompactionTier::Summarize));
396    }
397
398    #[test]
399    fn select_tier_at_90_percent() {
400        let config = CompactionConfig::default();
401        assert_eq!(select_tier(0.92, &config), Some(CompactionTier::Truncate));
402    }
403
404    #[test]
405    fn select_tier_below_threshold() {
406        let config = CompactionConfig::default();
407        assert_eq!(select_tier(0.50, &config), None);
408    }
409
410    #[test]
411    fn select_tier_at_85_percent_oss_falls_to_summarize() {
412        // Without enterprise-compaction, tiers 2 and 3 are unavailable,
413        // so 85% should fall through to Summarize
414        let config = CompactionConfig::default();
415        let tier = select_tier(0.86, &config);
416        assert!(tier.is_some());
417    }
418
419    #[test]
420    fn enterprise_tiers_return_none_without_feature() {
421        let result = tier_compress_episodic();
422        assert!(result.is_none());
423
424        let result = tier_archive_to_memory();
425        assert!(result.is_none());
426    }
427
428    #[tokio::test]
429    async fn full_pipeline_truncates_when_over_90_percent() {
430        use super::super::token_counter::HeuristicTokenCounter;
431
432        let items = make_conversation_items(100);
433
434        // Use a tiny context limit so we're way over 90%
435        let counter = HeuristicTokenCounter::new(500);
436        let current_tokens = counter.count_messages(&items);
437        let limit = counter.model_context_limit();
438        let ratio = current_tokens as f32 / limit as f32;
439
440        assert!(ratio > 0.90, "ratio {ratio} should be > 0.90 for this test");
441
442        let config = CompactionConfig::default();
443        let tier = select_tier(ratio, &config);
444        assert_eq!(tier, Some(CompactionTier::Truncate));
445
446        // Run truncation
447        let (new_items, affected) = truncate_items(&items, &config, config.summarize_threshold);
448        assert!(affected > 0);
449        assert!(new_items.len() < items.len());
450
451        // Verify token count decreased
452        let new_tokens = counter.count_messages(&new_items);
453        assert!(
454            new_tokens < current_tokens,
455            "tokens should decrease: {new_tokens} < {current_tokens}"
456        );
457    }
458
459    #[tokio::test]
460    async fn full_pipeline_summarizes_when_between_70_and_90() {
461        let items = make_conversation_items(20);
462        let config = CompactionConfig {
463            min_items_to_compact: 3,
464            ..CompactionConfig::default()
465        };
466
467        // Mock: simulate being at 75% usage
468        let tier = select_tier(0.75, &config);
469        assert_eq!(tier, Some(CompactionTier::Summarize));
470
471        // Run summarization with mock
472        let summarizer = |_text: String| {
473            Box::pin(async { Ok::<String, String>("Summary of old messages.".to_string()) })
474        };
475
476        let result = summarize_items(&items, &config, 10, summarizer)
477            .await
478            .unwrap();
479        assert!(result.is_some());
480
481        let (new_items, compaction) = result.unwrap();
482        assert_eq!(compaction.tier_applied, CompactionTier::Summarize);
483        assert_eq!(compaction.items_affected, 10);
484        assert!(new_items.len() < items.len());
485    }
486}