use std::collections::BTreeMap;
use std::rc::Rc;
use serde_json::Value as JsonValue;
use crate::agent_events::AgentEvent;
use crate::llm::api::LlmCallOptions;
use crate::llm::helpers::{
emit_reminder_lifecycle_event, normalize_transcript_asset, reminder_from_event,
reminder_lifecycle_payload, replace_reminder_payload, SystemReminder,
REMINDER_DEDUPED_EVENT_KIND, REMINDER_EXPIRED_EVENT_KIND,
};
use crate::value::{VmError, VmValue};
use super::{
auto_compact_messages, compact_strategy_name, compaction_policy_metadata_fields,
estimate_message_tokens, parse_compact_strategy, run_lifecycle_hooks,
run_lifecycle_hooks_with_control, AutoCompactConfig, CompactStrategy, CompactionPolicy,
HookControl, HookEvent,
};
/// Identifies which kind of caller requested a transcript compaction.
///
/// The variant determines the label reported by [`CompactMode::as_str`] and
/// whether lifecycle hooks are fired (see [`CompactMode::fires_hooks`]).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CompactMode {
    Manual,
    Host,
    Auto,
    Workflow,
    Worker,
    ResumeDigest,
}

impl CompactMode {
    /// Returns the fixed snake_case label for this mode.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Manual => "manual",
            Self::Host => "host",
            Self::Auto => "auto",
            Self::Workflow => "workflow",
            Self::Worker => "worker",
            Self::ResumeDigest => "resume_digest",
        }
    }

    /// Reports whether a compaction in this mode fires lifecycle hooks.
    ///
    /// `Manual`, `Host`, and `Auto` do; `Workflow`, `Worker`, and
    /// `ResumeDigest` do not. The match stays exhaustive on purpose so a new
    /// variant forces an explicit decision here.
    pub fn fires_hooks(self) -> bool {
        match self {
            Self::Workflow | Self::Worker | Self::ResumeDigest => false,
            Self::Manual | Self::Host | Self::Auto => true,
        }
    }
}
/// Per-call inputs for `run_compaction_lifecycle`, assembled with the
/// `with_*` builder methods on this type.
pub struct CompactLifecycle<'a> {
/// Session the compaction belongs to. Hook payloads report an empty string
/// when this is `None`; the `TranscriptCompacted` event and reminder-provider
/// evaluation are only performed when it is `Some`.
pub session_id: Option<&'a str>,
/// Transcript identifier attached to reminder lifecycle records.
pub transcript_id: Option<&'a str>,
/// Which caller initiated the compaction; decides whether hooks fire.
pub mode: CompactMode,
/// Events handed to `compact_reminder_events`; taken out of the struct
/// when the lifecycle starts.
pub reminder_events: Vec<VmValue>,
/// When `Some`, reported as the summary instead of the one the compactor
/// returned.
pub summary_override: Option<String>,
/// Options forwarded to reminder-provider evaluation after PostCompact.
pub provider_options: JsonValue,
/// When `Some`, a snapshot asset is built from this transcript value.
pub source_transcript: Option<&'a VmValue>,
/// Whether reminder providers are evaluated after the PostCompact hooks.
pub evaluate_providers: bool,
}
impl<'a> CompactLifecycle<'a> {
    /// Creates a lifecycle for `mode` with nothing else configured: no
    /// session or transcript id, no reminder events, no summary override, an
    /// empty JSON object for provider options, no source transcript, and
    /// provider evaluation enabled.
    pub fn new(mode: CompactMode) -> Self {
        Self {
            mode,
            session_id: None,
            transcript_id: None,
            source_transcript: None,
            summary_override: None,
            reminder_events: Vec::new(),
            provider_options: JsonValue::Object(serde_json::Map::new()),
            evaluate_providers: true,
        }
    }

    /// Sets (or clears) the session id.
    pub fn with_session_id(self, session_id: Option<&'a str>) -> Self {
        Self { session_id, ..self }
    }

    /// Sets (or clears) the transcript id.
    pub fn with_transcript_id(self, transcript_id: Option<&'a str>) -> Self {
        Self {
            transcript_id,
            ..self
        }
    }

    /// Replaces the reminder events that will be compacted.
    pub fn with_reminder_events(self, events: Vec<VmValue>) -> Self {
        Self {
            reminder_events: events,
            ..self
        }
    }

    /// Sets (or clears) the summary that overrides the compactor's output.
    pub fn with_summary_override(self, summary: Option<String>) -> Self {
        Self {
            summary_override: summary,
            ..self
        }
    }

    /// Replaces the options forwarded to reminder-provider evaluation.
    pub fn with_provider_options(self, options: JsonValue) -> Self {
        Self {
            provider_options: options,
            ..self
        }
    }

    /// Sets (or clears) the transcript used to build the snapshot asset.
    pub fn with_source_transcript(self, transcript: Option<&'a VmValue>) -> Self {
        Self {
            source_transcript: transcript,
            ..self
        }
    }

    /// Enables or disables reminder-provider evaluation after PostCompact.
    pub fn with_evaluate_providers(self, evaluate: bool) -> Self {
        Self {
            evaluate_providers: evaluate,
            ..self
        }
    }
}
/// Result of a compaction pass that actually ran, as returned by
/// `run_compaction_lifecycle`.
pub struct CompactionOutcome {
/// Summary reported for the pass (the override when one was supplied,
/// otherwise the compactor's own summary).
pub summary: String,
/// Original message count minus the remaining count, plus one.
pub archived_messages: usize,
/// Token estimate taken before compaction.
pub estimated_tokens_before: usize,
/// Token estimate taken after compaction.
pub estimated_tokens_after: usize,
/// Outcome of processing the reminder events.
pub reminder_report: ReminderCompactReport,
/// Snapshot asset built from the source transcript, when one was provided.
pub snapshot_asset: Option<VmValue>,
/// The `id` field of `snapshot_asset`, when an asset was built.
pub snapshot_asset_id: Option<String>,
/// Copy of `config.compact_strategy` at the end of the pass.
pub strategy: CompactStrategy,
/// Copy of `config.policy_strategy` at the end of the pass.
pub policy_strategy: String,
/// JSON metadata produced by `build_event_metadata`.
pub event_metadata: JsonValue,
}
/// What `compact_reminder_events` did with the events it was given.
#[derive(Debug, Default)]
pub struct ReminderCompactReport {
/// Non-reminder events, plus the events of kept reminders that have
/// `preserve_on_compact` set, in input order.
pub preserved_events: Vec<VmValue>,
/// Every kept reminder (preserved or not) converted to a VM value.
pub custom_reminders: Vec<VmValue>,
/// Reminders dropped because their `ttl_turns` was 1 or less.
pub expired: Vec<SystemReminder>,
/// Kept reminders that do not have `preserve_on_compact` set.
pub compacted: Vec<SystemReminder>,
/// One record per reminder superseded by a later one with the same
/// dedupe key.
pub deduped: Vec<ReminderDedupeRecord>,
/// Number of reminders whose `ttl_turns` was decremented.
pub decremented_count: usize,
/// Number of kept reminders that have `preserve_on_compact` set.
pub preserved_count: usize,
}
/// Records that one reminder was dropped in favour of a later reminder
/// sharing its dedupe key.
#[derive(Clone, Debug)]
pub struct ReminderDedupeRecord {
/// Id of the reminder that was dropped.
pub replaced_id: String,
/// Id of the reminder that superseded it (empty if it could not be looked up).
pub replacing_id: String,
/// The dedupe key both reminders shared.
pub dedupe_key: String,
}
/// Runs one compaction pass over `messages`, wrapped in the PreCompact and
/// PostCompact lifecycle hooks when `lifecycle.mode.fires_hooks()` is true.
///
/// `config` may be mutated: a PreCompact hook returning `Modify` can override
/// settings, and `custom_compactor_reminders` is always replaced with the
/// kept reminders.
///
/// Returns `Ok(None)` when a PreCompact hook blocks the pass or when
/// `auto_compact_messages` yields no summary; otherwise `Ok(Some(outcome))`.
///
/// # Errors
/// Propagates errors from the hook runners, from applying hook overrides,
/// and from `auto_compact_messages`.
pub(crate) async fn run_compaction_lifecycle(
messages: &mut Vec<JsonValue>,
config: &mut AutoCompactConfig,
llm_opts: Option<&LlmCallOptions>,
mut lifecycle: CompactLifecycle<'_>,
) -> Result<Option<CompactionOutcome>, VmError> {
// Move the events out so `lifecycle` can still be borrowed immutably below.
let reminder_events = std::mem::take(&mut lifecycle.reminder_events);
// Capture the "before" figures prior to any mutation of `messages`.
let estimated_tokens_before = estimate_message_tokens(messages);
let original_message_count = messages.len();
let fires_hooks = lifecycle.mode.fires_hooks();
if fires_hooks {
let pre_payload = build_hook_payload(
HookEvent::PreCompact,
&lifecycle,
config,
HookPayloadStage::Pre {
message_count: original_message_count,
estimated_tokens_before,
},
);
match run_lifecycle_hooks_with_control(HookEvent::PreCompact, &pre_payload).await? {
// A blocking hook cancels the pass; nothing has been mutated yet.
HookControl::Block { .. } => return Ok(None),
HookControl::Modify { payload } => apply_pre_modify_overrides(config, &payload)?,
HookControl::Allow | HookControl::Decision { .. } => {}
}
}
// Reminder processing happens after the pre hook, so a blocked pass leaves
// reminder state untouched.
let reminder_report = compact_reminder_events(reminder_events);
config.custom_compactor_reminders = reminder_report.custom_reminders.clone();
let Some(raw_summary) = auto_compact_messages(messages, config, llm_opts).await? else {
return Ok(None);
};
// The override only changes the reported summary here; `messages` has
// already been rewritten by `auto_compact_messages`.
let summary = lifecycle.summary_override.clone().unwrap_or(raw_summary);
if fires_hooks {
emit_reminder_lifecycle_records(lifecycle.transcript_id, &reminder_report);
}
let estimated_tokens_after = estimate_message_tokens(messages);
// NOTE(review): the extra 1 presumably accounts for the summary message
// that replaces the archived span — confirm against `auto_compact_messages`.
let archived_messages = original_message_count
.saturating_sub(messages.len())
.saturating_add(1);
let snapshot_asset = lifecycle.source_transcript.map(|transcript| {
build_snapshot_asset(
transcript,
config,
archived_messages,
estimated_tokens_before,
estimated_tokens_after,
)
});
let snapshot_asset_id = snapshot_asset.as_ref().map(snapshot_asset_id_of);
let event_metadata = build_event_metadata(
&lifecycle,
config,
archived_messages,
estimated_tokens_before,
estimated_tokens_after,
snapshot_asset_id.as_deref(),
&reminder_report,
&summary,
);
if fires_hooks {
let post_payload = build_hook_payload(
HookEvent::PostCompact,
&lifecycle,
config,
HookPayloadStage::Post {
original_message_count,
remaining_messages: messages.len(),
archived_messages,
estimated_tokens_before,
estimated_tokens_after,
summary: &summary,
snapshot_asset_id: snapshot_asset_id.as_deref(),
reminder_report: &reminder_report,
},
);
run_lifecycle_hooks(HookEvent::PostCompact, &post_payload).await?;
// The live event and provider evaluation both need a session to target.
if let Some(session_id) = lifecycle.session_id {
emit_transcript_compacted_event(
session_id,
lifecycle.mode,
config,
archived_messages,
estimated_tokens_before,
estimated_tokens_after,
snapshot_asset_id.as_deref(),
)
.await;
if lifecycle.evaluate_providers {
// The result is deliberately discarded: provider evaluation does
// not affect the outcome of the compaction itself.
let _ = crate::llm::reminder_providers::evaluate_and_inject(
HookEvent::PostCompact,
session_id,
post_payload,
lifecycle.provider_options.clone(),
)
.await;
}
}
}
Ok(Some(CompactionOutcome {
summary,
archived_messages,
estimated_tokens_before,
estimated_tokens_after,
reminder_report,
snapshot_asset,
snapshot_asset_id,
strategy: config.compact_strategy.clone(),
policy_strategy: config.policy_strategy.clone(),
event_metadata,
}))
}
/// Emits a live `AgentEvent::TranscriptCompacted` for `session_id`.
///
/// The strategy label and the instruction/policy fields are read from
/// `config`; the remaining fields are passed through from the arguments.
pub async fn emit_transcript_compacted_event(
    session_id: &str,
    mode: CompactMode,
    config: &AutoCompactConfig,
    archived_messages: usize,
    estimated_tokens_before: usize,
    estimated_tokens_after: usize,
    snapshot_asset_id: Option<&str>,
) {
    let policy = &config.policy;
    let event = AgentEvent::TranscriptCompacted {
        session_id: session_id.to_owned(),
        mode: mode.as_str().to_owned(),
        strategy: config.policy_strategy.clone(),
        archived_messages,
        estimated_tokens_before,
        estimated_tokens_after,
        snapshot_asset_id: snapshot_asset_id.map(str::to_owned),
        instruction_mode: Some(policy.instruction_mode().to_string()),
        instruction_source: policy.instruction_source().map(str::to_string),
        compaction_policy: policy.metadata_json(),
    };
    crate::llm::emit_live_agent_event(&event).await;
}
/// Synchronous counterpart of `emit_transcript_compacted_event`.
///
/// Takes the policy and the strategy label separately instead of reading
/// them from an `AutoCompactConfig`, and takes ownership of the strategy
/// label and snapshot asset id.
pub fn emit_transcript_compacted_event_sync(
    session_id: &str,
    mode: CompactMode,
    policy: &CompactionPolicy,
    policy_strategy: String,
    archived_messages: usize,
    estimated_tokens_before: usize,
    estimated_tokens_after: usize,
    snapshot_asset_id: Option<String>,
) {
    let event = AgentEvent::TranscriptCompacted {
        session_id: session_id.to_owned(),
        mode: mode.as_str().to_owned(),
        strategy: policy_strategy,
        archived_messages,
        estimated_tokens_before,
        estimated_tokens_after,
        snapshot_asset_id,
        instruction_mode: Some(policy.instruction_mode().to_string()),
        instruction_source: policy.instruction_source().map(str::to_string),
        compaction_policy: policy.metadata_json(),
    };
    crate::llm::emit_live_agent_event_sync(&event);
}
/// Stage-specific data that `build_hook_payload` adds on top of the fields
/// common to both hook events.
enum HookPayloadStage<'a> {
/// Data available before compaction runs.
Pre {
message_count: usize,
estimated_tokens_before: usize,
},
/// Data available after compaction has run.
Post {
// Reported under the `message_count` key, matching the `Pre` payload.
original_message_count: usize,
remaining_messages: usize,
archived_messages: usize,
estimated_tokens_before: usize,
estimated_tokens_after: usize,
summary: &'a str,
// Only added to the payload when `Some`.
snapshot_asset_id: Option<&'a str>,
// Only the counts from the report are written to the payload.
reminder_report: &'a ReminderCompactReport,
},
}
/// Builds the JSON payload passed to PreCompact / PostCompact hooks.
///
/// The payload starts with the fields common to both events (event name,
/// session id, mode, engine strategy name under both `strategy` and
/// `engine_strategy`, `keep_last`, and `target_tokens`, which is `null`
/// unless `config.token_threshold` is positive). Compaction-policy metadata
/// fields are inserted next, followed by the fields specific to `stage`;
/// later insertions overwrite earlier ones that use the same key.
fn build_hook_payload(
    event: HookEvent,
    lifecycle: &CompactLifecycle<'_>,
    config: &AutoCompactConfig,
    stage: HookPayloadStage<'_>,
) -> JsonValue {
    let session_id = lifecycle.session_id.unwrap_or("");
    let engine_strategy = compact_strategy_name(&config.compact_strategy);
    let target_tokens = (config.token_threshold > 0).then_some(config.token_threshold);
    let mut payload = serde_json::json!({
        "event": event.as_str(),
        "session": {"id": session_id},
        "session_id": session_id,
        "mode": lifecycle.mode.as_str(),
        "strategy": engine_strategy,
        "engine_strategy": engine_strategy,
        "keep_last": config.keep_last,
        "target_tokens": target_tokens,
    });

    if let Some(fields) = payload.as_object_mut() {
        for (name, value) in compaction_policy_metadata_fields(&config.policy) {
            fields.insert(name.to_string(), value);
        }

        let mut put = |name: &str, value: JsonValue| {
            fields.insert(name.to_string(), value);
        };
        match stage {
            HookPayloadStage::Pre {
                message_count,
                estimated_tokens_before,
            } => {
                put("message_count", serde_json::json!(message_count));
                put(
                    "estimated_tokens_before",
                    serde_json::json!(estimated_tokens_before),
                );
            }
            HookPayloadStage::Post {
                original_message_count,
                remaining_messages,
                archived_messages,
                estimated_tokens_before,
                estimated_tokens_after,
                summary,
                snapshot_asset_id,
                reminder_report,
            } => {
                put("message_count", serde_json::json!(original_message_count));
                put("remaining_messages", serde_json::json!(remaining_messages));
                put("archived_messages", serde_json::json!(archived_messages));
                put(
                    "estimated_tokens_before",
                    serde_json::json!(estimated_tokens_before),
                );
                put(
                    "estimated_tokens_after",
                    serde_json::json!(estimated_tokens_after),
                );
                put("summary", serde_json::json!(summary));
                put("new_summary_len", serde_json::json!(summary.len()));
                // The key is omitted entirely (not set to null) without an asset.
                if let Some(asset_id) = snapshot_asset_id {
                    put("snapshot_asset_id", serde_json::json!(asset_id));
                }
                put(
                    "reminders_decremented",
                    serde_json::json!(reminder_report.decremented_count),
                );
                put(
                    "reminders_expired",
                    serde_json::json!(reminder_report.expired.len()),
                );
                put(
                    "reminders_deduped",
                    serde_json::json!(reminder_report.deduped.len()),
                );
                put(
                    "reminders_preserved",
                    serde_json::json!(reminder_report.preserved_count),
                );
            }
        }
    }
    payload
}
/// Applies overrides from a PreCompact hook's `Modify` payload to `config`.
///
/// Recognised keys (all optional; anything else is ignored):
/// - `keep_last`: non-negative integer, replaces `config.keep_last`.
/// - `target_tokens`: non-negative integer, replaces `config.token_threshold`
///   and sets `config.hard_limit_tokens` to the same value.
/// - `strategy`, or `engine_strategy` when `strategy` is absent: strategy
///   name, parsed and stored in both `compact_strategy` and `policy_strategy`.
///   A `strategy` key that is present but not a string suppresses the
///   `engine_strategy` fallback, and no strategy change is made.
///
/// A payload that is not a JSON object is ignored.
///
/// # Errors
/// Returns the error from `parse_compact_strategy` when the strategy name is
/// not recognised. Numeric overrides found earlier in the payload have
/// already been applied at that point.
fn apply_pre_modify_overrides(
    config: &mut AutoCompactConfig,
    payload: &JsonValue,
) -> Result<(), VmError> {
    let Some(map) = payload.as_object() else {
        return Ok(());
    };
    // Hook payloads are external input: saturate rather than silently
    // truncate when a u64 does not fit in `usize` (e.g. on 32-bit targets).
    let to_usize = |value: u64| usize::try_from(value).unwrap_or(usize::MAX);
    if let Some(keep_last) = map
        .get("keep_last")
        .and_then(JsonValue::as_u64)
        .map(to_usize)
    {
        config.keep_last = keep_last;
    }
    if let Some(target_tokens) = map
        .get("target_tokens")
        .and_then(JsonValue::as_u64)
        .map(to_usize)
    {
        config.token_threshold = target_tokens;
        config.hard_limit_tokens = Some(target_tokens);
    }
    if let Some(value) = map.get("strategy").or_else(|| map.get("engine_strategy")) {
        if let Some(name) = value.as_str() {
            let strategy = parse_compact_strategy(name)?;
            config.policy_strategy = compact_strategy_name(&strategy).to_string();
            config.compact_strategy = strategy;
        }
    }
    Ok(())
}
fn build_event_metadata(
lifecycle: &CompactLifecycle<'_>,
config: &AutoCompactConfig,
archived_messages: usize,
estimated_tokens_before: usize,
estimated_tokens_after: usize,
snapshot_asset_id: Option<&str>,
reminder_report: &ReminderCompactReport,
summary: &str,
) -> JsonValue {
let mut metadata = serde_json::json!({
"mode": lifecycle.mode.as_str(),
"strategy": config.policy_strategy,
"engine_strategy": compact_strategy_name(&config.compact_strategy),
"keep_last": config.keep_last,
"target_tokens": (config.token_threshold > 0).then_some(config.token_threshold),
"archived_messages": archived_messages,
"estimated_tokens_before": estimated_tokens_before,
"estimated_tokens_after": estimated_tokens_after,
"new_summary_len": summary.len(),
"snapshot_asset_id": snapshot_asset_id,
"reminders_decremented": reminder_report.decremented_count,
"reminders_expired": reminder_report.expired.len(),
"reminders_deduped": reminder_report.deduped.len(),
"reminders_preserved": reminder_report.preserved_count,
});
if let Some(map) = metadata.as_object_mut() {
for (key, value) in compaction_policy_metadata_fields(&config.policy) {
map.insert(key.to_string(), value);
}
}
metadata
}
/// Intermediate classification used by `compact_reminder_events` so input
/// order is retained between its passes.
enum CompactEvent {
/// An event that `reminder_from_event` did not recognise as a reminder.
Other(VmValue),
/// A reminder that survived TTL processing.
Reminder {
// The event value, rewritten with the decremented TTL when one applied.
event: VmValue,
reminder: SystemReminder,
// Position of this reminder in the surviving-reminder list; compared
// against the newest index recorded for its dedupe key.
reminder_index: usize,
},
}
pub fn compact_reminder_events(extra_events: Vec<VmValue>) -> ReminderCompactReport {
let mut events = Vec::with_capacity(extra_events.len());
let mut reminders = Vec::new();
let mut expired = Vec::new();
let mut decremented_count = 0;
for event in extra_events {
let Some(reminder) = reminder_from_event(&event) else {
events.push(CompactEvent::Other(event));
continue;
};
let (event, reminder) = match reminder.ttl_turns {
Some(ttl) if ttl <= 1 => {
expired.push(reminder);
continue;
}
Some(ttl) => {
let mut updated = reminder;
updated.ttl_turns = Some(ttl - 1);
decremented_count += 1;
(replace_reminder_payload(&event, &updated), updated)
}
None => (event, reminder),
};
let reminder_index = reminders.len();
reminders.push(reminder.clone());
events.push(CompactEvent::Reminder {
event,
reminder,
reminder_index,
});
}
let mut newest_by_dedupe_key = BTreeMap::new();
for (index, reminder) in reminders.iter().enumerate() {
if let Some(dedupe_key) = reminder.dedupe_key.as_deref() {
newest_by_dedupe_key.insert(dedupe_key.to_string(), index);
}
}
let mut kept_reminders = Vec::new();
let mut preserved_events = Vec::new();
let mut compacted = Vec::new();
let mut deduped = Vec::new();
let mut preserved_count = 0;
for event in events {
match event {
CompactEvent::Other(event) => preserved_events.push(event),
CompactEvent::Reminder {
event,
reminder,
reminder_index,
} => {
let keep = reminder
.dedupe_key
.as_deref()
.and_then(|key| newest_by_dedupe_key.get(key))
.is_none_or(|newest| *newest == reminder_index);
if !keep {
let replacing_id = reminder
.dedupe_key
.as_deref()
.and_then(|key| newest_by_dedupe_key.get(key))
.and_then(|index| reminders.get(*index))
.map(|newest| newest.id.clone())
.unwrap_or_default();
deduped.push(ReminderDedupeRecord {
replaced_id: reminder.id.clone(),
replacing_id,
dedupe_key: reminder.dedupe_key.clone().unwrap_or_default(),
});
continue;
}
kept_reminders.push(crate::stdlib::json_to_vm_value(
&serde_json::to_value(&reminder).unwrap_or(JsonValue::Null),
));
if reminder.preserve_on_compact {
preserved_count += 1;
preserved_events.push(event);
} else {
compacted.push(reminder);
}
}
}
}
ReminderCompactReport {
preserved_events,
custom_reminders: kept_reminders,
expired,
compacted,
deduped,
decremented_count,
preserved_count,
}
}
/// Emits lifecycle events describing what compaction did to reminders.
///
/// - One `REMINDER_EXPIRED_EVENT_KIND` event per TTL-expired reminder
///   (`reason: "ttl"`).
/// - One `REMINDER_EXPIRED_EVENT_KIND` event per compacted reminder
///   (`reason: "compaction"`).
/// - A single `REMINDER_DEDUPED_EVENT_KIND` event when any reminder was
///   deduped; its scalar id/key fields describe the first record and the
///   list fields carry every dropped id.
fn emit_reminder_lifecycle_records(transcript_id: Option<&str>, report: &ReminderCompactReport) {
    const BOUNDARY: &str = "pre_compact";

    for reminder in &report.expired {
        let mut payload = reminder_lifecycle_payload(transcript_id, reminder);
        if let Some(fields) = payload.as_object_mut() {
            let extras = [
                ("transcript_id", serde_json::json!(transcript_id)),
                ("reason", serde_json::json!("ttl")),
                ("ttl_turns_before", serde_json::json!(reminder.ttl_turns)),
                ("expired_at_turn", JsonValue::Null),
                ("expired_at_boundary", serde_json::json!(BOUNDARY)),
                ("phase", serde_json::json!(BOUNDARY)),
            ];
            for (name, value) in extras {
                fields.insert(name.to_string(), value);
            }
        }
        emit_reminder_lifecycle_event(REMINDER_EXPIRED_EVENT_KIND, payload);
    }

    for reminder in &report.compacted {
        let mut payload = reminder_lifecycle_payload(transcript_id, reminder);
        if let Some(fields) = payload.as_object_mut() {
            let extras = [
                ("transcript_id", serde_json::json!(transcript_id)),
                ("reason", serde_json::json!("compaction")),
                ("expired_at_boundary", serde_json::json!(BOUNDARY)),
                ("phase", serde_json::json!(BOUNDARY)),
            ];
            for (name, value) in extras {
                fields.insert(name.to_string(), value);
            }
        }
        emit_reminder_lifecycle_event(REMINDER_EXPIRED_EVENT_KIND, payload);
    }

    // Nothing is emitted when no reminder was deduped.
    if let Some(first) = report.deduped.first() {
        let dropped_reminder_ids: Vec<String> = report
            .deduped
            .iter()
            .map(|record| record.replaced_id.clone())
            .collect();
        let payload = serde_json::json!({
            "transcript_id": transcript_id,
            "boundary": BOUNDARY,
            "replaced_id": first.replaced_id,
            "replacing_id": first.replacing_id,
            "dedupe_key": first.dedupe_key,
            "replaced_ids": &dropped_reminder_ids,
            "dropped_reminder_ids": &dropped_reminder_ids,
            "dropped_count": dropped_reminder_ids.len(),
        });
        emit_reminder_lifecycle_event(REMINDER_DEDUPED_EVENT_KIND, payload);
    }
}
/// Wraps a clone of `transcript` in a `compaction_source_transcript` asset.
///
/// The asset gets a fresh `compaction-source-<uuid v7>` id, internal
/// visibility, and metadata recording the engine strategy, message and token
/// figures, and instruction mode, plus the compaction policy and instruction
/// source when the policy provides them. The result is passed through
/// `normalize_transcript_asset` before being returned.
fn build_snapshot_asset(
    transcript: &VmValue,
    config: &AutoCompactConfig,
    archived_messages: usize,
    estimated_tokens_before: usize,
    estimated_tokens_after: usize,
) -> VmValue {
    let mut metadata = BTreeMap::new();
    metadata.insert(
        "strategy".to_string(),
        VmValue::String(Rc::from(compact_strategy_name(&config.compact_strategy))),
    );
    metadata.insert(
        "archived_messages".to_string(),
        VmValue::Int(archived_messages as i64),
    );
    metadata.insert(
        "estimated_tokens_before".to_string(),
        VmValue::Int(estimated_tokens_before as i64),
    );
    metadata.insert(
        "estimated_tokens_after".to_string(),
        VmValue::Int(estimated_tokens_after as i64),
    );
    metadata.insert(
        "instruction_mode".to_string(),
        VmValue::String(Rc::from(config.policy.instruction_mode())),
    );
    // Optional policy details are only recorded when the policy has them.
    if let Some(policy_json) = config.policy.metadata_json() {
        metadata.insert(
            "compaction_policy".to_string(),
            crate::stdlib::json_to_vm_value(&policy_json),
        );
    }
    if let Some(source) = config.policy.instruction_source() {
        metadata.insert(
            "instruction_source".to_string(),
            VmValue::String(Rc::from(source)),
        );
    }

    let asset_id = format!("compaction-source-{}", uuid::Uuid::now_v7());
    let mut fields = BTreeMap::new();
    fields.insert("id".to_string(), VmValue::String(Rc::from(asset_id)));
    fields.insert(
        "kind".to_string(),
        VmValue::String(Rc::from("compaction_source_transcript")),
    );
    fields.insert(
        "title".to_string(),
        VmValue::String(Rc::from("Pre-compaction transcript")),
    );
    fields.insert(
        "visibility".to_string(),
        VmValue::String(Rc::from("internal")),
    );
    fields.insert("data".to_string(), transcript.clone());
    fields.insert("metadata".to_string(), VmValue::Dict(Rc::new(metadata)));

    normalize_transcript_asset(&VmValue::Dict(Rc::new(fields)))
}
/// Returns the display form of the asset's `id` entry, or an empty string
/// when `asset` is not a dict or has no `id`.
fn snapshot_asset_id_of(asset: &VmValue) -> String {
    let Some(fields) = asset.as_dict() else {
        return String::new();
    };
    match fields.get("id") {
        Some(id) => id.display(),
        None => String::new(),
    }
}
/// Collects clones of the transcript's `events` entries that are dicts with
/// a `kind` other than `"message"` or `"tool_result"`.
///
/// Entries that are not dicts or have no `kind` are skipped. Returns an
/// empty vector when `events` is missing or is not a list.
pub fn transcript_compactable_events(transcript: &BTreeMap<String, VmValue>) -> Vec<VmValue> {
    let Some(VmValue::List(list)) = transcript.get("events") else {
        return Vec::new();
    };
    list.iter()
        .filter(|event| {
            let kind = event
                .as_dict()
                .and_then(|fields| fields.get("kind"))
                .map(|value| value.display());
            match kind {
                Some(kind) => !(kind == "message" || kind == "tool_result"),
                None => false,
            }
        })
        .cloned()
        .collect()
}
// Unit tests for the reminder-processing rules and the hook-firing modes.
#[cfg(test)]
mod tests {
use super::*;
use crate::llm::helpers::{ReminderPropagate, ReminderRoleHint, ReminderSource};
// Builds a `system_reminder` event dict wrapping a reminder with a fresh id,
// no dedupe key, and the given body, preserve flag, and TTL.
fn reminder_event_value(body: &str, preserve: bool, ttl: Option<i64>) -> VmValue {
let reminder = SystemReminder {
id: format!("rem-{}", uuid::Uuid::now_v7()),
tags: Vec::new(),
dedupe_key: None,
ttl_turns: ttl,
preserve_on_compact: preserve,
propagate: ReminderPropagate::Session,
role_hint: ReminderRoleHint::System,
source: ReminderSource::StdlibProvider,
body: body.to_string(),
fired_at_turn: 0,
originating_agent_id: None,
};
let reminder_value =
crate::stdlib::json_to_vm_value(&serde_json::to_value(&reminder).unwrap());
let mut event = BTreeMap::new();
event.insert(
"kind".to_string(),
VmValue::String(std::rc::Rc::from("system_reminder")),
);
event.insert(
"role".to_string(),
VmValue::String(std::rc::Rc::from("system")),
);
event.insert("reminder".to_string(), reminder_value);
VmValue::Dict(std::rc::Rc::new(event))
}
// A reminder flagged `preserve_on_compact` keeps its event; an unflagged one
// is reported as compacted.
#[test]
fn preserve_on_compact_reminder_survives_lifecycle() {
let preserved = reminder_event_value("keep me", true, None);
let droppable = reminder_event_value("drop me", false, None);
let report = compact_reminder_events(vec![preserved.clone(), droppable]);
assert_eq!(report.preserved_count, 1);
assert_eq!(report.compacted.len(), 1);
assert_eq!(report.preserved_events.len(), 1);
assert!(report.preserved_events.iter().any(|event| {
event
.as_dict()
.and_then(|dict| dict.get("reminder"))
.and_then(|reminder| reminder.as_dict())
.and_then(|reminder| reminder.get("body"))
.map(|body| body.display())
.is_some_and(|body| body == "keep me")
}));
}
// A TTL of 1 expires the reminder instead of decrementing it to zero.
#[test]
fn ttl_one_reminder_expires_during_lifecycle() {
let ttl_one = reminder_event_value("ephemeral", false, Some(1));
let report = compact_reminder_events(vec![ttl_one]);
assert_eq!(report.expired.len(), 1);
assert_eq!(report.preserved_count, 0);
}
// A TTL above 1 is decremented; without the preserve flag the reminder is
// reported as compacted and its event is not preserved.
#[test]
fn ttl_above_one_decrements_and_keeps() {
let ttl_three = reminder_event_value("keep ttl", false, Some(3));
let report = compact_reminder_events(vec![ttl_three]);
assert_eq!(report.decremented_count, 1);
assert_eq!(report.preserved_events.len(), 0);
assert_eq!(report.compacted.len(), 1);
}
// Pins which modes fire lifecycle hooks.
#[test]
fn fires_hooks_only_for_session_owning_modes() {
assert!(CompactMode::Manual.fires_hooks());
assert!(CompactMode::Host.fires_hooks());
assert!(CompactMode::Auto.fires_hooks());
assert!(!CompactMode::Workflow.fires_hooks());
assert!(!CompactMode::Worker.fires_hooks());
assert!(!CompactMode::ResumeDigest.fires_hooks());
}
}