1use std::collections::BTreeSet;
2use std::sync::Arc;
3
4use agentkit_core::{Item, ItemKind, MetadataMap, Part, SessionId, TurnCancellation, TurnId};
5use async_trait::async_trait;
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
10pub enum CompactionReason {
11 TranscriptTooLong,
12 Manual,
13 Custom(String),
14}
15
16#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
17pub struct CompactionRequest {
18 pub session_id: SessionId,
19 pub turn_id: Option<TurnId>,
20 pub transcript: Vec<Item>,
21 pub reason: CompactionReason,
22 pub metadata: MetadataMap,
23}
24
25#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
26pub struct CompactionResult {
27 pub transcript: Vec<Item>,
28 pub replaced_items: usize,
29 pub metadata: MetadataMap,
30}
31
32#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
33pub struct SummaryRequest {
34 pub session_id: SessionId,
35 pub turn_id: Option<TurnId>,
36 pub items: Vec<Item>,
37 pub reason: CompactionReason,
38 pub metadata: MetadataMap,
39}
40
41#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
42pub struct SummaryResult {
43 pub items: Vec<Item>,
44 pub metadata: MetadataMap,
45}
46
47pub trait CompactionTrigger: Send + Sync {
48 fn should_compact(
49 &self,
50 session_id: &SessionId,
51 turn_id: Option<&TurnId>,
52 transcript: &[Item],
53 ) -> Option<CompactionReason>;
54}
55
56#[async_trait]
57pub trait CompactionBackend: Send + Sync {
58 async fn summarize(
59 &self,
60 request: SummaryRequest,
61 cancellation: Option<TurnCancellation>,
62 ) -> Result<SummaryResult, CompactionError>;
63}
64
65pub struct CompactionContext<'a> {
66 pub backend: Option<&'a dyn CompactionBackend>,
67 pub metadata: &'a MetadataMap,
68 pub cancellation: Option<TurnCancellation>,
69}
70
71#[async_trait]
72pub trait CompactionStrategy: Send + Sync {
73 async fn apply(
74 &self,
75 request: CompactionRequest,
76 ctx: &mut CompactionContext<'_>,
77 ) -> Result<CompactionResult, CompactionError>;
78}
79
80#[derive(Clone)]
81pub struct CompactionConfig {
82 pub trigger: Arc<dyn CompactionTrigger>,
83 pub strategy: Arc<dyn CompactionStrategy>,
84 pub backend: Option<Arc<dyn CompactionBackend>>,
85 pub metadata: MetadataMap,
86}
87
88impl CompactionConfig {
89 pub fn new(
90 trigger: impl CompactionTrigger + 'static,
91 strategy: impl CompactionStrategy + 'static,
92 ) -> Self {
93 Self {
94 trigger: Arc::new(trigger),
95 strategy: Arc::new(strategy),
96 backend: None,
97 metadata: MetadataMap::new(),
98 }
99 }
100
101 pub fn with_backend(mut self, backend: impl CompactionBackend + 'static) -> Self {
102 self.backend = Some(Arc::new(backend));
103 self
104 }
105
106 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
107 self.metadata = metadata;
108 self
109 }
110}
111
112#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
113pub struct ItemCountTrigger {
114 pub max_items: usize,
115}
116
117impl ItemCountTrigger {
118 pub fn new(max_items: usize) -> Self {
119 Self { max_items }
120 }
121}
122
123impl CompactionTrigger for ItemCountTrigger {
124 fn should_compact(
125 &self,
126 _session_id: &SessionId,
127 _turn_id: Option<&TurnId>,
128 transcript: &[Item],
129 ) -> Option<CompactionReason> {
130 (transcript.len() > self.max_items).then_some(CompactionReason::TranscriptTooLong)
131 }
132}
133
134#[derive(Clone, Default)]
135pub struct CompactionPipeline {
136 strategies: Vec<Arc<dyn CompactionStrategy>>,
137}
138
139impl CompactionPipeline {
140 pub fn new() -> Self {
141 Self::default()
142 }
143
144 pub fn with_strategy(mut self, strategy: impl CompactionStrategy + 'static) -> Self {
145 self.strategies.push(Arc::new(strategy));
146 self
147 }
148}
149
150#[async_trait]
151impl CompactionStrategy for CompactionPipeline {
152 async fn apply(
153 &self,
154 mut request: CompactionRequest,
155 ctx: &mut CompactionContext<'_>,
156 ) -> Result<CompactionResult, CompactionError> {
157 let mut replaced_items = 0;
158 let mut metadata = MetadataMap::new();
159
160 for strategy in &self.strategies {
161 if ctx
162 .cancellation
163 .as_ref()
164 .is_some_and(TurnCancellation::is_cancelled)
165 {
166 return Err(CompactionError::Cancelled);
167 }
168 let result = strategy.apply(request.clone(), ctx).await?;
169 request.transcript = result.transcript;
170 replaced_items += result.replaced_items;
171 metadata.extend(result.metadata);
172 }
173
174 Ok(CompactionResult {
175 transcript: request.transcript,
176 replaced_items,
177 metadata,
178 })
179 }
180}
181
182#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
183pub struct DropReasoningStrategy {
184 drop_empty_items: bool,
185}
186
187impl DropReasoningStrategy {
188 pub fn new() -> Self {
189 Self {
190 drop_empty_items: true,
191 }
192 }
193
194 pub fn drop_empty_items(mut self, value: bool) -> Self {
195 self.drop_empty_items = value;
196 self
197 }
198}
199
200#[async_trait]
201impl CompactionStrategy for DropReasoningStrategy {
202 async fn apply(
203 &self,
204 request: CompactionRequest,
205 _ctx: &mut CompactionContext<'_>,
206 ) -> Result<CompactionResult, CompactionError> {
207 let mut transcript = Vec::with_capacity(request.transcript.len());
208 let mut replaced_items = 0;
209
210 for mut item in request.transcript {
211 let original_len = item.parts.len();
212 item.parts
213 .retain(|part| !matches!(part, Part::Reasoning(_)));
214 let changed = item.parts.len() != original_len;
215 if item.parts.is_empty() && self.drop_empty_items {
216 if changed {
217 replaced_items += 1;
218 }
219 continue;
220 }
221 if changed {
222 replaced_items += 1;
223 }
224 transcript.push(item);
225 }
226
227 Ok(CompactionResult {
228 transcript,
229 replaced_items,
230 metadata: MetadataMap::new(),
231 })
232 }
233}
234
235#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
236pub struct DropFailedToolResultsStrategy {
237 drop_empty_items: bool,
238}
239
240impl DropFailedToolResultsStrategy {
241 pub fn new() -> Self {
242 Self {
243 drop_empty_items: true,
244 }
245 }
246
247 pub fn drop_empty_items(mut self, value: bool) -> Self {
248 self.drop_empty_items = value;
249 self
250 }
251}
252
253#[async_trait]
254impl CompactionStrategy for DropFailedToolResultsStrategy {
255 async fn apply(
256 &self,
257 request: CompactionRequest,
258 _ctx: &mut CompactionContext<'_>,
259 ) -> Result<CompactionResult, CompactionError> {
260 let mut transcript = Vec::with_capacity(request.transcript.len());
261 let mut replaced_items = 0;
262
263 for mut item in request.transcript {
264 let original_len = item.parts.len();
265 item.parts.retain(|part| {
266 !matches!(
267 part,
268 Part::ToolResult(result) if result.is_error
269 )
270 });
271 let changed = item.parts.len() != original_len;
272 if item.parts.is_empty() && self.drop_empty_items {
273 if changed {
274 replaced_items += 1;
275 }
276 continue;
277 }
278 if changed {
279 replaced_items += 1;
280 }
281 transcript.push(item);
282 }
283
284 Ok(CompactionResult {
285 transcript,
286 replaced_items,
287 metadata: MetadataMap::new(),
288 })
289 }
290}
291
292#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
293pub struct KeepRecentStrategy {
294 keep_last: usize,
295 preserve_kinds: BTreeSet<ItemKind>,
296}
297
298impl KeepRecentStrategy {
299 pub fn new(keep_last: usize) -> Self {
300 Self {
301 keep_last,
302 preserve_kinds: BTreeSet::new(),
303 }
304 }
305
306 pub fn preserve_kind(mut self, kind: ItemKind) -> Self {
307 self.preserve_kinds.insert(kind);
308 self
309 }
310}
311
312#[async_trait]
313impl CompactionStrategy for KeepRecentStrategy {
314 async fn apply(
315 &self,
316 request: CompactionRequest,
317 _ctx: &mut CompactionContext<'_>,
318 ) -> Result<CompactionResult, CompactionError> {
319 let removable = removable_indices(&request.transcript, &self.preserve_kinds);
320 if removable.len() <= self.keep_last {
321 return Ok(CompactionResult {
322 transcript: request.transcript,
323 replaced_items: 0,
324 metadata: MetadataMap::new(),
325 });
326 }
327
328 let keep_indices = removable
329 .iter()
330 .skip(removable.len() - self.keep_last)
331 .copied()
332 .collect::<BTreeSet<_>>();
333 let transcript = request
334 .transcript
335 .into_iter()
336 .enumerate()
337 .filter_map(|(index, item)| {
338 (self.preserve_kinds.contains(&item.kind) || keep_indices.contains(&index))
339 .then_some(item)
340 })
341 .collect::<Vec<_>>();
342
343 Ok(CompactionResult {
344 transcript,
345 replaced_items: removable.len() - self.keep_last,
346 metadata: MetadataMap::new(),
347 })
348 }
349}
350
351#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
352pub struct SummarizeOlderStrategy {
353 keep_last: usize,
354 preserve_kinds: BTreeSet<ItemKind>,
355}
356
357impl SummarizeOlderStrategy {
358 pub fn new(keep_last: usize) -> Self {
359 Self {
360 keep_last,
361 preserve_kinds: BTreeSet::new(),
362 }
363 }
364
365 pub fn preserve_kind(mut self, kind: ItemKind) -> Self {
366 self.preserve_kinds.insert(kind);
367 self
368 }
369}
370
371#[async_trait]
372impl CompactionStrategy for SummarizeOlderStrategy {
373 async fn apply(
374 &self,
375 request: CompactionRequest,
376 ctx: &mut CompactionContext<'_>,
377 ) -> Result<CompactionResult, CompactionError> {
378 let Some(backend) = ctx.backend else {
379 return Err(CompactionError::MissingBackend(
380 "summarize strategy requires a compaction backend".into(),
381 ));
382 };
383
384 let removable = removable_indices(&request.transcript, &self.preserve_kinds);
385 if removable.len() <= self.keep_last {
386 return Ok(CompactionResult {
387 transcript: request.transcript,
388 replaced_items: 0,
389 metadata: MetadataMap::new(),
390 });
391 }
392
393 let summary_indices = removable[..removable.len() - self.keep_last].to_vec();
394 let first_summary_index = summary_indices[0];
395 let summary_index_set = summary_indices.iter().copied().collect::<BTreeSet<_>>();
396 let summary_items = summary_indices
397 .iter()
398 .map(|index| request.transcript[*index].clone())
399 .collect::<Vec<_>>();
400 let summary = backend
401 .summarize(
402 SummaryRequest {
403 session_id: request.session_id.clone(),
404 turn_id: request.turn_id.clone(),
405 items: summary_items,
406 reason: request.reason.clone(),
407 metadata: request.metadata.clone(),
408 },
409 ctx.cancellation.clone(),
410 )
411 .await?;
412
413 let mut transcript = Vec::new();
414 let mut inserted_summary = false;
415 let mut summary_output = Some(summary.items);
416 for (index, item) in request.transcript.into_iter().enumerate() {
417 if summary_index_set.contains(&index) {
418 if !inserted_summary && index == first_summary_index {
419 transcript.extend(summary_output.take().unwrap_or_default());
420 inserted_summary = true;
421 }
422 continue;
423 }
424 transcript.push(item);
425 }
426
427 Ok(CompactionResult {
428 transcript,
429 replaced_items: summary_indices.len(),
430 metadata: summary.metadata,
431 })
432 }
433}
434
435fn removable_indices(transcript: &[Item], preserve_kinds: &BTreeSet<ItemKind>) -> Vec<usize> {
436 transcript
437 .iter()
438 .enumerate()
439 .filter_map(|(index, item)| (!preserve_kinds.contains(&item.kind)).then_some(index))
440 .collect()
441}
442
443#[derive(Debug, Error)]
444pub enum CompactionError {
445 #[error("compaction cancelled")]
446 Cancelled,
447 #[error("missing compaction backend: {0}")]
448 MissingBackend(String),
449 #[error("compaction failed: {0}")]
450 Failed(String),
451}
452
#[cfg(test)]
mod tests {
    use agentkit_core::{CancellationController, Part, TextPart, ToolOutput, ToolResultPart};

    use super::*;

    /// Builds a user item holding a single text part.
    fn user_item(text: &str) -> Item {
        Item {
            id: None,
            kind: ItemKind::User,
            parts: vec![Part::Text(TextPart {
                text: text.into(),
                metadata: MetadataMap::new(),
            })],
            metadata: MetadataMap::new(),
        }
    }

    /// Builds an assistant item with one reasoning part followed by one text
    /// part.
    fn assistant_with_reasoning() -> Item {
        Item {
            id: None,
            kind: ItemKind::Assistant,
            parts: vec![
                Part::Reasoning(agentkit_core::ReasoningPart {
                    summary: Some("think".into()),
                    data: None,
                    redacted: false,
                    metadata: MetadataMap::new(),
                }),
                Part::Text(TextPart {
                    text: "answer".into(),
                    metadata: MetadataMap::new(),
                }),
            ],
            metadata: MetadataMap::new(),
        }
    }

    /// Builds a tool item whose only part is a failed tool result.
    fn failed_tool_item() -> Item {
        Item {
            id: None,
            kind: ItemKind::Tool,
            parts: vec![Part::ToolResult(ToolResultPart {
                call_id: "call-1".into(),
                output: ToolOutput::Text("failed".into()),
                is_error: true,
                metadata: MetadataMap::new(),
            })],
            metadata: MetadataMap::new(),
        }
    }

    #[test]
    fn item_count_trigger_fires_after_limit() {
        let trigger = ItemCountTrigger::new(2);
        let mut transcript = vec![user_item("a"), user_item("b")];

        // Exactly at the limit: the comparison is strict, so nothing fires.
        assert_eq!(
            trigger.should_compact(&SessionId::new("s"), None, &transcript),
            None
        );

        transcript.push(user_item("c"));
        assert_eq!(
            trigger.should_compact(&SessionId::new("s"), None, &transcript),
            Some(CompactionReason::TranscriptTooLong)
        );
    }

    #[tokio::test]
    async fn pipeline_applies_local_strategies_in_order() {
        let request = CompactionRequest {
            session_id: "s".into(),
            turn_id: None,
            transcript: vec![
                user_item("a"),
                assistant_with_reasoning(),
                failed_tool_item(),
                user_item("b"),
                user_item("c"),
            ],
            reason: CompactionReason::TranscriptTooLong,
            metadata: MetadataMap::new(),
        };
        let pipeline = CompactionPipeline::new()
            .with_strategy(DropReasoningStrategy::new())
            .with_strategy(DropFailedToolResultsStrategy::new())
            .with_strategy(
                KeepRecentStrategy::new(2)
                    .preserve_kind(ItemKind::System)
                    .preserve_kind(ItemKind::Context),
            );
        let mut ctx = CompactionContext {
            backend: None,
            metadata: &MetadataMap::new(),
            cancellation: None,
        };

        let result = pipeline.apply(request, &mut ctx).await.unwrap();

        // Only the two most recent removable items survive, in order.
        assert_eq!(result.transcript, vec![user_item("b"), user_item("c")]);
        // One item lost its reasoning part, one failed tool item was dropped,
        // and two of the remaining four items were trimmed: 1 + 1 + 2.
        assert_eq!(result.replaced_items, 4);
    }

    struct FakeBackend;

    #[async_trait]
    impl CompactionBackend for FakeBackend {
        async fn summarize(
            &self,
            request: SummaryRequest,
            _cancellation: Option<TurnCancellation>,
        ) -> Result<SummaryResult, CompactionError> {
            Ok(SummaryResult {
                items: vec![Item {
                    id: None,
                    kind: ItemKind::Context,
                    parts: vec![Part::Text(TextPart {
                        text: format!("summary of {} items", request.items.len()),
                        metadata: MetadataMap::new(),
                    })],
                    metadata: MetadataMap::new(),
                }],
                metadata: MetadataMap::new(),
            })
        }
    }

    #[tokio::test]
    async fn summarize_strategy_uses_backend() {
        let request = CompactionRequest {
            session_id: "s".into(),
            turn_id: None,
            transcript: vec![user_item("a"), user_item("b"), user_item("c")],
            reason: CompactionReason::TranscriptTooLong,
            metadata: MetadataMap::new(),
        };
        let strategy = SummarizeOlderStrategy::new(1);
        let mut ctx = CompactionContext {
            backend: Some(&FakeBackend),
            metadata: &MetadataMap::new(),
            cancellation: None,
        };

        let result = strategy.apply(request, &mut ctx).await.unwrap();
        assert_eq!(result.replaced_items, 2);
        assert_eq!(result.transcript.len(), 2);

        // The summary takes the place of the first summarized item.
        assert_eq!(result.transcript[0].kind, ItemKind::Context);
        match &result.transcript[0].parts[0] {
            Part::Text(text) => assert_eq!(text.text, "summary of 2 items"),
            other => panic!("unexpected part: {other:?}"),
        }
        // The most recent item is left untouched after the summary.
        assert_eq!(result.transcript[1], user_item("c"));
    }

    #[tokio::test]
    async fn summarize_strategy_requires_backend() {
        let request = CompactionRequest {
            session_id: "s".into(),
            turn_id: None,
            transcript: vec![user_item("a"), user_item("b"), user_item("c")],
            reason: CompactionReason::TranscriptTooLong,
            metadata: MetadataMap::new(),
        };
        let strategy = SummarizeOlderStrategy::new(1);
        let mut ctx = CompactionContext {
            backend: None,
            metadata: &MetadataMap::new(),
            cancellation: None,
        };

        let error = strategy.apply(request, &mut ctx).await.unwrap_err();
        assert!(matches!(error, CompactionError::MissingBackend(_)));
    }

    #[tokio::test]
    async fn pipeline_stops_when_cancelled() {
        let controller = CancellationController::new();
        let checkpoint = controller.handle().checkpoint();
        controller.interrupt();
        let request = CompactionRequest {
            session_id: "s".into(),
            turn_id: None,
            transcript: vec![user_item("a"), user_item("b"), user_item("c")],
            reason: CompactionReason::TranscriptTooLong,
            metadata: MetadataMap::new(),
        };
        let pipeline = CompactionPipeline::new().with_strategy(DropReasoningStrategy::new());
        let mut ctx = CompactionContext {
            backend: None,
            metadata: &MetadataMap::new(),
            cancellation: Some(checkpoint),
        };

        let error = pipeline.apply(request, &mut ctx).await.unwrap_err();
        assert!(matches!(error, CompactionError::Cancelled));
    }
}