1use std::collections::{BTreeSet, HashMap};
24use std::sync::Arc;
25
26use agentkit_core::{Item, ItemKind, MetadataMap, Part, SessionId, TurnCancellation};
27use agentkit_loop::{
28 Agent, AgentBuilder, AgentEvent, LoopCtx, LoopError, LoopMutator, LoopStep, ModelAdapter,
29 MutationPoint, SessionConfig, TranscriptCursor,
30};
31use async_trait::async_trait;
32use serde::{Deserialize, Serialize};
33use thiserror::Error;
34
/// Why a compaction pass is being run.
///
/// The value travels on [`CompactionRequest`] and [`SummaryRequest`], and it is
/// what [`Compactor::should_compact`] returns to request a pass.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CompactionReason {
    /// The transcript exceeded a size limit (produced by the item-count triggers in this file).
    TranscriptTooLong,
    /// Compaction was requested explicitly.
    /// NOTE(review): nothing in this file constructs this variant; confirm callers.
    Manual,
    /// Any other reason, described by free-form text.
    Custom(String),
}
48
49#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
52pub struct CompactionRequest {
53 pub transcript: Vec<Item>,
55 pub reason: CompactionReason,
57 pub metadata: MetadataMap,
59}
60
61impl CompactionRequest {
62 pub fn new(transcript: Vec<Item>, reason: CompactionReason) -> Self {
64 Self {
65 transcript,
66 reason,
67 metadata: MetadataMap::new(),
68 }
69 }
70
71 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
73 self.metadata = metadata;
74 self
75 }
76}
77
78#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
82pub struct CompactionResult {
83 pub transcript: Vec<Item>,
85 pub replaced_items: usize,
87 pub metadata: MetadataMap,
89}
90
91impl CompactionResult {
92 pub fn new(transcript: Vec<Item>, replaced_items: usize) -> Self {
94 Self {
95 transcript,
96 replaced_items,
97 metadata: MetadataMap::new(),
98 }
99 }
100
101 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
103 self.metadata = metadata;
104 self
105 }
106}
107
108#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
111pub struct SummaryRequest {
112 pub items: Vec<Item>,
114 pub reason: CompactionReason,
116 pub metadata: MetadataMap,
118}
119
120impl SummaryRequest {
121 pub fn new(items: Vec<Item>, reason: CompactionReason) -> Self {
123 Self {
124 items,
125 reason,
126 metadata: MetadataMap::new(),
127 }
128 }
129
130 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
132 self.metadata = metadata;
133 self
134 }
135}
136
137#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
139pub struct SummaryResult {
140 pub items: Vec<Item>,
142 pub metadata: MetadataMap,
144}
145
146impl SummaryResult {
147 pub fn new(items: Vec<Item>) -> Self {
149 Self {
150 items,
151 metadata: MetadataMap::new(),
152 }
153 }
154
155 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
157 self.metadata = metadata;
158 self
159 }
160}
161
/// A service that condenses a span of transcript items into replacement items.
///
/// [`SummarizeOlderStrategy`] calls this through [`CompactionContext::backend`];
/// [`AgentCompactor`] is the implementation provided in this file.
#[async_trait]
pub trait CompactionBackend: Send + Sync {
    /// Summarizes `request.items`.
    ///
    /// `cancellation`, when present, is the caller's cancellation handle; how
    /// (and whether) it is honoured is up to the implementation.
    ///
    /// # Errors
    ///
    /// Returns a [`CompactionError`] chosen by the implementation.
    async fn summarize(
        &self,
        request: SummaryRequest,
        cancellation: Option<TurnCancellation>,
    ) -> Result<SummaryResult, CompactionError>;
}
192
/// Decides when a transcript should be compacted and performs the compaction.
///
/// [`CompactorMutator`] drives implementations: it calls
/// [`should_compact`](Self::should_compact) first and, only when that returns
/// `Some`, calls [`compact`](Self::compact) with the returned reason.
#[async_trait]
pub trait Compactor: Send + Sync {
    /// Returns `Some(reason)` when `transcript` should be compacted at the
    /// given mutation `point`, or `None` to leave it untouched.
    fn should_compact(&self, transcript: &[Item], point: MutationPoint)
    -> Option<CompactionReason>;

    /// Produces the compacted replacement for `transcript`.
    ///
    /// # Errors
    ///
    /// Returns a [`CompactionError`] when compaction fails or is cancelled.
    async fn compact(
        &self,
        transcript: &[Item],
        reason: CompactionReason,
        cancellation: Option<TurnCancellation>,
    ) -> Result<Vec<Item>, CompactionError>;
}
217
218pub struct CompactionContext<'a> {
224 pub backend: Option<&'a dyn CompactionBackend>,
227 pub cancellation: Option<TurnCancellation>,
230}
231
232impl<'a> CompactionContext<'a> {
233 pub fn new() -> Self {
235 Self {
236 backend: None,
237 cancellation: None,
238 }
239 }
240
241 pub fn with_backend(mut self, backend: &'a dyn CompactionBackend) -> Self {
243 self.backend = Some(backend);
244 self
245 }
246
247 pub fn with_cancellation(mut self, cancellation: TurnCancellation) -> Self {
249 self.cancellation = Some(cancellation);
250 self
251 }
252}
253
254impl Default for CompactionContext<'_> {
255 fn default() -> Self {
256 Self::new()
257 }
258}
259
/// A single transcript transformation that can be run alone or chained in a
/// [`CompactionPipeline`].
#[async_trait]
pub trait CompactionStrategy: Send + Sync {
    /// Transforms `request.transcript` and reports what changed.
    ///
    /// `ctx` carries the optional summarization backend and cancellation
    /// handle shared by all strategies in the pass.
    ///
    /// # Errors
    ///
    /// Returns a [`CompactionError`] when the strategy cannot complete.
    async fn apply(
        &self,
        request: CompactionRequest,
        ctx: &mut CompactionContext<'_>,
    ) -> Result<CompactionResult, CompactionError>;
}
304
305#[derive(Clone, Default)]
336pub struct CompactionPipeline {
337 strategies: Vec<Arc<dyn CompactionStrategy>>,
338}
339
340impl CompactionPipeline {
341 pub fn new() -> Self {
343 Self::default()
344 }
345
346 pub fn with_strategy(mut self, strategy: impl CompactionStrategy + 'static) -> Self {
350 self.strategies.push(Arc::new(strategy));
351 self
352 }
353}
354
355#[async_trait]
356impl CompactionStrategy for CompactionPipeline {
357 async fn apply(
358 &self,
359 mut request: CompactionRequest,
360 ctx: &mut CompactionContext<'_>,
361 ) -> Result<CompactionResult, CompactionError> {
362 let mut replaced_items = 0;
363 let mut metadata = MetadataMap::new();
364
365 for strategy in &self.strategies {
366 if ctx
367 .cancellation
368 .as_ref()
369 .is_some_and(TurnCancellation::is_cancelled)
370 {
371 return Err(CompactionError::Cancelled);
372 }
373 let result = strategy.apply(request.clone(), ctx).await?;
374 request.transcript = result.transcript;
375 replaced_items += result.replaced_items;
376 metadata.extend(result.metadata);
377 }
378
379 Ok(CompactionResult {
380 transcript: request.transcript,
381 replaced_items,
382 metadata,
383 })
384 }
385}
386
387#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
407pub struct DropReasoningStrategy {
408 drop_empty_items: bool,
409}
410
411impl DropReasoningStrategy {
412 pub fn new() -> Self {
415 Self {
416 drop_empty_items: true,
417 }
418 }
419
420 pub fn drop_empty_items(mut self, value: bool) -> Self {
425 self.drop_empty_items = value;
426 self
427 }
428}
429
430#[async_trait]
431impl CompactionStrategy for DropReasoningStrategy {
432 async fn apply(
433 &self,
434 request: CompactionRequest,
435 _ctx: &mut CompactionContext<'_>,
436 ) -> Result<CompactionResult, CompactionError> {
437 let mut transcript = Vec::with_capacity(request.transcript.len());
438 let mut replaced_items = 0;
439
440 for mut item in request.transcript {
441 let original_len = item.parts.len();
442 item.parts
443 .retain(|part| !matches!(part, Part::Reasoning(_)));
444 let changed = item.parts.len() != original_len;
445 if item.parts.is_empty() && self.drop_empty_items {
446 if changed {
447 replaced_items += 1;
448 }
449 continue;
450 }
451 if changed {
452 replaced_items += 1;
453 }
454 transcript.push(item);
455 }
456
457 Ok(CompactionResult {
458 transcript,
459 replaced_items,
460 metadata: MetadataMap::new(),
461 })
462 }
463}
464
465#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
486pub struct DropFailedToolResultsStrategy {
487 drop_empty_items: bool,
488}
489
490impl DropFailedToolResultsStrategy {
491 pub fn new() -> Self {
494 Self {
495 drop_empty_items: true,
496 }
497 }
498
499 pub fn drop_empty_items(mut self, value: bool) -> Self {
504 self.drop_empty_items = value;
505 self
506 }
507}
508
509#[async_trait]
510impl CompactionStrategy for DropFailedToolResultsStrategy {
511 async fn apply(
512 &self,
513 request: CompactionRequest,
514 _ctx: &mut CompactionContext<'_>,
515 ) -> Result<CompactionResult, CompactionError> {
516 let failed_call_ids = request
517 .transcript
518 .iter()
519 .flat_map(|item| item.parts.iter())
520 .filter_map(|part| match part {
521 Part::ToolResult(result) if result.is_error => Some(result.call_id.clone()),
522 _ => None,
523 })
524 .collect::<BTreeSet<_>>();
525 let mut transcript = Vec::with_capacity(request.transcript.len());
526 let mut replaced_items = 0;
527
528 for mut item in request.transcript {
529 let original_len = item.parts.len();
530 item.parts.retain(|part| {
531 !matches!(part, Part::ToolResult(result) if result.is_error)
532 && !matches!(part, Part::ToolCall(call) if failed_call_ids.contains(&call.id))
533 });
534 let changed = item.parts.len() != original_len;
535 if item.parts.is_empty() && self.drop_empty_items {
536 if changed {
537 replaced_items += 1;
538 }
539 continue;
540 }
541 if changed {
542 replaced_items += 1;
543 }
544 transcript.push(item);
545 }
546
547 Ok(CompactionResult {
548 transcript,
549 replaced_items,
550 metadata: MetadataMap::new(),
551 })
552 }
553}
554
555#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
573pub struct KeepRecentStrategy {
574 keep_last: usize,
575 preserve_kinds: BTreeSet<ItemKind>,
576}
577
578impl KeepRecentStrategy {
579 pub fn new(keep_last: usize) -> Self {
581 Self {
582 keep_last,
583 preserve_kinds: BTreeSet::new(),
584 }
585 }
586
587 pub fn preserve_kind(mut self, kind: ItemKind) -> Self {
590 self.preserve_kinds.insert(kind);
591 self
592 }
593}
594
595#[async_trait]
596impl CompactionStrategy for KeepRecentStrategy {
597 async fn apply(
598 &self,
599 request: CompactionRequest,
600 _ctx: &mut CompactionContext<'_>,
601 ) -> Result<CompactionResult, CompactionError> {
602 let removable = removable_indices(&request.transcript, &self.preserve_kinds);
603 if removable.len() <= self.keep_last {
604 return Ok(CompactionResult {
605 transcript: request.transcript,
606 replaced_items: 0,
607 metadata: MetadataMap::new(),
608 });
609 }
610
611 let keep_indices = removable
612 .iter()
613 .skip(removable.len() - self.keep_last)
614 .copied()
615 .collect::<BTreeSet<_>>();
616 let keep_indices =
617 expand_indices_to_tool_pairs(&request.transcript, keep_indices, &self.preserve_kinds);
618 let replaced_items = removable
619 .iter()
620 .filter(|index| !keep_indices.contains(index))
621 .count();
622 let transcript = request
623 .transcript
624 .into_iter()
625 .enumerate()
626 .filter_map(|(index, item)| {
627 (self.preserve_kinds.contains(&item.kind) || keep_indices.contains(&index))
628 .then_some(item)
629 })
630 .collect::<Vec<_>>();
631
632 Ok(CompactionResult {
633 transcript,
634 replaced_items,
635 metadata: MetadataMap::new(),
636 })
637 }
638}
639
640#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
661pub struct SummarizeOlderStrategy {
662 keep_last: usize,
663 preserve_kinds: BTreeSet<ItemKind>,
664}
665
666impl SummarizeOlderStrategy {
667 pub fn new(keep_last: usize) -> Self {
670 Self {
671 keep_last,
672 preserve_kinds: BTreeSet::new(),
673 }
674 }
675
676 pub fn preserve_kind(mut self, kind: ItemKind) -> Self {
679 self.preserve_kinds.insert(kind);
680 self
681 }
682}
683
684#[async_trait]
685impl CompactionStrategy for SummarizeOlderStrategy {
686 async fn apply(
687 &self,
688 request: CompactionRequest,
689 ctx: &mut CompactionContext<'_>,
690 ) -> Result<CompactionResult, CompactionError> {
691 let Some(backend) = ctx.backend else {
692 return Err(CompactionError::MissingBackend(
693 "summarize strategy requires a compaction backend".into(),
694 ));
695 };
696
697 let removable = removable_indices(&request.transcript, &self.preserve_kinds);
698 if removable.len() <= self.keep_last {
699 return Ok(CompactionResult {
700 transcript: request.transcript,
701 replaced_items: 0,
702 metadata: MetadataMap::new(),
703 });
704 }
705
706 let keep_indices = removable
707 .iter()
708 .skip(removable.len() - self.keep_last)
709 .copied()
710 .collect::<BTreeSet<_>>();
711 let keep_indices =
712 expand_indices_to_tool_pairs(&request.transcript, keep_indices, &self.preserve_kinds);
713 let summary_indices = removable
714 .iter()
715 .copied()
716 .filter(|index| !keep_indices.contains(index))
717 .collect::<Vec<_>>();
718 if summary_indices.is_empty() {
719 return Ok(CompactionResult {
720 transcript: request.transcript,
721 replaced_items: 0,
722 metadata: MetadataMap::new(),
723 });
724 }
725 let first_summary_index = summary_indices[0];
726 let summary_index_set = summary_indices.iter().copied().collect::<BTreeSet<_>>();
727 let summary_items = summary_indices
728 .iter()
729 .map(|index| request.transcript[*index].clone())
730 .collect::<Vec<_>>();
731 let summary = backend
732 .summarize(
733 SummaryRequest {
734 items: summary_items,
735 reason: request.reason.clone(),
736 metadata: request.metadata.clone(),
737 },
738 ctx.cancellation.clone(),
739 )
740 .await?;
741
742 let mut transcript = Vec::new();
743 let mut inserted_summary = false;
744 let mut summary_output = Some(summary.items);
745 for (index, item) in request.transcript.into_iter().enumerate() {
746 if summary_index_set.contains(&index) {
747 if !inserted_summary && index == first_summary_index {
748 transcript.extend(summary_output.take().unwrap_or_default());
749 inserted_summary = true;
750 }
751 continue;
752 }
753 transcript.push(item);
754 }
755
756 Ok(CompactionResult {
757 transcript,
758 replaced_items: summary_indices.len(),
759 metadata: summary.metadata,
760 })
761 }
762}
763
764fn removable_indices(transcript: &[Item], preserve_kinds: &BTreeSet<ItemKind>) -> Vec<usize> {
765 transcript
766 .iter()
767 .enumerate()
768 .filter_map(|(index, item)| (!preserve_kinds.contains(&item.kind)).then_some(index))
769 .collect()
770}
771
772fn expand_indices_to_tool_pairs(
773 transcript: &[Item],
774 mut keep_indices: BTreeSet<usize>,
775 preserve_kinds: &BTreeSet<ItemKind>,
776) -> BTreeSet<usize> {
777 keep_indices.extend(
778 transcript
779 .iter()
780 .enumerate()
781 .filter_map(|(index, item)| preserve_kinds.contains(&item.kind).then_some(index)),
782 );
783
784 let mut calls = HashMap::new();
785 let mut results: HashMap<_, Vec<usize>> = HashMap::new();
786 for (index, item) in transcript.iter().enumerate() {
787 for part in &item.parts {
788 match part {
789 Part::ToolCall(call) => {
790 calls.entry(call.id.clone()).or_insert(index);
791 }
792 Part::ToolResult(result) => {
793 results
794 .entry(result.call_id.clone())
795 .or_default()
796 .push(index);
797 }
798 _ => {}
799 }
800 }
801 }
802
803 loop {
804 let before_len = keep_indices.len();
805 for (call_id, call_index) in &calls {
806 if keep_indices.contains(call_index)
807 && let Some(result_indices) = results.get(call_id)
808 {
809 keep_indices.extend(result_indices.iter().copied());
810 }
811 }
812 for (call_id, result_indices) in &results {
813 if result_indices
814 .iter()
815 .any(|result_index| keep_indices.contains(result_index))
816 && let Some(call_index) = calls.get(call_id)
817 {
818 keep_indices.insert(*call_index);
819 }
820 }
821 if keep_indices.len() == before_len {
822 break;
823 }
824 }
825
826 keep_indices
827}
828
/// Errors produced while compacting a transcript.
#[derive(Debug, Error)]
pub enum CompactionError {
    /// The cancellation handle reported cancellation.
    #[error("compaction cancelled")]
    Cancelled,
    /// A strategy needed a [`CompactionBackend`] but the context had none;
    /// the payload describes what was missing.
    #[error("missing compaction backend: {0}")]
    MissingBackend(String),
    /// Compaction failed for another reason, described by the payload.
    #[error("compaction failed: {0}")]
    Failed(String),
}
843
844pub struct CompactorMutator<C> {
854 compactor: C,
855 name: String,
856}
857
858impl<C: Compactor> CompactorMutator<C> {
859 pub fn new(compactor: C) -> Self {
861 Self {
862 compactor,
863 name: "compactor".into(),
864 }
865 }
866
867 pub fn with_name(mut self, name: impl Into<String>) -> Self {
870 self.name = name.into();
871 self
872 }
873}
874
875#[async_trait]
876impl<C: Compactor + 'static> LoopMutator for CompactorMutator<C> {
877 async fn mutate(
878 &self,
879 cursor: &mut TranscriptCursor<'_>,
880 ctx: LoopCtx<'_>,
881 ) -> Result<(), LoopError> {
882 let Some(reason) = self.compactor.should_compact(cursor.as_slice(), ctx.point) else {
883 return Ok(());
884 };
885
886 ctx.emitter.emit(AgentEvent::MutationStarted {
887 session_id: ctx.session_id.clone(),
888 turn_id: ctx.turn_id.cloned(),
889 mutator: self.name.clone(),
890 point: ctx.point,
891 });
892
893 let before_len = cursor.len();
894 let result = self
895 .compactor
896 .compact(cursor.as_slice(), reason.clone(), ctx.cancellation.clone())
897 .await;
898
899 let mut metadata = MetadataMap::new();
900 metadata.insert("reason".into(), format!("{reason:?}").into());
901
902 match result {
903 Ok(new_items) => {
904 let replaced = before_len.saturating_sub(new_items.len());
905 metadata.insert("replaced_items".into(), (replaced as u64).into());
906 **cursor = new_items;
907 ctx.emitter.emit(AgentEvent::MutationFinished {
908 session_id: ctx.session_id.clone(),
909 turn_id: ctx.turn_id.cloned(),
910 mutator: self.name.clone(),
911 dirty: true,
912 metadata,
913 });
914 Ok(())
915 }
916 Err(err) => {
917 metadata.insert("error".into(), err.to_string().into());
918 ctx.emitter.emit(AgentEvent::MutationFinished {
919 session_id: ctx.session_id.clone(),
920 turn_id: ctx.turn_id.cloned(),
921 mutator: self.name.clone(),
922 dirty: false,
923 metadata,
924 });
925 match err {
926 CompactionError::Cancelled => Err(LoopError::Cancelled),
927 other => Err(LoopError::Mutator(other.to_string())),
928 }
929 }
930 }
931 }
932}
933
934pub trait AgentBuilderCompactorExt<M: ModelAdapter>: Sized {
938 fn compactor<C: Compactor + 'static>(self, compactor: C) -> Self;
940}
941
942impl<M: ModelAdapter> AgentBuilderCompactorExt<M> for AgentBuilder<M> {
943 fn compactor<C: Compactor + 'static>(self, compactor: C) -> Self {
944 self.mutator(CompactorMutator::new(compactor))
945 }
946}
947
/// Boxed predicate that decides whether to compact: given the transcript and
/// the current mutation point it returns `Some(reason)` to compact or `None`
/// to skip.
pub type TriggerFn = Box<dyn Fn(&[Item], MutationPoint) -> Option<CompactionReason> + Send + Sync>;
952
953pub struct StrategyCompactor {
982 trigger: TriggerFn,
983 strategy: Arc<dyn CompactionStrategy>,
984 backend: Option<Arc<dyn CompactionBackend>>,
985 metadata: MetadataMap,
986}
987
988impl StrategyCompactor {
989 pub fn new<T, S>(trigger: T, strategy: S) -> Self
994 where
995 T: Fn(&[Item], MutationPoint) -> Option<CompactionReason> + Send + Sync + 'static,
996 S: CompactionStrategy + 'static,
997 {
998 Self {
999 trigger: Box::new(trigger),
1000 strategy: Arc::new(strategy),
1001 backend: None,
1002 metadata: MetadataMap::new(),
1003 }
1004 }
1005
1006 pub fn builder() -> StrategyCompactorBuilder {
1008 StrategyCompactorBuilder::default()
1009 }
1010
1011 pub fn with_backend(mut self, backend: impl CompactionBackend + 'static) -> Self {
1014 self.backend = Some(Arc::new(backend));
1015 self
1016 }
1017
1018 pub fn with_shared_backend(mut self, backend: Arc<dyn CompactionBackend>) -> Self {
1021 self.backend = Some(backend);
1022 self
1023 }
1024
1025 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
1027 self.metadata = metadata;
1028 self
1029 }
1030}
1031
1032#[async_trait]
1033impl Compactor for StrategyCompactor {
1034 fn should_compact(
1035 &self,
1036 transcript: &[Item],
1037 point: MutationPoint,
1038 ) -> Option<CompactionReason> {
1039 (self.trigger)(transcript, point)
1040 }
1041
1042 async fn compact(
1043 &self,
1044 transcript: &[Item],
1045 reason: CompactionReason,
1046 cancellation: Option<TurnCancellation>,
1047 ) -> Result<Vec<Item>, CompactionError> {
1048 let request = CompactionRequest {
1049 transcript: transcript.to_vec(),
1050 reason,
1051 metadata: self.metadata.clone(),
1052 };
1053 let mut ctx = CompactionContext {
1054 backend: self.backend.as_deref(),
1055 cancellation,
1056 };
1057 let result = self.strategy.apply(request, &mut ctx).await?;
1058 Ok(result.transcript)
1059 }
1060}
1061
/// Errors returned by [`StrategyCompactorBuilder::build`].
#[derive(Debug, Error)]
pub enum StrategyCompactorBuildError {
    /// No trigger was configured on the builder.
    #[error("trigger is required")]
    MissingTrigger,
    /// No strategy was configured on the builder.
    #[error("strategy is required")]
    MissingStrategy,
}
1072
1073#[derive(Default)]
1075pub struct StrategyCompactorBuilder {
1076 trigger: Option<TriggerFn>,
1077 strategy: Option<Arc<dyn CompactionStrategy>>,
1078 backend: Option<Arc<dyn CompactionBackend>>,
1079 metadata: MetadataMap,
1080}
1081
1082impl StrategyCompactorBuilder {
1083 pub fn trigger<T>(mut self, trigger: T) -> Self
1085 where
1086 T: Fn(&[Item], MutationPoint) -> Option<CompactionReason> + Send + Sync + 'static,
1087 {
1088 self.trigger = Some(Box::new(trigger));
1089 self
1090 }
1091
1092 pub fn item_count_trigger(self, max_items: usize) -> Self {
1094 self.trigger(move |transcript: &[Item], _point| {
1095 (transcript.len() > max_items).then_some(CompactionReason::TranscriptTooLong)
1096 })
1097 }
1098
1099 pub fn strategy(mut self, strategy: impl CompactionStrategy + 'static) -> Self {
1101 self.strategy = Some(Arc::new(strategy));
1102 self
1103 }
1104
1105 pub fn backend(mut self, backend: impl CompactionBackend + 'static) -> Self {
1107 self.backend = Some(Arc::new(backend));
1108 self
1109 }
1110
1111 pub fn shared_backend(mut self, backend: Arc<dyn CompactionBackend>) -> Self {
1113 self.backend = Some(backend);
1114 self
1115 }
1116
1117 pub fn metadata(mut self, metadata: MetadataMap) -> Self {
1119 self.metadata = metadata;
1120 self
1121 }
1122
1123 pub fn build(self) -> Result<StrategyCompactor, StrategyCompactorBuildError> {
1125 Ok(StrategyCompactor {
1126 trigger: self
1127 .trigger
1128 .ok_or(StrategyCompactorBuildError::MissingTrigger)?,
1129 strategy: self
1130 .strategy
1131 .ok_or(StrategyCompactorBuildError::MissingStrategy)?,
1132 backend: self.backend,
1133 metadata: self.metadata,
1134 })
1135 }
1136}
1137
/// System prompt used by [`AgentCompactor`] when the builder is given none.
///
/// The trailing backslashes join the source lines into a single-line string.
const DEFAULT_COMPACTION_PROMPT: &str = "You are a compaction agent. Compress the \
transcript that follows into a durable context note for an assistant that has lost the \
original messages. Preserve every named person, every year and date, every place, every \
decision the assistant committed to, every tool the assistant invoked, and every \
actionable fact in the tool results. Drop chatter, narration, and chain-of-thought. \
Return only the compacted note as plain text.";
1144
1145pub fn context_window_trigger(window: u64, percent: u32) -> TriggerFn {
1154 let percent = percent.clamp(1, 100);
1155 let threshold = window.saturating_mul(percent as u64) / 100;
1156 Box::new(move |transcript: &[Item], point: MutationPoint| {
1157 if point != MutationPoint::AfterTurnEnded {
1158 return None;
1159 }
1160 let last_input = transcript
1161 .iter()
1162 .rev()
1163 .find_map(|i| i.usage.as_ref()?.tokens.as_ref().map(|t| t.input_tokens))?;
1164 (last_input >= threshold).then(|| {
1165 CompactionReason::Custom(format!(
1166 "input_tokens={last_input} >= threshold={threshold} (window={window}, {percent}%)",
1167 ))
1168 })
1169 })
1170}
1171
1172pub fn item_count_trigger(max_items: usize) -> TriggerFn {
1176 Box::new(move |transcript: &[Item], _point: MutationPoint| {
1177 (transcript.len() > max_items).then_some(CompactionReason::TranscriptTooLong)
1178 })
1179}
1180
/// Errors returned by [`AgentCompactorBuilder::build`].
#[derive(Debug, Error)]
pub enum AgentCompactorBuildError {
    /// No agent was configured on the builder.
    #[error("agent is required")]
    MissingAgent,
    /// No session id was configured on the builder.
    #[error("session_id is required")]
    MissingSessionId,
}
1191
/// A [`CompactionBackend`] that produces summaries by running a nested agent.
///
/// Each `summarize` call starts a session on `inner` using `session_id` and
/// sends `system_prompt` as the system item.
pub struct AgentCompactor<M: ModelAdapter + Clone + 'static> {
    // Agent used to run the summarization session.
    inner: Arc<Agent<M>>,
    // Session id passed to every summarization session.
    session_id: SessionId,
    // System prompt sent at the start of every summarization session.
    system_prompt: String,
}

impl<M: ModelAdapter + Clone + 'static> AgentCompactor<M> {
    /// Returns an empty [`AgentCompactorBuilder`].
    pub fn builder() -> AgentCompactorBuilder<M> {
        AgentCompactorBuilder::new()
    }
}
1211
1212pub struct AgentCompactorBuilder<M: ModelAdapter + Clone + 'static> {
1214 agent: Option<Arc<Agent<M>>>,
1215 session_id: Option<SessionId>,
1216 system_prompt: Option<String>,
1217}
1218
1219impl<M: ModelAdapter + Clone + 'static> AgentCompactorBuilder<M> {
1220 fn new() -> Self {
1221 Self {
1222 agent: None,
1223 session_id: None,
1224 system_prompt: None,
1225 }
1226 }
1227
1228 pub fn agent(mut self, agent: Arc<Agent<M>>) -> Self {
1230 self.agent = Some(agent);
1231 self
1232 }
1233
1234 pub fn session_id(mut self, id: SessionId) -> Self {
1236 self.session_id = Some(id);
1237 self
1238 }
1239
1240 pub fn system_prompt(mut self, s: impl Into<String>) -> Self {
1242 self.system_prompt = Some(s.into());
1243 self
1244 }
1245
1246 pub fn build(self) -> Result<AgentCompactor<M>, AgentCompactorBuildError> {
1248 Ok(AgentCompactor {
1249 inner: self.agent.ok_or(AgentCompactorBuildError::MissingAgent)?,
1250 session_id: self
1251 .session_id
1252 .ok_or(AgentCompactorBuildError::MissingSessionId)?,
1253 system_prompt: self
1254 .system_prompt
1255 .unwrap_or_else(|| DEFAULT_COMPACTION_PROMPT.into()),
1256 })
1257 }
1258}
1259
1260#[async_trait]
1261impl<M: ModelAdapter + Clone + 'static> CompactionBackend for AgentCompactor<M> {
1262 async fn summarize(
1263 &self,
1264 request: SummaryRequest,
1265 cancellation: Option<TurnCancellation>,
1266 ) -> Result<SummaryResult, CompactionError> {
1267 if cancellation
1268 .as_ref()
1269 .is_some_and(TurnCancellation::is_cancelled)
1270 {
1271 return Err(CompactionError::Cancelled);
1272 }
1273
1274 let rendered = render_items_for_summary(&request.items);
1275
1276 let driver_input = vec![
1277 Item::text(ItemKind::System, self.system_prompt.clone()),
1278 Item::text(
1279 ItemKind::User,
1280 format!(
1281 "Compress the transcript below into a durable context note. \
1282 Preserve names, places, dates, decisions, and tool outcomes.\n\n{rendered}"
1283 ),
1284 ),
1285 ];
1286
1287 let mut driver = self
1288 .inner
1289 .start(SessionConfig::new(self.session_id.clone()))
1290 .await
1291 .map_err(|e| CompactionError::Failed(e.to_string()))?;
1292 driver
1293 .submit_input(driver_input)
1294 .map_err(|e| CompactionError::Failed(e.to_string()))?;
1295
1296 let summary = run_compactor_to_completion(&mut driver)
1297 .await
1298 .map_err(CompactionError::Failed)?;
1299
1300 Ok(SummaryResult {
1301 items: vec![Item::text(ItemKind::Context, summary)],
1302 metadata: MetadataMap::new(),
1303 })
1304 }
1305}
1306
1307async fn run_compactor_to_completion<S>(
1308 driver: &mut agentkit_loop::LoopDriver<S>,
1309) -> Result<String, String>
1310where
1311 S: agentkit_loop::ModelSession,
1312{
1313 use agentkit_loop::LoopInterrupt;
1314 loop {
1315 let step = driver.next().await.map_err(|e| e.to_string())?;
1316 match step {
1317 LoopStep::Finished(result) => {
1318 let mut sections = Vec::new();
1319 for item in result.items {
1320 if item.kind != ItemKind::Assistant {
1321 continue;
1322 }
1323 for part in item.parts {
1324 if let Part::Text(t) = part {
1325 sections.push(t.text);
1326 }
1327 }
1328 }
1329 return Ok(sections.join("\n"));
1330 }
1331 LoopStep::Interrupt(LoopInterrupt::AfterToolResult(_)) => continue,
1332 LoopStep::Interrupt(LoopInterrupt::AwaitingInput(_)) => {
1333 return Err("compactor sub-agent unexpectedly awaiting input".into());
1334 }
1335 LoopStep::Interrupt(LoopInterrupt::ApprovalRequest(_)) => {
1336 return Err("compactor sub-agent unexpectedly required approval".into());
1337 }
1338 }
1339 }
1340}
1341
1342fn render_items_for_summary(items: &[Item]) -> String {
1343 items
1344 .iter()
1345 .map(|item| {
1346 let kind = match item.kind {
1347 ItemKind::User => "USER",
1348 ItemKind::Assistant => "ASSISTANT",
1349 ItemKind::System => "SYSTEM",
1350 ItemKind::Developer => "DEVELOPER",
1351 ItemKind::Tool => "TOOL",
1352 ItemKind::Context => "CONTEXT",
1353 ItemKind::Notification => "NOTIFICATION",
1354 };
1355 let body = item
1356 .parts
1357 .iter()
1358 .filter_map(|p| match p {
1359 Part::Text(t) => Some(t.text.clone()),
1360 Part::Structured(v) => Some(v.value.to_string()),
1361 _ => None,
1362 })
1363 .collect::<Vec<_>>()
1364 .join("\n");
1365 format!("[{kind}]\n{body}")
1366 })
1367 .collect::<Vec<_>>()
1368 .join("\n\n")
1369}
1370
#[cfg(test)]
mod tests {
    use agentkit_core::{
        CancellationController, Part, TextPart, ToolCallPart, ToolOutput, ToolResultPart,
    };

    use super::*;

    /// Builds an item of `kind` holding `parts`, with every optional field unset.
    fn item(kind: ItemKind, parts: Vec<Part>) -> Item {
        Item {
            id: None,
            kind,
            parts,
            metadata: MetadataMap::new(),
            usage: None,
            finish_reason: None,
            created_at: None,
        }
    }

    /// Builds a text part with empty metadata.
    fn text_part(text: &str) -> Part {
        Part::Text(TextPart {
            text: text.into(),
            metadata: MetadataMap::new(),
        })
    }

    /// Builds a tool-result part for call `id` with the given output text.
    fn tool_result_part(id: &str, output: &str, is_error: bool) -> Part {
        Part::ToolResult(ToolResultPart {
            call_id: id.into(),
            output: ToolOutput::Text(output.into()),
            is_error,
            metadata: MetadataMap::new(),
        })
    }

    fn user_item(text: &str) -> Item {
        item(ItemKind::User, vec![text_part(text)])
    }

    /// Assistant item holding one reasoning part followed by one text part.
    fn assistant_with_reasoning() -> Item {
        let reasoning = Part::Reasoning(agentkit_core::ReasoningPart {
            summary: Some("think".into()),
            data: None,
            redacted: false,
            metadata: MetadataMap::new(),
        });
        item(ItemKind::Assistant, vec![reasoning, text_part("answer")])
    }

    /// Tool item holding a failed result for `call-1`.
    fn failed_tool_item() -> Item {
        item(
            ItemKind::Tool,
            vec![tool_result_part("call-1", "failed", true)],
        )
    }

    fn tool_call_item(id: &str) -> Item {
        let call = Part::ToolCall(ToolCallPart {
            id: id.into(),
            name: "lookup".into(),
            input: serde_json::json!({}),
            metadata: MetadataMap::new(),
        });
        item(ItemKind::Assistant, vec![call])
    }

    fn tool_result_item(id: &str, is_error: bool) -> Item {
        item(
            ItemKind::Tool,
            vec![tool_result_part(id, "result", is_error)],
        )
    }

    /// Request with the `TranscriptTooLong` reason and empty metadata.
    fn too_long_request(transcript: Vec<Item>) -> CompactionRequest {
        CompactionRequest::new(transcript, CompactionReason::TranscriptTooLong)
    }

    #[tokio::test]
    async fn pipeline_applies_local_strategies_in_order() {
        let request = too_long_request(vec![
            user_item("a"),
            assistant_with_reasoning(),
            failed_tool_item(),
            user_item("b"),
            user_item("c"),
        ]);
        let keep_recent = KeepRecentStrategy::new(2)
            .preserve_kind(ItemKind::System)
            .preserve_kind(ItemKind::Context);
        let pipeline = CompactionPipeline::new()
            .with_strategy(DropReasoningStrategy::new())
            .with_strategy(DropFailedToolResultsStrategy::new())
            .with_strategy(keep_recent);
        let mut ctx = CompactionContext::new();

        let result = pipeline.apply(request, &mut ctx).await.unwrap();
        assert_eq!(result.transcript.len(), 2);
        assert!(result.replaced_items >= 2);
        assert!(result.transcript.iter().all(|item| {
            item.parts
                .iter()
                .all(|part| !matches!(part, Part::Reasoning(_)))
        }));
    }

    #[tokio::test]
    async fn keep_recent_preserves_tool_call_result_pairs() {
        let request = too_long_request(vec![
            user_item("old"),
            tool_call_item("call-1"),
            tool_result_item("call-1", false),
            user_item("recent"),
        ]);
        let strategy = KeepRecentStrategy::new(2);
        let mut ctx = CompactionContext::new();

        let result = strategy.apply(request, &mut ctx).await.unwrap();
        assert_eq!(result.replaced_items, 1);
        assert_eq!(result.transcript.len(), 3);
        assert!(matches!(result.transcript[0].parts[0], Part::ToolCall(_)));
        assert!(matches!(result.transcript[1].parts[0], Part::ToolResult(_)));
    }

    #[tokio::test]
    async fn failed_tool_result_removal_drops_matching_tool_call() {
        let request = too_long_request(vec![
            tool_call_item("call-1"),
            tool_result_item("call-1", true),
            user_item("recent"),
        ]);
        let strategy = DropFailedToolResultsStrategy::new();
        let mut ctx = CompactionContext::new();

        let result = strategy.apply(request, &mut ctx).await.unwrap();
        assert_eq!(result.replaced_items, 2);
        assert_eq!(result.transcript.len(), 1);
        assert!(matches!(result.transcript[0].kind, ItemKind::User));
    }

    /// Backend that answers with one context item stating how many items it was given.
    struct FakeBackend;

    #[async_trait]
    impl CompactionBackend for FakeBackend {
        async fn summarize(
            &self,
            request: SummaryRequest,
            _cancellation: Option<TurnCancellation>,
        ) -> Result<SummaryResult, CompactionError> {
            let text = format!("summary of {} items", request.items.len());
            Ok(SummaryResult::new(vec![item(
                ItemKind::Context,
                vec![text_part(&text)],
            )]))
        }
    }

    #[tokio::test]
    async fn summarize_strategy_uses_backend() {
        let request = too_long_request(vec![user_item("a"), user_item("b"), user_item("c")]);
        let strategy = SummarizeOlderStrategy::new(1);
        let backend = FakeBackend;
        let mut ctx = CompactionContext::new().with_backend(&backend);

        let result = strategy.apply(request, &mut ctx).await.unwrap();
        assert_eq!(result.replaced_items, 2);
        assert_eq!(result.transcript.len(), 2);
        match &result.transcript[0].parts[0] {
            Part::Text(text) => assert_eq!(text.text, "summary of 2 items"),
            other => panic!("unexpected part: {other:?}"),
        }
    }

    #[tokio::test]
    async fn summarize_strategy_preserves_tool_call_result_pairs() {
        let request = too_long_request(vec![
            user_item("old"),
            tool_call_item("call-1"),
            tool_result_item("call-1", false),
            user_item("recent"),
        ]);
        let strategy = SummarizeOlderStrategy::new(2);
        let backend = FakeBackend;
        let mut ctx = CompactionContext::new().with_backend(&backend);

        let result = strategy.apply(request, &mut ctx).await.unwrap();
        assert_eq!(result.replaced_items, 1);
        assert_eq!(result.transcript.len(), 4);
        match &result.transcript[0].parts[0] {
            Part::Text(text) => assert_eq!(text.text, "summary of 1 items"),
            other => panic!("unexpected part: {other:?}"),
        }
        assert!(matches!(result.transcript[1].parts[0], Part::ToolCall(_)));
        assert!(matches!(result.transcript[2].parts[0], Part::ToolResult(_)));
    }

    #[tokio::test]
    async fn pipeline_stops_when_cancelled() {
        // Interrupt before the pipeline runs so the very first check trips.
        let controller = CancellationController::new();
        let checkpoint = controller.handle().checkpoint();
        controller.interrupt();

        let request = too_long_request(vec![user_item("a"), user_item("b"), user_item("c")]);
        let pipeline = CompactionPipeline::new().with_strategy(DropReasoningStrategy::new());
        let mut ctx = CompactionContext::new().with_cancellation(checkpoint);

        let error = pipeline.apply(request, &mut ctx).await.unwrap_err();
        assert!(matches!(error, CompactionError::Cancelled));
    }
}