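//! `agentkit_compaction` — `lib.rs`
//!
//! Transcript compaction building blocks: triggers that decide when a transcript
//! should be compacted, strategies that rewrite a transcript, a pipeline that
//! chains strategies, and a backend trait used to summarise older items.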

1use std::collections::BTreeSet;
2use std::sync::Arc;
3
4use agentkit_core::{Item, ItemKind, MetadataMap, Part, SessionId, TurnCancellation, TurnId};
5use async_trait::async_trait;
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
10pub enum CompactionReason {
11    TranscriptTooLong,
12    Manual,
13    Custom(String),
14}
15
16#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
17pub struct CompactionRequest {
18    pub session_id: SessionId,
19    pub turn_id: Option<TurnId>,
20    pub transcript: Vec<Item>,
21    pub reason: CompactionReason,
22    pub metadata: MetadataMap,
23}
24
25#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
26pub struct CompactionResult {
27    pub transcript: Vec<Item>,
28    pub replaced_items: usize,
29    pub metadata: MetadataMap,
30}
31
32#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
33pub struct SummaryRequest {
34    pub session_id: SessionId,
35    pub turn_id: Option<TurnId>,
36    pub items: Vec<Item>,
37    pub reason: CompactionReason,
38    pub metadata: MetadataMap,
39}
40
41#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
42pub struct SummaryResult {
43    pub items: Vec<Item>,
44    pub metadata: MetadataMap,
45}
46
47pub trait CompactionTrigger: Send + Sync {
48    fn should_compact(
49        &self,
50        session_id: &SessionId,
51        turn_id: Option<&TurnId>,
52        transcript: &[Item],
53    ) -> Option<CompactionReason>;
54}
55
56#[async_trait]
57pub trait CompactionBackend: Send + Sync {
58    async fn summarize(
59        &self,
60        request: SummaryRequest,
61        cancellation: Option<TurnCancellation>,
62    ) -> Result<SummaryResult, CompactionError>;
63}
64
65pub struct CompactionContext<'a> {
66    pub backend: Option<&'a dyn CompactionBackend>,
67    pub metadata: &'a MetadataMap,
68    pub cancellation: Option<TurnCancellation>,
69}
70
71#[async_trait]
72pub trait CompactionStrategy: Send + Sync {
73    async fn apply(
74        &self,
75        request: CompactionRequest,
76        ctx: &mut CompactionContext<'_>,
77    ) -> Result<CompactionResult, CompactionError>;
78}
79
80#[derive(Clone)]
81pub struct CompactionConfig {
82    pub trigger: Arc<dyn CompactionTrigger>,
83    pub strategy: Arc<dyn CompactionStrategy>,
84    pub backend: Option<Arc<dyn CompactionBackend>>,
85    pub metadata: MetadataMap,
86}
87
88impl CompactionConfig {
89    pub fn new(
90        trigger: impl CompactionTrigger + 'static,
91        strategy: impl CompactionStrategy + 'static,
92    ) -> Self {
93        Self {
94            trigger: Arc::new(trigger),
95            strategy: Arc::new(strategy),
96            backend: None,
97            metadata: MetadataMap::new(),
98        }
99    }
100
101    pub fn with_backend(mut self, backend: impl CompactionBackend + 'static) -> Self {
102        self.backend = Some(Arc::new(backend));
103        self
104    }
105
106    pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
107        self.metadata = metadata;
108        self
109    }
110}
111
112#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
113pub struct ItemCountTrigger {
114    pub max_items: usize,
115}
116
117impl ItemCountTrigger {
118    pub fn new(max_items: usize) -> Self {
119        Self { max_items }
120    }
121}
122
123impl CompactionTrigger for ItemCountTrigger {
124    fn should_compact(
125        &self,
126        _session_id: &SessionId,
127        _turn_id: Option<&TurnId>,
128        transcript: &[Item],
129    ) -> Option<CompactionReason> {
130        (transcript.len() > self.max_items).then_some(CompactionReason::TranscriptTooLong)
131    }
132}
133
134#[derive(Clone, Default)]
135pub struct CompactionPipeline {
136    strategies: Vec<Arc<dyn CompactionStrategy>>,
137}
138
139impl CompactionPipeline {
140    pub fn new() -> Self {
141        Self::default()
142    }
143
144    pub fn with_strategy(mut self, strategy: impl CompactionStrategy + 'static) -> Self {
145        self.strategies.push(Arc::new(strategy));
146        self
147    }
148}
149
150#[async_trait]
151impl CompactionStrategy for CompactionPipeline {
152    async fn apply(
153        &self,
154        mut request: CompactionRequest,
155        ctx: &mut CompactionContext<'_>,
156    ) -> Result<CompactionResult, CompactionError> {
157        let mut replaced_items = 0;
158        let mut metadata = MetadataMap::new();
159
160        for strategy in &self.strategies {
161            if ctx
162                .cancellation
163                .as_ref()
164                .is_some_and(TurnCancellation::is_cancelled)
165            {
166                return Err(CompactionError::Cancelled);
167            }
168            let result = strategy.apply(request.clone(), ctx).await?;
169            request.transcript = result.transcript;
170            replaced_items += result.replaced_items;
171            metadata.extend(result.metadata);
172        }
173
174        Ok(CompactionResult {
175            transcript: request.transcript,
176            replaced_items,
177            metadata,
178        })
179    }
180}
181
182#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
183pub struct DropReasoningStrategy {
184    drop_empty_items: bool,
185}
186
187impl DropReasoningStrategy {
188    pub fn new() -> Self {
189        Self {
190            drop_empty_items: true,
191        }
192    }
193
194    pub fn drop_empty_items(mut self, value: bool) -> Self {
195        self.drop_empty_items = value;
196        self
197    }
198}
199
200#[async_trait]
201impl CompactionStrategy for DropReasoningStrategy {
202    async fn apply(
203        &self,
204        request: CompactionRequest,
205        _ctx: &mut CompactionContext<'_>,
206    ) -> Result<CompactionResult, CompactionError> {
207        let mut transcript = Vec::with_capacity(request.transcript.len());
208        let mut replaced_items = 0;
209
210        for mut item in request.transcript {
211            let original_len = item.parts.len();
212            item.parts
213                .retain(|part| !matches!(part, Part::Reasoning(_)));
214            let changed = item.parts.len() != original_len;
215            if item.parts.is_empty() && self.drop_empty_items {
216                if changed {
217                    replaced_items += 1;
218                }
219                continue;
220            }
221            if changed {
222                replaced_items += 1;
223            }
224            transcript.push(item);
225        }
226
227        Ok(CompactionResult {
228            transcript,
229            replaced_items,
230            metadata: MetadataMap::new(),
231        })
232    }
233}
234
235#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
236pub struct DropFailedToolResultsStrategy {
237    drop_empty_items: bool,
238}
239
240impl DropFailedToolResultsStrategy {
241    pub fn new() -> Self {
242        Self {
243            drop_empty_items: true,
244        }
245    }
246
247    pub fn drop_empty_items(mut self, value: bool) -> Self {
248        self.drop_empty_items = value;
249        self
250    }
251}
252
253#[async_trait]
254impl CompactionStrategy for DropFailedToolResultsStrategy {
255    async fn apply(
256        &self,
257        request: CompactionRequest,
258        _ctx: &mut CompactionContext<'_>,
259    ) -> Result<CompactionResult, CompactionError> {
260        let mut transcript = Vec::with_capacity(request.transcript.len());
261        let mut replaced_items = 0;
262
263        for mut item in request.transcript {
264            let original_len = item.parts.len();
265            item.parts.retain(|part| {
266                !matches!(
267                    part,
268                    Part::ToolResult(result) if result.is_error
269                )
270            });
271            let changed = item.parts.len() != original_len;
272            if item.parts.is_empty() && self.drop_empty_items {
273                if changed {
274                    replaced_items += 1;
275                }
276                continue;
277            }
278            if changed {
279                replaced_items += 1;
280            }
281            transcript.push(item);
282        }
283
284        Ok(CompactionResult {
285            transcript,
286            replaced_items,
287            metadata: MetadataMap::new(),
288        })
289    }
290}
291
292#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
293pub struct KeepRecentStrategy {
294    keep_last: usize,
295    preserve_kinds: BTreeSet<ItemKind>,
296}
297
298impl KeepRecentStrategy {
299    pub fn new(keep_last: usize) -> Self {
300        Self {
301            keep_last,
302            preserve_kinds: BTreeSet::new(),
303        }
304    }
305
306    pub fn preserve_kind(mut self, kind: ItemKind) -> Self {
307        self.preserve_kinds.insert(kind);
308        self
309    }
310}
311
312#[async_trait]
313impl CompactionStrategy for KeepRecentStrategy {
314    async fn apply(
315        &self,
316        request: CompactionRequest,
317        _ctx: &mut CompactionContext<'_>,
318    ) -> Result<CompactionResult, CompactionError> {
319        let removable = removable_indices(&request.transcript, &self.preserve_kinds);
320        if removable.len() <= self.keep_last {
321            return Ok(CompactionResult {
322                transcript: request.transcript,
323                replaced_items: 0,
324                metadata: MetadataMap::new(),
325            });
326        }
327
328        let keep_indices = removable
329            .iter()
330            .skip(removable.len() - self.keep_last)
331            .copied()
332            .collect::<BTreeSet<_>>();
333        let transcript = request
334            .transcript
335            .into_iter()
336            .enumerate()
337            .filter_map(|(index, item)| {
338                (self.preserve_kinds.contains(&item.kind) || keep_indices.contains(&index))
339                    .then_some(item)
340            })
341            .collect::<Vec<_>>();
342
343        Ok(CompactionResult {
344            transcript,
345            replaced_items: removable.len() - self.keep_last,
346            metadata: MetadataMap::new(),
347        })
348    }
349}
350
351#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
352pub struct SummarizeOlderStrategy {
353    keep_last: usize,
354    preserve_kinds: BTreeSet<ItemKind>,
355}
356
357impl SummarizeOlderStrategy {
358    pub fn new(keep_last: usize) -> Self {
359        Self {
360            keep_last,
361            preserve_kinds: BTreeSet::new(),
362        }
363    }
364
365    pub fn preserve_kind(mut self, kind: ItemKind) -> Self {
366        self.preserve_kinds.insert(kind);
367        self
368    }
369}
370
371#[async_trait]
372impl CompactionStrategy for SummarizeOlderStrategy {
373    async fn apply(
374        &self,
375        request: CompactionRequest,
376        ctx: &mut CompactionContext<'_>,
377    ) -> Result<CompactionResult, CompactionError> {
378        let Some(backend) = ctx.backend else {
379            return Err(CompactionError::MissingBackend(
380                "summarize strategy requires a compaction backend".into(),
381            ));
382        };
383
384        let removable = removable_indices(&request.transcript, &self.preserve_kinds);
385        if removable.len() <= self.keep_last {
386            return Ok(CompactionResult {
387                transcript: request.transcript,
388                replaced_items: 0,
389                metadata: MetadataMap::new(),
390            });
391        }
392
393        let summary_indices = removable[..removable.len() - self.keep_last].to_vec();
394        let first_summary_index = summary_indices[0];
395        let summary_index_set = summary_indices.iter().copied().collect::<BTreeSet<_>>();
396        let summary_items = summary_indices
397            .iter()
398            .map(|index| request.transcript[*index].clone())
399            .collect::<Vec<_>>();
400        let summary = backend
401            .summarize(
402                SummaryRequest {
403                    session_id: request.session_id.clone(),
404                    turn_id: request.turn_id.clone(),
405                    items: summary_items,
406                    reason: request.reason.clone(),
407                    metadata: request.metadata.clone(),
408                },
409                ctx.cancellation.clone(),
410            )
411            .await?;
412
413        let mut transcript = Vec::new();
414        let mut inserted_summary = false;
415        let mut summary_output = Some(summary.items);
416        for (index, item) in request.transcript.into_iter().enumerate() {
417            if summary_index_set.contains(&index) {
418                if !inserted_summary && index == first_summary_index {
419                    transcript.extend(summary_output.take().unwrap_or_default());
420                    inserted_summary = true;
421                }
422                continue;
423            }
424            transcript.push(item);
425        }
426
427        Ok(CompactionResult {
428            transcript,
429            replaced_items: summary_indices.len(),
430            metadata: summary.metadata,
431        })
432    }
433}
434
435fn removable_indices(transcript: &[Item], preserve_kinds: &BTreeSet<ItemKind>) -> Vec<usize> {
436    transcript
437        .iter()
438        .enumerate()
439        .filter_map(|(index, item)| (!preserve_kinds.contains(&item.kind)).then_some(index))
440        .collect()
441}
442
443#[derive(Debug, Error)]
444pub enum CompactionError {
445    #[error("compaction cancelled")]
446    Cancelled,
447    #[error("missing compaction backend: {0}")]
448    MissingBackend(String),
449    #[error("compaction failed: {0}")]
450    Failed(String),
451}
452
#[cfg(test)]
mod tests {
    use agentkit_core::{CancellationController, Part, TextPart, ToolOutput, ToolResultPart};

    use super::*;

    /// Builds a user item holding a single text part.
    fn user_item(text: &str) -> Item {
        Item {
            id: None,
            kind: ItemKind::User,
            parts: vec![Part::Text(TextPart {
                text: text.into(),
                metadata: MetadataMap::new(),
            })],
            metadata: MetadataMap::new(),
        }
    }

    /// Builds an assistant item with one reasoning part followed by one text part.
    fn assistant_with_reasoning() -> Item {
        Item {
            id: None,
            kind: ItemKind::Assistant,
            parts: vec![
                Part::Reasoning(agentkit_core::ReasoningPart {
                    summary: Some("think".into()),
                    data: None,
                    redacted: false,
                    metadata: MetadataMap::new(),
                }),
                Part::Text(TextPart {
                    text: "answer".into(),
                    metadata: MetadataMap::new(),
                }),
            ],
            metadata: MetadataMap::new(),
        }
    }

    /// Builds a tool item whose only part is a tool result flagged as an error.
    fn failed_tool_item() -> Item {
        Item {
            id: None,
            kind: ItemKind::Tool,
            parts: vec![Part::ToolResult(ToolResultPart {
                call_id: "call-1".into(),
                output: ToolOutput::Text("failed".into()),
                is_error: true,
                metadata: MetadataMap::new(),
            })],
            metadata: MetadataMap::new(),
        }
    }

    /// Wraps a transcript in a request with fixed session, reason and metadata.
    fn request_with(transcript: Vec<Item>) -> CompactionRequest {
        CompactionRequest {
            session_id: "s".into(),
            turn_id: None,
            transcript,
            reason: CompactionReason::TranscriptTooLong,
            metadata: MetadataMap::new(),
        }
    }

    /// Asserts that the first part of `item` is a text part equal to `expected`.
    fn assert_first_text(item: &Item, expected: &str) {
        match &item.parts[0] {
            Part::Text(text) => assert_eq!(text.text, expected),
            other => panic!("unexpected part: {other:?}"),
        }
    }

    #[test]
    fn item_count_trigger_fires_after_limit() {
        let trigger = ItemCountTrigger::new(2);
        let transcript = vec![user_item("a"), user_item("b"), user_item("c")];
        assert_eq!(
            trigger.should_compact(&SessionId::new("s"), None, &transcript),
            Some(CompactionReason::TranscriptTooLong)
        );
    }

    #[test]
    fn item_count_trigger_is_silent_at_limit() {
        // The comparison is strict: exactly `max_items` items must not trigger.
        let trigger = ItemCountTrigger::new(2);
        let transcript = vec![user_item("a"), user_item("b")];
        assert_eq!(
            trigger.should_compact(&SessionId::new("s"), None, &transcript),
            None
        );
    }

    #[tokio::test]
    async fn pipeline_applies_local_strategies_in_order() {
        let request = request_with(vec![
            user_item("a"),
            assistant_with_reasoning(),
            failed_tool_item(),
            user_item("b"),
            user_item("c"),
        ]);
        let pipeline = CompactionPipeline::new()
            .with_strategy(DropReasoningStrategy::new())
            .with_strategy(DropFailedToolResultsStrategy::new())
            .with_strategy(
                KeepRecentStrategy::new(2)
                    .preserve_kind(ItemKind::System)
                    .preserve_kind(ItemKind::Context),
            );
        let metadata = MetadataMap::new();
        let mut ctx = CompactionContext {
            backend: None,
            metadata: &metadata,
            cancellation: None,
        };

        let result = pipeline.apply(request, &mut ctx).await.unwrap();

        // Stage counts: 1 item stripped of reasoning, 1 failed tool item dropped,
        // then 2 of the remaining 4 items dropped by KeepRecentStrategy(2).
        assert_eq!(result.replaced_items, 4);
        assert_eq!(result.transcript.len(), 2);
        assert_first_text(&result.transcript[0], "b");
        assert_first_text(&result.transcript[1], "c");
        assert!(result.transcript.iter().all(|item| {
            item.parts
                .iter()
                .all(|part| !matches!(part, Part::Reasoning(_)))
        }));
    }

    /// Backend that returns a single context item stating how many items it was
    /// asked to summarise.
    struct FakeBackend;

    #[async_trait]
    impl CompactionBackend for FakeBackend {
        async fn summarize(
            &self,
            request: SummaryRequest,
            _cancellation: Option<TurnCancellation>,
        ) -> Result<SummaryResult, CompactionError> {
            Ok(SummaryResult {
                items: vec![Item {
                    id: None,
                    kind: ItemKind::Context,
                    parts: vec![Part::Text(TextPart {
                        text: format!("summary of {} items", request.items.len()),
                        metadata: MetadataMap::new(),
                    })],
                    metadata: MetadataMap::new(),
                }],
                metadata: MetadataMap::new(),
            })
        }
    }

    #[tokio::test]
    async fn summarize_strategy_uses_backend() {
        let request = request_with(vec![user_item("a"), user_item("b"), user_item("c")]);
        let strategy = SummarizeOlderStrategy::new(1);
        let metadata = MetadataMap::new();
        let mut ctx = CompactionContext {
            backend: Some(&FakeBackend),
            metadata: &metadata,
            cancellation: None,
        };

        let result = strategy.apply(request, &mut ctx).await.unwrap();

        assert_eq!(result.replaced_items, 2);
        assert_eq!(result.transcript.len(), 2);
        // The summary takes the place of the summarised items; the most recent
        // item follows it untouched.
        assert_first_text(&result.transcript[0], "summary of 2 items");
        assert_first_text(&result.transcript[1], "c");
    }

    #[tokio::test]
    async fn summarize_strategy_requires_backend() {
        let request = request_with(vec![user_item("a"), user_item("b")]);
        let strategy = SummarizeOlderStrategy::new(1);
        let metadata = MetadataMap::new();
        let mut ctx = CompactionContext {
            backend: None,
            metadata: &metadata,
            cancellation: None,
        };

        let error = strategy.apply(request, &mut ctx).await.unwrap_err();
        assert!(matches!(error, CompactionError::MissingBackend(_)));
    }

    #[tokio::test]
    async fn pipeline_stops_when_cancelled() {
        let controller = CancellationController::new();
        let checkpoint = controller.handle().checkpoint();
        controller.interrupt();
        let request = request_with(vec![user_item("a"), user_item("b"), user_item("c")]);
        let pipeline = CompactionPipeline::new().with_strategy(DropReasoningStrategy::new());
        let metadata = MetadataMap::new();
        let mut ctx = CompactionContext {
            backend: None,
            metadata: &metadata,
            cancellation: Some(checkpoint),
        };

        let error = pipeline.apply(request, &mut ctx).await.unwrap_err();
        assert!(matches!(error, CompactionError::Cancelled));
    }
}
625}