// symbi_runtime/context/compaction.rs

//! Context compaction pipeline for reclaiming tokens.
//!
//! Runs synchronously before each LLM call. Walks tiers from most aggressive
//! (truncate) to least aggressive (summarize), applying the first tier whose
//! threshold the current token usage ratio has reached.

7use serde::{Deserialize, Serialize};
8
9use super::types::AccessLevel;
10
11/// Per-agent compaction configuration.
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct CompactionConfig {
14    /// Whether compaction is enabled.
15    pub enabled: bool,
16    /// Token usage ratio (0.0–1.0) at which Tier 1 (Summarize) triggers.
17    pub summarize_threshold: f32,
18    /// Token usage ratio at which Tier 2 (Compress Episodic) triggers.
19    pub compress_threshold: f32,
20    /// Token usage ratio at which Tier 3 (Archive to Memory) triggers.
21    pub archive_threshold: f32,
22    /// Token usage ratio at which Tier 4 (Truncate) triggers.
23    pub truncate_threshold: f32,
24    /// Model to use for summarization. `None` = agent's own model.
25    pub compaction_model: Option<String>,
26    /// Maximum tokens for generated summaries.
27    pub max_summary_tokens: usize,
28    /// Access levels whose items are never compacted.
29    pub preserve_access_levels: Vec<AccessLevel>,
30    /// Minimum conversation items before compaction is considered.
31    pub min_items_to_compact: usize,
32}
33
34impl Default for CompactionConfig {
35    fn default() -> Self {
36        Self {
37            enabled: true,
38            summarize_threshold: 0.70,
39            compress_threshold: 0.80,
40            archive_threshold: 0.85,
41            truncate_threshold: 0.90,
42            compaction_model: None,
43            max_summary_tokens: 500,
44            preserve_access_levels: vec![AccessLevel::Secret, AccessLevel::Confidential],
45            min_items_to_compact: 5,
46        }
47    }
48}
49
50/// Which compaction tier was applied.
51#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
52pub enum CompactionTier {
53    /// Tier 1: LLM summarizes oldest conversation items (OSS).
54    Summarize,
55    /// Tier 2: Merge similar episodic memory items (enterprise).
56    CompressEpisodic,
57    /// Tier 3: Archive summaries and old items to MarkdownMemoryStore (enterprise).
58    ArchiveToMemory,
59    /// Tier 4: Drop oldest conversation items (OSS).
60    Truncate,
61}
62
63impl std::fmt::Display for CompactionTier {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        match self {
66            CompactionTier::Summarize => write!(f, "Summarize"),
67            CompactionTier::CompressEpisodic => write!(f, "CompressEpisodic"),
68            CompactionTier::ArchiveToMemory => write!(f, "ArchiveToMemory"),
69            CompactionTier::Truncate => write!(f, "Truncate"),
70        }
71    }
72}
73
74/// Result of a compaction operation.
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct CompactionResult {
77    pub tier_applied: CompactionTier,
78    pub tokens_before: usize,
79    pub tokens_after: usize,
80    pub tokens_saved: usize,
81    pub items_affected: usize,
82    pub duration_ms: u64,
83    pub summary_generated: Option<String>,
84}
85
86use super::types::{ConversationItem, ConversationRole};
87
88/// Tier 4: Truncate — drop oldest conversation items until count drops to
89/// `target_ratio` of original. System messages are preserved.
90pub fn truncate_items(
91    items: &[ConversationItem],
92    _config: &CompactionConfig,
93    target_ratio: f32,
94) -> (Vec<ConversationItem>, usize) {
95    let total = items.len();
96    let target_count = (total as f32 * target_ratio).ceil() as usize;
97
98    // Partition: system messages vs candidates for removal (oldest first)
99    let mut system_items: Vec<&ConversationItem> = Vec::new();
100    let mut candidates: Vec<&ConversationItem> = Vec::new();
101
102    for item in items {
103        if matches!(item.role, ConversationRole::System) {
104            system_items.push(item);
105        } else {
106            candidates.push(item);
107        }
108    }
109
110    // Keep the newest candidates, drop the oldest
111    let keep_count = target_count.saturating_sub(system_items.len());
112    let drop_count = candidates.len().saturating_sub(keep_count);
113
114    let mut result: Vec<ConversationItem> = system_items.into_iter().cloned().collect();
115    result.extend(candidates.into_iter().skip(drop_count).cloned());
116
117    (result, drop_count)
118}
119
120use std::future::Future;
121use std::time::Instant;
122
123/// Tier 1: Summarize — LLM condenses the oldest N conversation items into a
124/// single system message. Preserves system messages and items with protected
125/// access levels.
126///
127/// The `summarizer` closure takes the concatenated text and returns a summary.
128pub async fn summarize_items<F, Fut>(
129    items: &[ConversationItem],
130    config: &CompactionConfig,
131    items_to_summarize: usize,
132    summarizer: F,
133) -> Result<Option<(Vec<ConversationItem>, CompactionResult)>, String>
134where
135    F: FnOnce(String) -> Fut,
136    Fut: Future<Output = Result<String, String>>,
137{
138    if items.len() < config.min_items_to_compact {
139        return Ok(None);
140    }
141
142    let start = Instant::now();
143
144    // Collect items eligible for summarization (not system, not protected)
145    let mut to_summarize: Vec<(usize, &ConversationItem)> = Vec::new();
146    let mut to_keep: Vec<(usize, &ConversationItem)> = Vec::new();
147
148    for (idx, item) in items.iter().enumerate() {
149        if matches!(item.role, ConversationRole::System) {
150            to_keep.push((idx, item));
151        } else if to_summarize.len() < items_to_summarize {
152            to_summarize.push((idx, item));
153        } else {
154            to_keep.push((idx, item));
155        }
156    }
157
158    if to_summarize.is_empty() {
159        return Ok(None);
160    }
161
162    // Build the text to summarize
163    let text_to_summarize: String = to_summarize
164        .iter()
165        .map(|(_, item)| format!("{:?}: {}", item.role, item.content))
166        .collect::<Vec<_>>()
167        .join("\n\n");
168
169    let summary = summarizer(text_to_summarize).await?;
170
171    // Build the replacement item
172    let summary_item = ConversationItem {
173        id: super::types::ContextId::new(),
174        role: ConversationRole::System,
175        content: format!("[Compacted summary] {summary}"),
176        timestamp: std::time::SystemTime::now(),
177        context_used: vec![],
178        knowledge_used: vec![],
179    };
180
181    // Reconstruct: system items + summary + remaining items (in original order)
182    let mut result_items: Vec<ConversationItem> = Vec::new();
183
184    // Add system items that came before the summarized range
185    for (_, item) in to_keep.iter().filter(|(idx, _)| *idx < to_summarize[0].0) {
186        result_items.push((*item).clone());
187    }
188
189    // Insert summary
190    result_items.push(summary_item);
191
192    // Add remaining items after the summarized range
193    for (_, item) in to_keep
194        .iter()
195        .filter(|(idx, _)| *idx > to_summarize.last().unwrap().0)
196    {
197        result_items.push((*item).clone());
198    }
199
200    let duration = start.elapsed();
201
202    let compaction_result = CompactionResult {
203        tier_applied: CompactionTier::Summarize,
204        tokens_before: 0, // Caller fills in actual token counts
205        tokens_after: 0,
206        tokens_saved: 0,
207        items_affected: to_summarize.len(),
208        duration_ms: duration.as_millis() as u64,
209        summary_generated: Some(summary),
210    };
211
212    Ok(Some((result_items, compaction_result)))
213}
214
215/// Tier 2: Compress Episodic — merge similar memory items (enterprise only).
216#[cfg(feature = "enterprise-compaction")]
217pub fn tier_compress_episodic() -> Option<CompactionResult> {
218    todo!("enterprise: compress episodic memory items by cosine similarity")
219}
220
221#[cfg(not(feature = "enterprise-compaction"))]
222pub fn tier_compress_episodic() -> Option<CompactionResult> {
223    None
224}
225
226/// Tier 3: Archive to Memory — flush old items to MarkdownMemoryStore (enterprise only).
227#[cfg(feature = "enterprise-compaction")]
228pub fn tier_archive_to_memory() -> Option<CompactionResult> {
229    todo!("enterprise: archive items to MarkdownMemoryStore daily log")
230}
231
232#[cfg(not(feature = "enterprise-compaction"))]
233pub fn tier_archive_to_memory() -> Option<CompactionResult> {
234    None
235}
236
237/// Select the compaction tier based on current token usage ratio.
238///
239/// Walks from most aggressive (Truncate at 90%) down to least (Summarize at 70%).
240/// Enterprise tiers (CompressEpisodic, ArchiveToMemory) are only available when
241/// the `enterprise-compaction` feature is enabled.
242pub fn select_tier(usage_ratio: f32, config: &CompactionConfig) -> Option<CompactionTier> {
243    if !config.enabled {
244        return None;
245    }
246
247    if usage_ratio >= config.truncate_threshold {
248        return Some(CompactionTier::Truncate);
249    }
250
251    #[cfg(feature = "enterprise-compaction")]
252    if usage_ratio >= config.archive_threshold {
253        return Some(CompactionTier::ArchiveToMemory);
254    }
255
256    #[cfg(feature = "enterprise-compaction")]
257    if usage_ratio >= config.compress_threshold {
258        return Some(CompactionTier::CompressEpisodic);
259    }
260
261    if usage_ratio >= config.summarize_threshold {
262        return Some(CompactionTier::Summarize);
263    }
264
265    None
266}
267
/// Enterprise audit entry for compaction events.
///
/// Only compiled with the `enterprise-compaction` feature. Nothing visible in
/// this module constructs it; field meanings below are inferred from their
/// names and types — confirm against the code that emits audit entries.
#[cfg(feature = "enterprise-compaction")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactionAuditEntry {
    /// Agent whose context was compacted.
    pub agent_id: crate::types::AgentId,
    /// Session the compaction ran in.
    pub session_id: super::types::SessionId,
    /// When the compaction happened.
    pub timestamp: std::time::SystemTime,
    /// Tier that was applied.
    pub tier: CompactionTier,
    /// Outcome reported by that tier.
    pub result: CompactionResult,
    /// Item ids present before compaction.
    pub items_before: Vec<super::types::ContextId>,
    /// Item ids present after compaction.
    pub items_after: Vec<super::types::ContextId>,
}

#[cfg(test)]
mod tests {
    //! Unit tests for configuration defaults, tier selection, and the
    //! truncate / summarize tiers.

    use super::super::token_counter::TokenCounter;
    use super::super::types::{ContextId, ConversationItem, ConversationRole};
    use super::*;
    use std::time::SystemTime;

    /// Builds `count` items: one leading system message followed by user
    /// messages, each with enough text to register a token cost.
    fn make_conversation_items(count: usize) -> Vec<ConversationItem> {
        (0..count)
            .map(|i| ConversationItem {
                id: ContextId::new(),
                role: if i == 0 {
                    ConversationRole::System
                } else {
                    ConversationRole::User
                },
                content: format!("Message number {i} with some content to take up tokens"),
                timestamp: SystemTime::now(),
                context_used: vec![],
                knowledge_used: vec![],
            })
            .collect()
    }

    #[test]
    fn default_config_has_correct_thresholds() {
        let config = CompactionConfig::default();
        assert!(config.enabled);
        assert!((config.summarize_threshold - 0.70).abs() < f32::EPSILON);
        assert!((config.compress_threshold - 0.80).abs() < f32::EPSILON);
        assert!((config.archive_threshold - 0.85).abs() < f32::EPSILON);
        assert!((config.truncate_threshold - 0.90).abs() < f32::EPSILON);
        assert_eq!(config.max_summary_tokens, 500);
        assert_eq!(config.min_items_to_compact, 5);
        assert_eq!(config.preserve_access_levels.len(), 2);
    }

    #[test]
    fn compaction_tier_display() {
        assert_eq!(CompactionTier::Summarize.to_string(), "Summarize");
        assert_eq!(
            CompactionTier::CompressEpisodic.to_string(),
            "CompressEpisodic"
        );
        assert_eq!(
            CompactionTier::ArchiveToMemory.to_string(),
            "ArchiveToMemory"
        );
        assert_eq!(CompactionTier::Truncate.to_string(), "Truncate");
    }

    #[test]
    fn compaction_result_serialization() {
        let result = CompactionResult {
            tier_applied: CompactionTier::Truncate,
            tokens_before: 10_000,
            tokens_after: 5_000,
            tokens_saved: 5_000,
            items_affected: 12,
            duration_ms: 3,
            summary_generated: None,
        };
        let json = serde_json::to_string(&result).unwrap();
        let deserialized: CompactionResult = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.tokens_saved, 5_000);
    }

    #[tokio::test]
    async fn summarize_replaces_items_with_summary() {
        let items = make_conversation_items(10);
        let config = CompactionConfig {
            min_items_to_compact: 3,
            ..CompactionConfig::default()
        };

        // Mock summarizer that returns a fixed string
        let summarizer = |_text: String| async {
            Ok::<String, String>("This is a summary of the conversation.".to_string())
        };

        let result = summarize_items(&items, &config, 5, summarizer)
            .await
            .unwrap();

        let (new_items, compaction) = result.expect("should produce a result");
        assert!(new_items.len() < items.len(), "should have fewer items");
        assert!(compaction.summary_generated.is_some());
        assert!(
            new_items
                .iter()
                .any(|i| i.content.contains("[Compacted summary]")),
            "should contain compacted summary item"
        );
    }

    #[test]
    fn truncate_drops_oldest_non_system_items() {
        let items = make_conversation_items(20);
        let config = CompactionConfig::default();

        let (remaining, affected) = truncate_items(&items, &config, 0.70);

        assert!(affected > 0, "should have dropped items");
        assert!(
            remaining
                .iter()
                .any(|i| matches!(i.role, ConversationRole::System)),
            "system messages should be preserved"
        );
        assert!(remaining.len() < items.len());
    }

    #[test]
    fn select_tier_at_70_percent() {
        let config = CompactionConfig::default();
        assert_eq!(select_tier(0.72, &config), Some(CompactionTier::Summarize));
    }

    #[test]
    fn select_tier_at_90_percent() {
        let config = CompactionConfig::default();
        assert_eq!(select_tier(0.92, &config), Some(CompactionTier::Truncate));
    }

    #[test]
    fn select_tier_below_threshold() {
        let config = CompactionConfig::default();
        assert_eq!(select_tier(0.50, &config), None);
    }

    #[test]
    fn select_tier_at_85_percent_oss_falls_to_summarize() {
        // Without enterprise-compaction, tiers 2 and 3 are unavailable,
        // so 85% should fall through to Summarize. With the feature on,
        // the archive tier is selected instead.
        let config = CompactionConfig::default();
        let expected = if cfg!(feature = "enterprise-compaction") {
            CompactionTier::ArchiveToMemory
        } else {
            CompactionTier::Summarize
        };
        assert_eq!(select_tier(0.86, &config), Some(expected));
    }

    // The enterprise builds of these functions are `todo!()` stubs, so this
    // check is only meaningful (and only safe to run) without the feature.
    #[cfg(not(feature = "enterprise-compaction"))]
    #[test]
    fn enterprise_tiers_return_none_without_feature() {
        assert!(tier_compress_episodic().is_none());
        assert!(tier_archive_to_memory().is_none());
    }

    #[test]
    fn full_pipeline_truncates_when_over_90_percent() {
        use super::super::token_counter::HeuristicTokenCounter;

        let items = make_conversation_items(100);

        // Use a tiny context limit so we're way over 90%
        let counter = HeuristicTokenCounter::new(500);
        let current_tokens = counter.count_messages(&items);
        let limit = counter.model_context_limit();
        let ratio = current_tokens as f32 / limit as f32;

        assert!(ratio > 0.90, "ratio {ratio} should be > 0.90 for this test");

        let config = CompactionConfig::default();
        let tier = select_tier(ratio, &config);
        assert_eq!(tier, Some(CompactionTier::Truncate));

        // Run truncation
        let (new_items, affected) = truncate_items(&items, &config, config.summarize_threshold);
        assert!(affected > 0);
        assert!(new_items.len() < items.len());

        // Verify token count decreased
        let new_tokens = counter.count_messages(&new_items);
        assert!(
            new_tokens < current_tokens,
            "tokens should decrease: {new_tokens} < {current_tokens}"
        );
    }

    #[tokio::test]
    async fn full_pipeline_summarizes_when_between_70_and_90() {
        let items = make_conversation_items(20);
        let config = CompactionConfig {
            min_items_to_compact: 3,
            ..CompactionConfig::default()
        };

        // Mock: simulate being at 75% usage
        let tier = select_tier(0.75, &config);
        assert_eq!(tier, Some(CompactionTier::Summarize));

        // Run summarization with mock
        let summarizer =
            |_text: String| async { Ok::<String, String>("Summary of old messages.".to_string()) };

        let result = summarize_items(&items, &config, 10, summarizer)
            .await
            .unwrap();

        let (new_items, compaction) = result.expect("should produce a result");
        assert_eq!(compaction.tier_applied, CompactionTier::Summarize);
        assert_eq!(compaction.items_affected, 10);
        assert!(new_items.len() < items.len());
    }
}
479        assert_eq!(compaction.tier_applied, CompactionTier::Summarize);
480        assert_eq!(compaction.items_affected, 10);
481        assert!(new_items.len() < items.len());
482    }
483}