1use std::collections::{BTreeSet, HashMap};
24use std::sync::Arc;
25
26use agentkit_core::{Item, ItemKind, MetadataMap, Part, SessionId, TurnCancellation};
27use agentkit_loop::{
28 Agent, AgentBuilder, AgentEvent, LoopCtx, LoopError, LoopMutator, LoopStep, ModelAdapter,
29 MutationPoint, SessionConfig, TranscriptCursor,
30};
31use async_trait::async_trait;
32use serde::{Deserialize, Serialize};
33use thiserror::Error;
34
35#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
40pub enum CompactionReason {
41 TranscriptTooLong,
43 Manual,
45 Custom(String),
47}
48
49#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
52pub struct CompactionRequest {
53 pub transcript: Vec<Item>,
55 pub reason: CompactionReason,
57 pub metadata: MetadataMap,
59}
60
61impl CompactionRequest {
62 pub fn new(transcript: Vec<Item>, reason: CompactionReason) -> Self {
64 Self {
65 transcript,
66 reason,
67 metadata: MetadataMap::new(),
68 }
69 }
70
71 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
73 self.metadata = metadata;
74 self
75 }
76}
77
78#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
82pub struct CompactionResult {
83 pub transcript: Vec<Item>,
85 pub replaced_items: usize,
87 pub metadata: MetadataMap,
89}
90
91impl CompactionResult {
92 pub fn new(transcript: Vec<Item>, replaced_items: usize) -> Self {
94 Self {
95 transcript,
96 replaced_items,
97 metadata: MetadataMap::new(),
98 }
99 }
100
101 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
103 self.metadata = metadata;
104 self
105 }
106}
107
108#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
111pub struct SummaryRequest {
112 pub items: Vec<Item>,
114 pub reason: CompactionReason,
116 pub metadata: MetadataMap,
118}
119
120impl SummaryRequest {
121 pub fn new(items: Vec<Item>, reason: CompactionReason) -> Self {
123 Self {
124 items,
125 reason,
126 metadata: MetadataMap::new(),
127 }
128 }
129
130 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
132 self.metadata = metadata;
133 self
134 }
135}
136
137#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
139pub struct SummaryResult {
140 pub items: Vec<Item>,
142 pub metadata: MetadataMap,
144}
145
146impl SummaryResult {
147 pub fn new(items: Vec<Item>) -> Self {
149 Self {
150 items,
151 metadata: MetadataMap::new(),
152 }
153 }
154
155 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
157 self.metadata = metadata;
158 self
159 }
160}
161
162#[async_trait]
174pub trait CompactionBackend: Send + Sync {
175 async fn summarize(
187 &self,
188 request: SummaryRequest,
189 cancellation: Option<TurnCancellation>,
190 ) -> Result<SummaryResult, CompactionError>;
191}
192
193#[async_trait]
201pub trait Compactor: Send + Sync {
202 fn should_compact(&self, transcript: &[Item], point: MutationPoint)
205 -> Option<CompactionReason>;
206
207 async fn compact(
211 &self,
212 transcript: &[Item],
213 reason: CompactionReason,
214 cancellation: Option<TurnCancellation>,
215 ) -> Result<Vec<Item>, CompactionError>;
216}
217
218pub struct CompactionContext<'a> {
224 pub backend: Option<&'a dyn CompactionBackend>,
227 pub cancellation: Option<TurnCancellation>,
230}
231
232impl<'a> CompactionContext<'a> {
233 pub fn new() -> Self {
235 Self {
236 backend: None,
237 cancellation: None,
238 }
239 }
240
241 pub fn with_backend(mut self, backend: &'a dyn CompactionBackend) -> Self {
243 self.backend = Some(backend);
244 self
245 }
246
247 pub fn with_cancellation(mut self, cancellation: TurnCancellation) -> Self {
249 self.cancellation = Some(cancellation);
250 self
251 }
252}
253
254impl Default for CompactionContext<'_> {
255 fn default() -> Self {
256 Self::new()
257 }
258}
259
260#[async_trait]
286pub trait CompactionStrategy: Send + Sync {
287 async fn apply(
299 &self,
300 request: CompactionRequest,
301 ctx: &mut CompactionContext<'_>,
302 ) -> Result<CompactionResult, CompactionError>;
303}
304
305#[derive(Clone, Default)]
336pub struct CompactionPipeline {
337 strategies: Vec<Arc<dyn CompactionStrategy>>,
338}
339
340impl CompactionPipeline {
341 pub fn new() -> Self {
343 Self::default()
344 }
345
346 pub fn with_strategy(mut self, strategy: impl CompactionStrategy + 'static) -> Self {
350 self.strategies.push(Arc::new(strategy));
351 self
352 }
353}
354
355#[async_trait]
356impl CompactionStrategy for CompactionPipeline {
357 async fn apply(
358 &self,
359 mut request: CompactionRequest,
360 ctx: &mut CompactionContext<'_>,
361 ) -> Result<CompactionResult, CompactionError> {
362 let mut replaced_items = 0;
363 let mut metadata = MetadataMap::new();
364
365 for strategy in &self.strategies {
366 if ctx
367 .cancellation
368 .as_ref()
369 .is_some_and(TurnCancellation::is_cancelled)
370 {
371 return Err(CompactionError::Cancelled);
372 }
373 let result = strategy.apply(request.clone(), ctx).await?;
374 request.transcript = result.transcript;
375 replaced_items += result.replaced_items;
376 metadata.extend(result.metadata);
377 }
378
379 Ok(CompactionResult {
380 transcript: request.transcript,
381 replaced_items,
382 metadata,
383 })
384 }
385}
386
387#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
407pub struct DropReasoningStrategy {
408 drop_empty_items: bool,
409}
410
411impl DropReasoningStrategy {
412 pub fn new() -> Self {
415 Self {
416 drop_empty_items: true,
417 }
418 }
419
420 pub fn drop_empty_items(mut self, value: bool) -> Self {
425 self.drop_empty_items = value;
426 self
427 }
428}
429
430#[async_trait]
431impl CompactionStrategy for DropReasoningStrategy {
432 async fn apply(
433 &self,
434 request: CompactionRequest,
435 _ctx: &mut CompactionContext<'_>,
436 ) -> Result<CompactionResult, CompactionError> {
437 let mut transcript = Vec::with_capacity(request.transcript.len());
438 let mut replaced_items = 0;
439
440 for mut item in request.transcript {
441 let original_len = item.parts.len();
442 item.parts
443 .retain(|part| !matches!(part, Part::Reasoning(_)));
444 let changed = item.parts.len() != original_len;
445 if item.parts.is_empty() && self.drop_empty_items {
446 if changed {
447 replaced_items += 1;
448 }
449 continue;
450 }
451 if changed {
452 replaced_items += 1;
453 }
454 transcript.push(item);
455 }
456
457 Ok(CompactionResult {
458 transcript,
459 replaced_items,
460 metadata: MetadataMap::new(),
461 })
462 }
463}
464
465#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
486pub struct DropFailedToolResultsStrategy {
487 drop_empty_items: bool,
488}
489
490impl DropFailedToolResultsStrategy {
491 pub fn new() -> Self {
494 Self {
495 drop_empty_items: true,
496 }
497 }
498
499 pub fn drop_empty_items(mut self, value: bool) -> Self {
504 self.drop_empty_items = value;
505 self
506 }
507}
508
509#[async_trait]
510impl CompactionStrategy for DropFailedToolResultsStrategy {
511 async fn apply(
512 &self,
513 request: CompactionRequest,
514 _ctx: &mut CompactionContext<'_>,
515 ) -> Result<CompactionResult, CompactionError> {
516 let failed_call_ids = request
517 .transcript
518 .iter()
519 .flat_map(|item| item.parts.iter())
520 .filter_map(|part| match part {
521 Part::ToolResult(result) if result.is_error => Some(result.call_id.clone()),
522 _ => None,
523 })
524 .collect::<BTreeSet<_>>();
525 let mut transcript = Vec::with_capacity(request.transcript.len());
526 let mut replaced_items = 0;
527
528 for mut item in request.transcript {
529 let original_len = item.parts.len();
530 item.parts.retain(|part| {
531 !matches!(part, Part::ToolResult(result) if result.is_error)
532 && !matches!(part, Part::ToolCall(call) if failed_call_ids.contains(&call.id))
533 });
534 let changed = item.parts.len() != original_len;
535 if item.parts.is_empty() && self.drop_empty_items {
536 if changed {
537 replaced_items += 1;
538 }
539 continue;
540 }
541 if changed {
542 replaced_items += 1;
543 }
544 transcript.push(item);
545 }
546
547 Ok(CompactionResult {
548 transcript,
549 replaced_items,
550 metadata: MetadataMap::new(),
551 })
552 }
553}
554
555#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
573pub struct KeepRecentStrategy {
574 keep_last: usize,
575 preserve_kinds: BTreeSet<ItemKind>,
576}
577
578impl KeepRecentStrategy {
579 pub fn new(keep_last: usize) -> Self {
581 Self {
582 keep_last,
583 preserve_kinds: BTreeSet::new(),
584 }
585 }
586
587 pub fn preserve_kind(mut self, kind: ItemKind) -> Self {
590 self.preserve_kinds.insert(kind);
591 self
592 }
593}
594
595#[async_trait]
596impl CompactionStrategy for KeepRecentStrategy {
597 async fn apply(
598 &self,
599 request: CompactionRequest,
600 _ctx: &mut CompactionContext<'_>,
601 ) -> Result<CompactionResult, CompactionError> {
602 let removable = removable_indices(&request.transcript, &self.preserve_kinds);
603 if removable.len() <= self.keep_last {
604 return Ok(CompactionResult {
605 transcript: request.transcript,
606 replaced_items: 0,
607 metadata: MetadataMap::new(),
608 });
609 }
610
611 let keep_indices = removable
612 .iter()
613 .skip(removable.len() - self.keep_last)
614 .copied()
615 .collect::<BTreeSet<_>>();
616 let keep_indices =
617 expand_indices_to_tool_pairs(&request.transcript, keep_indices, &self.preserve_kinds);
618 let replaced_items = removable
619 .iter()
620 .filter(|index| !keep_indices.contains(index))
621 .count();
622 let transcript = request
623 .transcript
624 .into_iter()
625 .enumerate()
626 .filter_map(|(index, item)| {
627 (self.preserve_kinds.contains(&item.kind) || keep_indices.contains(&index))
628 .then_some(item)
629 })
630 .collect::<Vec<_>>();
631
632 Ok(CompactionResult {
633 transcript,
634 replaced_items,
635 metadata: MetadataMap::new(),
636 })
637 }
638}
639
640#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
661pub struct SummarizeOlderStrategy {
662 keep_last: usize,
663 preserve_kinds: BTreeSet<ItemKind>,
664}
665
666impl SummarizeOlderStrategy {
667 pub fn new(keep_last: usize) -> Self {
670 Self {
671 keep_last,
672 preserve_kinds: BTreeSet::new(),
673 }
674 }
675
676 pub fn preserve_kind(mut self, kind: ItemKind) -> Self {
679 self.preserve_kinds.insert(kind);
680 self
681 }
682}
683
684#[async_trait]
685impl CompactionStrategy for SummarizeOlderStrategy {
686 async fn apply(
687 &self,
688 request: CompactionRequest,
689 ctx: &mut CompactionContext<'_>,
690 ) -> Result<CompactionResult, CompactionError> {
691 let Some(backend) = ctx.backend else {
692 return Err(CompactionError::MissingBackend(
693 "summarize strategy requires a compaction backend".into(),
694 ));
695 };
696
697 let removable = removable_indices(&request.transcript, &self.preserve_kinds);
698 if removable.len() <= self.keep_last {
699 return Ok(CompactionResult {
700 transcript: request.transcript,
701 replaced_items: 0,
702 metadata: MetadataMap::new(),
703 });
704 }
705
706 let keep_indices = removable
707 .iter()
708 .skip(removable.len() - self.keep_last)
709 .copied()
710 .collect::<BTreeSet<_>>();
711 let keep_indices =
712 expand_indices_to_tool_pairs(&request.transcript, keep_indices, &self.preserve_kinds);
713 let summary_indices = removable
714 .iter()
715 .copied()
716 .filter(|index| !keep_indices.contains(index))
717 .collect::<Vec<_>>();
718 if summary_indices.is_empty() {
719 return Ok(CompactionResult {
720 transcript: request.transcript,
721 replaced_items: 0,
722 metadata: MetadataMap::new(),
723 });
724 }
725 let first_summary_index = summary_indices[0];
726 let summary_index_set = summary_indices.iter().copied().collect::<BTreeSet<_>>();
727 let summary_items = summary_indices
728 .iter()
729 .map(|index| request.transcript[*index].clone())
730 .collect::<Vec<_>>();
731 let summary = backend
732 .summarize(
733 SummaryRequest {
734 items: summary_items,
735 reason: request.reason.clone(),
736 metadata: request.metadata.clone(),
737 },
738 ctx.cancellation.clone(),
739 )
740 .await?;
741
742 let mut transcript = Vec::new();
743 let mut inserted_summary = false;
744 let mut summary_output = Some(summary.items);
745 for (index, item) in request.transcript.into_iter().enumerate() {
746 if summary_index_set.contains(&index) {
747 if !inserted_summary && index == first_summary_index {
748 transcript.extend(summary_output.take().unwrap_or_default());
749 inserted_summary = true;
750 }
751 continue;
752 }
753 transcript.push(item);
754 }
755
756 Ok(CompactionResult {
757 transcript,
758 replaced_items: summary_indices.len(),
759 metadata: summary.metadata,
760 })
761 }
762}
763
764fn removable_indices(transcript: &[Item], preserve_kinds: &BTreeSet<ItemKind>) -> Vec<usize> {
765 transcript
766 .iter()
767 .enumerate()
768 .filter_map(|(index, item)| (!preserve_kinds.contains(&item.kind)).then_some(index))
769 .collect()
770}
771
772fn expand_indices_to_tool_pairs(
773 transcript: &[Item],
774 mut keep_indices: BTreeSet<usize>,
775 preserve_kinds: &BTreeSet<ItemKind>,
776) -> BTreeSet<usize> {
777 keep_indices.extend(
778 transcript
779 .iter()
780 .enumerate()
781 .filter_map(|(index, item)| preserve_kinds.contains(&item.kind).then_some(index)),
782 );
783
784 let mut calls = HashMap::new();
785 let mut results: HashMap<_, Vec<usize>> = HashMap::new();
786 for (index, item) in transcript.iter().enumerate() {
787 for part in &item.parts {
788 match part {
789 Part::ToolCall(call) => {
790 calls.entry(call.id.clone()).or_insert(index);
791 }
792 Part::ToolResult(result) => {
793 results
794 .entry(result.call_id.clone())
795 .or_default()
796 .push(index);
797 }
798 _ => {}
799 }
800 }
801 }
802
803 loop {
804 let before_len = keep_indices.len();
805 for (call_id, call_index) in &calls {
806 if keep_indices.contains(call_index)
807 && let Some(result_indices) = results.get(call_id)
808 {
809 keep_indices.extend(result_indices.iter().copied());
810 }
811 }
812 for (call_id, result_indices) in &results {
813 if result_indices
814 .iter()
815 .any(|result_index| keep_indices.contains(result_index))
816 && let Some(call_index) = calls.get(call_id)
817 {
818 keep_indices.insert(*call_index);
819 }
820 }
821 if keep_indices.len() == before_len {
822 break;
823 }
824 }
825
826 keep_indices
827}
828
829#[derive(Debug, Error)]
831pub enum CompactionError {
832 #[error("compaction cancelled")]
834 Cancelled,
835 #[error("missing compaction backend: {0}")]
838 MissingBackend(String),
839 #[error("compaction failed: {0}")]
841 Failed(String),
842}
843
844pub struct CompactorMutator<C> {
854 compactor: C,
855 name: String,
856}
857
858impl<C: Compactor> CompactorMutator<C> {
859 pub fn new(compactor: C) -> Self {
861 Self {
862 compactor,
863 name: "compactor".into(),
864 }
865 }
866
867 pub fn with_name(mut self, name: impl Into<String>) -> Self {
870 self.name = name.into();
871 self
872 }
873}
874
875#[async_trait]
876impl<C: Compactor + 'static> LoopMutator for CompactorMutator<C> {
877 async fn mutate(
878 &self,
879 cursor: &mut TranscriptCursor<'_>,
880 ctx: LoopCtx<'_>,
881 ) -> Result<(), LoopError> {
882 let Some(reason) = self.compactor.should_compact(cursor.as_slice(), ctx.point) else {
883 return Ok(());
884 };
885
886 ctx.emitter.emit(AgentEvent::MutationStarted {
887 session_id: ctx.session_id.clone(),
888 turn_id: ctx.turn_id.cloned(),
889 mutator: self.name.clone(),
890 point: ctx.point,
891 });
892
893 let before_len = cursor.len();
894 let result = self
895 .compactor
896 .compact(cursor.as_slice(), reason.clone(), ctx.cancellation.clone())
897 .await;
898
899 let mut metadata = MetadataMap::new();
900 metadata.insert("reason".into(), format!("{reason:?}").into());
901
902 match result {
903 Ok(new_items) => {
904 let replaced = before_len.saturating_sub(new_items.len());
905 metadata.insert("replaced_items".into(), (replaced as u64).into());
906 **cursor = new_items;
907 ctx.emitter.emit(AgentEvent::MutationFinished {
908 session_id: ctx.session_id.clone(),
909 turn_id: ctx.turn_id.cloned(),
910 mutator: self.name.clone(),
911 dirty: true,
912 metadata,
913 });
914 Ok(())
915 }
916 Err(err) => {
917 metadata.insert("error".into(), err.to_string().into());
918 ctx.emitter.emit(AgentEvent::MutationFinished {
919 session_id: ctx.session_id.clone(),
920 turn_id: ctx.turn_id.cloned(),
921 mutator: self.name.clone(),
922 dirty: false,
923 metadata,
924 });
925 match err {
926 CompactionError::Cancelled => Err(LoopError::Cancelled),
927 other => Err(LoopError::Mutator(other.to_string())),
928 }
929 }
930 }
931 }
932}
933
934pub trait AgentBuilderCompactorExt<M: ModelAdapter>: Sized {
938 fn compactor<C: Compactor + 'static>(self, compactor: C) -> Self;
940}
941
942impl<M: ModelAdapter> AgentBuilderCompactorExt<M> for AgentBuilder<M> {
943 fn compactor<C: Compactor + 'static>(self, compactor: C) -> Self {
944 self.mutator(CompactorMutator::new(compactor))
945 }
946}
947
948pub type TriggerFn =
952 Box<dyn Fn(&[Item], MutationPoint) -> Option<CompactionReason> + Send + Sync>;
953
954pub struct StrategyCompactor {
983 trigger: TriggerFn,
984 strategy: Arc<dyn CompactionStrategy>,
985 backend: Option<Arc<dyn CompactionBackend>>,
986 metadata: MetadataMap,
987}
988
989impl StrategyCompactor {
990 pub fn new<T, S>(trigger: T, strategy: S) -> Self
995 where
996 T: Fn(&[Item], MutationPoint) -> Option<CompactionReason> + Send + Sync + 'static,
997 S: CompactionStrategy + 'static,
998 {
999 Self {
1000 trigger: Box::new(trigger),
1001 strategy: Arc::new(strategy),
1002 backend: None,
1003 metadata: MetadataMap::new(),
1004 }
1005 }
1006
1007 pub fn builder() -> StrategyCompactorBuilder {
1009 StrategyCompactorBuilder::default()
1010 }
1011
1012 pub fn with_backend(mut self, backend: impl CompactionBackend + 'static) -> Self {
1015 self.backend = Some(Arc::new(backend));
1016 self
1017 }
1018
1019 pub fn with_shared_backend(mut self, backend: Arc<dyn CompactionBackend>) -> Self {
1022 self.backend = Some(backend);
1023 self
1024 }
1025
1026 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
1028 self.metadata = metadata;
1029 self
1030 }
1031}
1032
1033#[async_trait]
1034impl Compactor for StrategyCompactor {
1035 fn should_compact(
1036 &self,
1037 transcript: &[Item],
1038 point: MutationPoint,
1039 ) -> Option<CompactionReason> {
1040 (self.trigger)(transcript, point)
1041 }
1042
1043 async fn compact(
1044 &self,
1045 transcript: &[Item],
1046 reason: CompactionReason,
1047 cancellation: Option<TurnCancellation>,
1048 ) -> Result<Vec<Item>, CompactionError> {
1049 let request = CompactionRequest {
1050 transcript: transcript.to_vec(),
1051 reason,
1052 metadata: self.metadata.clone(),
1053 };
1054 let mut ctx = CompactionContext {
1055 backend: self.backend.as_deref(),
1056 cancellation,
1057 };
1058 let result = self.strategy.apply(request, &mut ctx).await?;
1059 Ok(result.transcript)
1060 }
1061}
1062
1063#[derive(Debug, Error)]
1065pub enum StrategyCompactorBuildError {
1066 #[error("trigger is required")]
1068 MissingTrigger,
1069 #[error("strategy is required")]
1071 MissingStrategy,
1072}
1073
1074#[derive(Default)]
1076pub struct StrategyCompactorBuilder {
1077 trigger: Option<TriggerFn>,
1078 strategy: Option<Arc<dyn CompactionStrategy>>,
1079 backend: Option<Arc<dyn CompactionBackend>>,
1080 metadata: MetadataMap,
1081}
1082
1083impl StrategyCompactorBuilder {
1084 pub fn trigger<T>(mut self, trigger: T) -> Self
1086 where
1087 T: Fn(&[Item], MutationPoint) -> Option<CompactionReason> + Send + Sync + 'static,
1088 {
1089 self.trigger = Some(Box::new(trigger));
1090 self
1091 }
1092
1093 pub fn item_count_trigger(self, max_items: usize) -> Self {
1095 self.trigger(move |transcript: &[Item], _point| {
1096 (transcript.len() > max_items).then_some(CompactionReason::TranscriptTooLong)
1097 })
1098 }
1099
1100 pub fn strategy(mut self, strategy: impl CompactionStrategy + 'static) -> Self {
1102 self.strategy = Some(Arc::new(strategy));
1103 self
1104 }
1105
1106 pub fn backend(mut self, backend: impl CompactionBackend + 'static) -> Self {
1108 self.backend = Some(Arc::new(backend));
1109 self
1110 }
1111
1112 pub fn shared_backend(mut self, backend: Arc<dyn CompactionBackend>) -> Self {
1114 self.backend = Some(backend);
1115 self
1116 }
1117
1118 pub fn metadata(mut self, metadata: MetadataMap) -> Self {
1120 self.metadata = metadata;
1121 self
1122 }
1123
1124 pub fn build(self) -> Result<StrategyCompactor, StrategyCompactorBuildError> {
1126 Ok(StrategyCompactor {
1127 trigger: self
1128 .trigger
1129 .ok_or(StrategyCompactorBuildError::MissingTrigger)?,
1130 strategy: self
1131 .strategy
1132 .ok_or(StrategyCompactorBuildError::MissingStrategy)?,
1133 backend: self.backend,
1134 metadata: self.metadata,
1135 })
1136 }
1137}
1138
/// System prompt that [`AgentCompactorBuilder::build`] falls back to when no
/// `system_prompt` was supplied.
///
/// The trailing `\` continuations join the lines with single spaces and no
/// embedded newlines.
const DEFAULT_COMPACTION_PROMPT: &str = "You are a compaction agent. Compress the \
transcript that follows into a durable context note for an assistant that has lost the \
original messages. Preserve every named person, every year and date, every place, every \
decision the assistant committed to, every tool the assistant invoked, and every \
actionable fact in the tool results. Drop chatter, narration, and chain-of-thought. \
Return only the compacted note as plain text.";
1145
1146pub fn context_window_trigger(window: u64, percent: u32) -> TriggerFn {
1155 let percent = percent.clamp(1, 100);
1156 let threshold = window.saturating_mul(percent as u64) / 100;
1157 Box::new(move |transcript: &[Item], point: MutationPoint| {
1158 if point != MutationPoint::AfterTurnEnded {
1159 return None;
1160 }
1161 let last_input = transcript
1162 .iter()
1163 .rev()
1164 .find_map(|i| i.usage.as_ref()?.tokens.as_ref().map(|t| t.input_tokens))?;
1165 (last_input >= threshold).then(|| {
1166 CompactionReason::Custom(format!(
1167 "input_tokens={last_input} >= threshold={threshold} (window={window}, {percent}%)",
1168 ))
1169 })
1170 })
1171}
1172
1173pub fn item_count_trigger(max_items: usize) -> TriggerFn {
1177 Box::new(move |transcript: &[Item], _point: MutationPoint| {
1178 (transcript.len() > max_items).then_some(CompactionReason::TranscriptTooLong)
1179 })
1180}
1181
1182#[derive(Debug, Error)]
1184pub enum AgentCompactorBuildError {
1185 #[error("agent is required")]
1187 MissingAgent,
1188 #[error("session_id is required")]
1190 MissingSessionId,
1191}
1192
1193pub struct AgentCompactor<M: ModelAdapter + Clone + 'static> {
1201 inner: Arc<Agent<M>>,
1202 session_id: SessionId,
1203 system_prompt: String,
1204}
1205
1206impl<M: ModelAdapter + Clone + 'static> AgentCompactor<M> {
1207 pub fn builder() -> AgentCompactorBuilder<M> {
1209 AgentCompactorBuilder::new()
1210 }
1211}
1212
1213pub struct AgentCompactorBuilder<M: ModelAdapter + Clone + 'static> {
1215 agent: Option<Arc<Agent<M>>>,
1216 session_id: Option<SessionId>,
1217 system_prompt: Option<String>,
1218}
1219
1220impl<M: ModelAdapter + Clone + 'static> AgentCompactorBuilder<M> {
1221 fn new() -> Self {
1222 Self {
1223 agent: None,
1224 session_id: None,
1225 system_prompt: None,
1226 }
1227 }
1228
1229 pub fn agent(mut self, agent: Arc<Agent<M>>) -> Self {
1231 self.agent = Some(agent);
1232 self
1233 }
1234
1235 pub fn session_id(mut self, id: SessionId) -> Self {
1237 self.session_id = Some(id);
1238 self
1239 }
1240
1241 pub fn system_prompt(mut self, s: impl Into<String>) -> Self {
1243 self.system_prompt = Some(s.into());
1244 self
1245 }
1246
1247 pub fn build(self) -> Result<AgentCompactor<M>, AgentCompactorBuildError> {
1249 Ok(AgentCompactor {
1250 inner: self.agent.ok_or(AgentCompactorBuildError::MissingAgent)?,
1251 session_id: self
1252 .session_id
1253 .ok_or(AgentCompactorBuildError::MissingSessionId)?,
1254 system_prompt: self
1255 .system_prompt
1256 .unwrap_or_else(|| DEFAULT_COMPACTION_PROMPT.into()),
1257 })
1258 }
1259}
1260
1261#[async_trait]
1262impl<M: ModelAdapter + Clone + 'static> CompactionBackend for AgentCompactor<M> {
1263 async fn summarize(
1264 &self,
1265 request: SummaryRequest,
1266 cancellation: Option<TurnCancellation>,
1267 ) -> Result<SummaryResult, CompactionError> {
1268 if cancellation
1269 .as_ref()
1270 .is_some_and(TurnCancellation::is_cancelled)
1271 {
1272 return Err(CompactionError::Cancelled);
1273 }
1274
1275 let rendered = render_items_for_summary(&request.items);
1276
1277 let driver_input = vec![
1278 Item::text(ItemKind::System, self.system_prompt.clone()),
1279 Item::text(
1280 ItemKind::User,
1281 format!(
1282 "Compress the transcript below into a durable context note. \
1283 Preserve names, places, dates, decisions, and tool outcomes.\n\n{rendered}"
1284 ),
1285 ),
1286 ];
1287
1288 let mut driver = self
1289 .inner
1290 .start(SessionConfig::new(self.session_id.clone()))
1291 .await
1292 .map_err(|e| CompactionError::Failed(e.to_string()))?;
1293 driver
1294 .submit_input(driver_input)
1295 .map_err(|e| CompactionError::Failed(e.to_string()))?;
1296
1297 let summary = run_compactor_to_completion(&mut driver)
1298 .await
1299 .map_err(CompactionError::Failed)?;
1300
1301 Ok(SummaryResult {
1302 items: vec![Item::text(ItemKind::Context, summary)],
1303 metadata: MetadataMap::new(),
1304 })
1305 }
1306}
1307
1308async fn run_compactor_to_completion<S>(
1309 driver: &mut agentkit_loop::LoopDriver<S>,
1310) -> Result<String, String>
1311where
1312 S: agentkit_loop::ModelSession,
1313{
1314 use agentkit_loop::LoopInterrupt;
1315 loop {
1316 let step = driver.next().await.map_err(|e| e.to_string())?;
1317 match step {
1318 LoopStep::Finished(result) => {
1319 let mut sections = Vec::new();
1320 for item in result.items {
1321 if item.kind != ItemKind::Assistant {
1322 continue;
1323 }
1324 for part in item.parts {
1325 if let Part::Text(t) = part {
1326 sections.push(t.text);
1327 }
1328 }
1329 }
1330 return Ok(sections.join("\n"));
1331 }
1332 LoopStep::Interrupt(LoopInterrupt::AfterToolResult(_)) => continue,
1333 LoopStep::Interrupt(LoopInterrupt::AwaitingInput(_)) => {
1334 return Err("compactor sub-agent unexpectedly awaiting input".into());
1335 }
1336 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(_)) => {
1337 return Err("compactor sub-agent unexpectedly required approval".into());
1338 }
1339 }
1340 }
1341}
1342
1343fn render_items_for_summary(items: &[Item]) -> String {
1344 items
1345 .iter()
1346 .map(|item| {
1347 let kind = match item.kind {
1348 ItemKind::User => "USER",
1349 ItemKind::Assistant => "ASSISTANT",
1350 ItemKind::System => "SYSTEM",
1351 ItemKind::Developer => "DEVELOPER",
1352 ItemKind::Tool => "TOOL",
1353 ItemKind::Context => "CONTEXT",
1354 ItemKind::Notification => "NOTIFICATION",
1355 };
1356 let body = item
1357 .parts
1358 .iter()
1359 .filter_map(|p| match p {
1360 Part::Text(t) => Some(t.text.clone()),
1361 Part::Structured(v) => Some(v.value.to_string()),
1362 _ => None,
1363 })
1364 .collect::<Vec<_>>()
1365 .join("\n");
1366 format!("[{kind}]\n{body}")
1367 })
1368 .collect::<Vec<_>>()
1369 .join("\n\n")
1370}
1371
#[cfg(test)]
mod tests {
    use agentkit_core::{
        CancellationController, Part, TextPart, ToolCallPart, ToolOutput, ToolResultPart,
    };

    use super::*;

    /// Builds an item of `kind` holding `parts`, with every optional field unset.
    fn make_item(kind: ItemKind, parts: Vec<Part>) -> Item {
        Item {
            id: None,
            kind,
            parts,
            metadata: MetadataMap::new(),
            usage: None,
            finish_reason: None,
            created_at: None,
        }
    }

    /// A text part with no metadata.
    fn text_part(value: &str) -> Part {
        Part::Text(TextPart {
            text: value.into(),
            metadata: MetadataMap::new(),
        })
    }

    /// A tool-result part for `call_id` with a text `output`.
    fn tool_result_part(call_id: &str, output: &'static str, is_error: bool) -> Part {
        Part::ToolResult(ToolResultPart {
            call_id: call_id.into(),
            output: ToolOutput::Text(output.into()),
            is_error,
            metadata: MetadataMap::new(),
        })
    }

    fn user_item(text: &str) -> Item {
        make_item(ItemKind::User, vec![text_part(text)])
    }

    fn assistant_with_reasoning() -> Item {
        let reasoning = Part::Reasoning(agentkit_core::ReasoningPart {
            summary: Some("think".into()),
            data: None,
            redacted: false,
            metadata: MetadataMap::new(),
        });
        make_item(ItemKind::Assistant, vec![reasoning, text_part("answer")])
    }

    fn failed_tool_item() -> Item {
        make_item(
            ItemKind::Tool,
            vec![tool_result_part("call-1", "failed", true)],
        )
    }

    fn tool_call_item(id: &str) -> Item {
        let call = Part::ToolCall(ToolCallPart {
            id: id.into(),
            name: "lookup".into(),
            input: serde_json::json!({}),
            metadata: MetadataMap::new(),
        });
        make_item(ItemKind::Assistant, vec![call])
    }

    fn tool_result_item(id: &str, is_error: bool) -> Item {
        make_item(ItemKind::Tool, vec![tool_result_part(id, "result", is_error)])
    }

    /// Wraps `transcript` in a `TranscriptTooLong` request with no metadata.
    fn request_for(transcript: Vec<Item>) -> CompactionRequest {
        CompactionRequest::new(transcript, CompactionReason::TranscriptTooLong)
    }

    #[tokio::test]
    async fn pipeline_applies_local_strategies_in_order() {
        let request = request_for(vec![
            user_item("a"),
            assistant_with_reasoning(),
            failed_tool_item(),
            user_item("b"),
            user_item("c"),
        ]);
        let pipeline = CompactionPipeline::new()
            .with_strategy(DropReasoningStrategy::new())
            .with_strategy(DropFailedToolResultsStrategy::new())
            .with_strategy(
                KeepRecentStrategy::new(2)
                    .preserve_kind(ItemKind::System)
                    .preserve_kind(ItemKind::Context),
            );
        let mut ctx = CompactionContext::new();

        let result = pipeline.apply(request, &mut ctx).await.unwrap();
        assert_eq!(result.transcript.len(), 2);
        assert!(result.replaced_items >= 2);
        let has_reasoning = result
            .transcript
            .iter()
            .flat_map(|item| &item.parts)
            .any(|part| matches!(part, Part::Reasoning(_)));
        assert!(!has_reasoning);
    }

    #[tokio::test]
    async fn keep_recent_preserves_tool_call_result_pairs() {
        let request = request_for(vec![
            user_item("old"),
            tool_call_item("call-1"),
            tool_result_item("call-1", false),
            user_item("recent"),
        ]);
        let strategy = KeepRecentStrategy::new(2);
        let mut ctx = CompactionContext::new();

        let result = strategy.apply(request, &mut ctx).await.unwrap();
        assert_eq!(result.replaced_items, 1);
        assert_eq!(result.transcript.len(), 3);
        assert!(matches!(result.transcript[0].parts[0], Part::ToolCall(_)));
        assert!(matches!(result.transcript[1].parts[0], Part::ToolResult(_)));
    }

    #[tokio::test]
    async fn failed_tool_result_removal_drops_matching_tool_call() {
        let request = request_for(vec![
            tool_call_item("call-1"),
            tool_result_item("call-1", true),
            user_item("recent"),
        ]);
        let strategy = DropFailedToolResultsStrategy::new();
        let mut ctx = CompactionContext::new();

        let result = strategy.apply(request, &mut ctx).await.unwrap();
        assert_eq!(result.replaced_items, 2);
        assert_eq!(result.transcript.len(), 1);
        assert!(matches!(result.transcript[0].kind, ItemKind::User));
    }

    /// Backend that replaces any input with a single note naming the item count.
    struct FakeBackend;

    #[async_trait]
    impl CompactionBackend for FakeBackend {
        async fn summarize(
            &self,
            request: SummaryRequest,
            _cancellation: Option<TurnCancellation>,
        ) -> Result<SummaryResult, CompactionError> {
            let note = format!("summary of {} items", request.items.len());
            Ok(SummaryResult::new(vec![make_item(
                ItemKind::Context,
                vec![text_part(&note)],
            )]))
        }
    }

    #[tokio::test]
    async fn summarize_strategy_uses_backend() {
        let request = request_for(vec![user_item("a"), user_item("b"), user_item("c")]);
        let strategy = SummarizeOlderStrategy::new(1);
        let backend = FakeBackend;
        let mut ctx = CompactionContext::new().with_backend(&backend);

        let result = strategy.apply(request, &mut ctx).await.unwrap();
        assert_eq!(result.replaced_items, 2);
        assert_eq!(result.transcript.len(), 2);
        match &result.transcript[0].parts[0] {
            Part::Text(text) => assert_eq!(text.text, "summary of 2 items"),
            other => panic!("unexpected part: {other:?}"),
        }
    }

    #[tokio::test]
    async fn summarize_strategy_preserves_tool_call_result_pairs() {
        let request = request_for(vec![
            user_item("old"),
            tool_call_item("call-1"),
            tool_result_item("call-1", false),
            user_item("recent"),
        ]);
        let strategy = SummarizeOlderStrategy::new(2);
        let backend = FakeBackend;
        let mut ctx = CompactionContext::new().with_backend(&backend);

        let result = strategy.apply(request, &mut ctx).await.unwrap();
        assert_eq!(result.replaced_items, 1);
        assert_eq!(result.transcript.len(), 4);
        match &result.transcript[0].parts[0] {
            Part::Text(text) => assert_eq!(text.text, "summary of 1 items"),
            other => panic!("unexpected part: {other:?}"),
        }
        assert!(matches!(result.transcript[1].parts[0], Part::ToolCall(_)));
        assert!(matches!(result.transcript[2].parts[0], Part::ToolResult(_)));
    }

    #[tokio::test]
    async fn pipeline_stops_when_cancelled() {
        let controller = CancellationController::new();
        let checkpoint = controller.handle().checkpoint();
        controller.interrupt();
        let request = request_for(vec![user_item("a"), user_item("b"), user_item("c")]);
        let pipeline = CompactionPipeline::new().with_strategy(DropReasoningStrategy::new());
        let mut ctx = CompactionContext::new().with_cancellation(checkpoint);

        let error = pipeline.apply(request, &mut ctx).await.unwrap_err();
        assert!(matches!(error, CompactionError::Cancelled));
    }
}