use std::collections::HashSet;
use std::sync::Arc;
use awaken_contract::contract::inference::ContextWindowPolicy;
use awaken_contract::contract::message::{Message, Role, Visibility};
use awaken_contract::contract::transform::estimate_message_tokens;
use super::plugin::{CompactionAction, CompactionBoundary, CompactionInFlight, CompactionStateKey};
use super::summarizer::{MIN_COMPACTION_GAIN_TOKENS, extract_previous_summary, render_transcript};
use crate::state::{MutationBatch, StateStore};
/// `event_type` value of a custom event reporting that a background compaction finished;
/// handled by `try_consume_compaction_event`.
pub const COMPACTION_COMPLETED_EVENT: &str = "context.compacted";
/// `event_type` value of a custom event reporting that a background compaction failed;
/// handled by `try_consume_compaction_event`.
pub const COMPACTION_FAILED_EVENT: &str = "context.compaction_failed";
/// Finds the last index in `messages[start..end]` where the history can be cut without
/// separating a tool call from its result.
///
/// Walking forward from `start`, every tool call id announced by a message is tracked as
/// open until a `Role::Tool` message carrying the same `tool_call_id` is seen. An index
/// qualifies as a boundary when no call is open after processing it and the message that
/// follows it (which may lie at or beyond `end`) is not a tool message.
///
/// Returns `None` when no index qualifies, which includes an empty range. An inverted
/// range (`start > end`) is treated as empty instead of underflowing.
pub fn find_compaction_boundary(
    messages: &[Arc<Message>],
    start: usize,
    end: usize,
) -> Option<usize> {
    let mut open_calls = HashSet::<String>::new();
    let mut best_boundary = None;
    // `end - start` would panic (or wrap to a huge count) when `start > end`.
    let span = end.saturating_sub(start);
    for (idx, msg) in messages.iter().enumerate().skip(start).take(span) {
        if let Some(calls) = &msg.tool_calls {
            open_calls.extend(calls.iter().map(|call| call.id.clone()));
        }
        if msg.role == Role::Tool
            && let Some(call_id) = &msg.tool_call_id
        {
            open_calls.remove(call_id);
        }
        // A tool message right after `idx` would be orphaned by a cut here, even when
        // its originating call is not tracked in `open_calls`.
        let next_is_tool = messages
            .get(idx + 1)
            .is_some_and(|next| next.role == Role::Tool);
        if open_calls.is_empty() && !next_is_tool {
            // Keep overwriting so the latest qualifying index wins.
            best_boundary = Some(idx);
        }
    }
    best_boundary
}
/// Drops every message that precedes the most recent conversation-summary message.
///
/// A summary message is an internal-visibility system message whose text contains
/// `<conversation-summary>`. When no such message exists, or it is already the first
/// element, `messages` is left untouched.
pub fn trim_to_compaction_boundary(messages: &mut Vec<Arc<Message>>) {
    let is_summary = |message: &Arc<Message>| {
        message.role == Role::System
            && message.visibility == Visibility::Internal
            && message.text().contains("<conversation-summary>")
    };
    match messages.iter().rposition(is_summary) {
        Some(summary_at) if summary_at > 0 => {
            messages.drain(..summary_at);
        }
        _ => {}
    }
}
/// Builds the state action that records `boundary` as a completed compaction.
pub fn record_compaction_boundary(
    boundary: super::plugin::CompactionBoundary,
) -> super::plugin::CompactionAction {
    use super::plugin::CompactionAction;

    CompactionAction::RecordBoundary(boundary)
}
/// Description of a compaction to perform, as produced by `plan_compaction`.
#[derive(Debug, Clone)]
pub struct CompactionPlan {
/// Rendered transcript of every message up to and including the boundary message.
pub transcript: String,
/// Result of `extract_previous_summary` over the full message list, if any.
pub previous_summary: Option<String>,
/// Id of the last message covered by this plan.
pub boundary_message_id: String,
/// Sum of the estimated token counts of the messages covered by this plan.
pub pre_tokens: usize,
}
/// Outcome of a successful `apply_summary` call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AppliedCompaction {
/// Index the boundary message had in the list before the swap.
pub boundary_index: usize,
/// Estimated tokens of the messages that were removed (boundary included).
pub pre_tokens: usize,
/// Estimated tokens of the summary message that replaced them.
pub post_tokens: usize,
}
/// Decides whether the head of `messages` is worth compacting and, if so, describes how.
///
/// The last `policy.compaction_raw_suffix_messages` messages are never considered. Within
/// the remaining prefix the latest safe cut point is located with
/// [`find_compaction_boundary`]; everything up to and including it becomes the plan.
///
/// Returns `None` when any of the following holds:
/// - fewer than two messages exist, or fewer than two remain outside the kept suffix;
/// - no safe boundary exists, or the boundary message has no id;
/// - the covered messages are estimated below `MIN_COMPACTION_GAIN_TOKENS`;
/// - the rendered transcript is empty.
pub fn plan_compaction(
    messages: &[Arc<Message>],
    policy: &ContextWindowPolicy,
) -> Option<CompactionPlan> {
    let total = messages.len();
    if total < 2 {
        return None;
    }

    let retained_tail = policy.compaction_raw_suffix_messages.min(total);
    let searchable = total.saturating_sub(retained_tail);
    if searchable < 2 {
        return None;
    }

    let cut = find_compaction_boundary(messages, 0, searchable)?;
    let boundary_message_id = messages[cut].id.clone()?;
    let covered = &messages[..=cut];

    let pre_tokens = covered
        .iter()
        .fold(0usize, |total, message| total + estimate_message_tokens(message));
    if pre_tokens < MIN_COMPACTION_GAIN_TOKENS {
        return None;
    }

    let transcript = render_transcript(covered);
    if transcript.is_empty() {
        return None;
    }

    Some(CompactionPlan {
        transcript,
        previous_summary: extract_previous_summary(messages),
        boundary_message_id,
        pre_tokens,
    })
}
/// Builds the state action that marks `in_flight` as the currently running compaction.
pub fn record_compaction_in_flight(in_flight: CompactionInFlight) -> CompactionAction {
CompactionAction::SetInFlight(in_flight)
}
/// Builds the state action that clears the in-flight compaction marker.
pub fn clear_compaction_in_flight() -> CompactionAction {
CompactionAction::ClearInFlight
}
/// Handles a background-compaction event, if `payload` is one.
///
/// `payload` is recognised when its `kind` is `"custom"` and its `event_type` is
/// [`COMPACTION_COMPLETED_EVENT`] or [`COMPACTION_FAILED_EVENT`]; event details are read
/// from the nested `"payload"` object.
///
/// * Completed: when the event names a boundary message that is still present and carries
///   a non-empty summary, the messages up to the boundary are replaced by the summary and
///   a [`CompactionBoundary`] is recorded. Otherwise the swap is skipped with a warning.
///   The in-flight marker is cleared either way.
/// * Failed: the error is logged and the in-flight marker is cleared.
///
/// State-store commit failures are logged, not propagated.
///
/// Returns `true` when `payload` was a compaction event (even if the swap was skipped or
/// the commit failed), and `false` when it was something else and was left untouched.
pub fn try_consume_compaction_event(
    messages: &mut Vec<Arc<Message>>,
    payload: &serde_json::Value,
    store: &StateStore,
) -> bool {
    let Some(event_type) = compaction_event_type(payload) else {
        return false;
    };
    let inner = payload.get("payload");
    match event_type {
        COMPACTION_COMPLETED_EVENT => {
            let boundary_id = payload_str(inner, "boundary_message_id").unwrap_or_default();
            let summary = payload_str(inner, "summary").unwrap_or_default();
            // A count that does not fit in `usize` is treated as absent rather than
            // silently truncated by an `as` cast.
            let reported_pre_tokens = inner
                .and_then(|p| p.get("pre_tokens"))
                .and_then(|v| v.as_u64())
                .and_then(|v| usize::try_from(v).ok())
                .unwrap_or(0);
            let mut batch = MutationBatch::new();
            if boundary_id.is_empty() || summary.is_empty() {
                // Malformed event: report it as such instead of blaming a missing boundary.
                tracing::warn!(
                    boundary_message_id = boundary_id,
                    "background compaction completed event is missing boundary_message_id or summary; skipping swap"
                );
            } else if let Some(applied) = apply_summary(messages, boundary_id, summary) {
                batch.update::<CompactionStateKey>(CompactionAction::RecordBoundary(
                    CompactionBoundary {
                        summary: summary.to_string(),
                        // Prefer whichever estimate is larger: the one measured at swap
                        // time or the one reported by the background task.
                        pre_tokens: applied.pre_tokens.max(reported_pre_tokens),
                        post_tokens: applied.post_tokens,
                        timestamp_ms: now_ms(),
                    },
                ));
                tracing::info!(
                    pre_tokens = applied.pre_tokens,
                    post_tokens = applied.post_tokens,
                    boundary_index = applied.boundary_index,
                    "background_compaction_swap_applied"
                );
            } else {
                tracing::warn!(
                    boundary_message_id = boundary_id,
                    "background compaction completed but boundary message no longer present; skipping swap"
                );
            }
            // The task is over regardless of whether the swap happened.
            batch.update::<CompactionStateKey>(CompactionAction::ClearInFlight);
            if let Err(error) = store.commit(batch) {
                tracing::warn!(
                    error = %error,
                    "failed to commit compaction completion state"
                );
            }
        }
        COMPACTION_FAILED_EVENT => {
            let error_text = payload_str(inner, "error").unwrap_or("unknown error");
            tracing::warn!(
                error = error_text,
                "background compaction failed; clearing in-flight marker"
            );
            let mut batch = MutationBatch::new();
            batch.update::<CompactionStateKey>(CompactionAction::ClearInFlight);
            if let Err(error) = store.commit(batch) {
                tracing::warn!(
                    error = %error,
                    "failed to clear in-flight marker after compaction failure"
                );
            }
        }
        // `compaction_event_type` only yields the two constants matched above.
        _ => {}
    }
    true
}

/// Reads the string field `key` from the event's nested payload object, if present.
fn payload_str<'a>(inner: Option<&'a serde_json::Value>, key: &str) -> Option<&'a str> {
    inner?.get(key)?.as_str()
}
/// Extracts the event type from `payload` when it is a compaction event.
///
/// Yields `Some` only for payloads whose `kind` is the string `"custom"` and whose
/// `event_type` equals one of the two compaction event constants; everything else
/// (including missing or non-string fields) yields `None`.
fn compaction_event_type(payload: &serde_json::Value) -> Option<&str> {
    let is_custom = matches!(
        payload.get("kind").and_then(|kind| kind.as_str()),
        Some("custom")
    );
    if !is_custom {
        return None;
    }

    let event_type = payload.get("event_type")?.as_str()?;
    let recognised =
        event_type == COMPACTION_COMPLETED_EVENT || event_type == COMPACTION_FAILED_EVENT;
    recognised.then_some(event_type)
}
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
/// Replaces the head of `messages`, up to and including the boundary message, with a
/// single internal system message wrapping `summary_text` in `<conversation-summary>` tags.
///
/// The boundary is the first message whose id equals `boundary_message_id`. Messages
/// after it are kept in order behind the new summary message.
///
/// Returns `None`, leaving `messages` untouched, when no message has that id. Otherwise
/// returns the boundary's former index together with the estimated token counts of the
/// removed messages and of the inserted summary.
pub fn apply_summary(
    messages: &mut Vec<Arc<Message>>,
    boundary_message_id: &str,
    summary_text: &str,
) -> Option<AppliedCompaction> {
    let boundary_index = messages
        .iter()
        .position(|candidate| candidate.id.as_deref() == Some(boundary_message_id))?;

    // Measure the prefix while it is still in place.
    let pre_tokens = messages[..=boundary_index]
        .iter()
        .fold(0usize, |total, message| total + estimate_message_tokens(message));

    let wrapped = format!("<conversation-summary>\n{summary_text}\n</conversation-summary>");
    let summary_message = Arc::new(Message::internal_system(wrapped));
    let post_tokens = estimate_message_tokens(&summary_message);

    messages.drain(..=boundary_index);
    messages.insert(0, summary_message);

    Some(AppliedCompaction {
        boundary_index,
        pre_tokens,
        post_tokens,
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use awaken_contract::contract::message::ToolCall;
    use serde_json::json;

    /// Builds a user message whose text is `text` repeated `copies` times.
    fn long_user(text: &str, copies: usize) -> Arc<Message> {
        Arc::new(Message::user(text.repeat(copies)))
    }

    /// Creates a state store with the compaction plugin installed.
    fn store_with_compaction_plugin() -> StateStore {
        let store = StateStore::new();
        store
            .install_plugin(super::super::plugin::CompactionPlugin::default())
            .unwrap();
        store
    }

    /// Builds a "compaction completed" custom event payload.
    fn completed_event(boundary_id: &str, summary: &str, pre_tokens: u64) -> serde_json::Value {
        json!({
            "kind": "custom",
            "task_id": "bg_99",
            "event_type": COMPACTION_COMPLETED_EVENT,
            "payload": {
                "boundary_message_id": boundary_id,
                "summary": summary,
                "pre_tokens": pre_tokens,
            },
        })
    }

    /// Builds a "compaction failed" custom event payload.
    fn failed_event(boundary_id: &str, error_text: &str) -> serde_json::Value {
        json!({
            "kind": "custom",
            "task_id": "bg_99",
            "event_type": COMPACTION_FAILED_EVENT,
            "payload": {
                "boundary_message_id": boundary_id,
                "error": error_text,
            },
        })
    }

    /// Commits an in-flight marker for `boundary_id` to `store`.
    fn mark_in_flight(store: &StateStore, boundary_id: &str) {
        let mut batch = MutationBatch::new();
        batch.update::<CompactionStateKey>(record_compaction_in_flight(CompactionInFlight {
            task_id: "bg_99".into(),
            boundary_message_id: boundary_id.into(),
            started_at_ms: 1,
        }));
        store.commit(batch).unwrap();
    }

    #[test]
    fn try_consume_compaction_event_swaps_messages_and_records_boundary() {
        let store = store_with_compaction_plugin();
        let mut messages: Vec<Arc<Message>> = vec![
            Arc::new(Message::user("OLD-1")),
            Arc::new(Message::assistant("OLD-2")),
            Arc::new(Message::user("BOUNDARY")),
            Arc::new(Message::assistant("AFTER")),
            Arc::new(Message::user("RACE-NEW")),
        ];
        let boundary_id = messages[2].id.clone().unwrap();
        mark_in_flight(&store, &boundary_id);
        let consumed = try_consume_compaction_event(
            &mut messages,
            &completed_event(&boundary_id, "the summary", 4321),
            &store,
        );
        assert!(consumed, "must report the event was consumed");
        assert!(
            messages[0]
                .text()
                .contains("<conversation-summary>\nthe summary"),
            "summary not at front: {}",
            messages[0].text()
        );
        assert_eq!(messages[1].text(), "AFTER");
        assert_eq!(messages[2].text(), "RACE-NEW");
        assert_eq!(messages.len(), 3);
        let state = store.read::<CompactionStateKey>().unwrap();
        assert!(!state.is_compacting(), "in-flight must be cleared");
        assert_eq!(state.boundaries.len(), 1, "boundary must be recorded");
        assert_eq!(state.boundaries[0].summary, "the summary");
    }

    #[test]
    fn try_consume_compaction_event_skips_swap_when_boundary_no_longer_present() {
        let store = store_with_compaction_plugin();
        let mut messages: Vec<Arc<Message>> = vec![
            Arc::new(Message::user("only-msg")),
            Arc::new(Message::assistant("only-reply")),
        ];
        mark_in_flight(&store, "ghost-boundary-id");
        let consumed = try_consume_compaction_event(
            &mut messages,
            &completed_event("ghost-boundary-id", "irrelevant", 0),
            &store,
        );
        assert!(consumed);
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].text(), "only-msg");
        let state = store.read::<CompactionStateKey>().unwrap();
        assert!(
            !state.is_compacting(),
            "in-flight must clear even on benign skip"
        );
        assert!(
            state.boundaries.is_empty(),
            "no boundary should be recorded when swap was skipped"
        );
    }

    #[test]
    fn try_consume_compaction_event_clears_in_flight_on_failure() {
        let store = store_with_compaction_plugin();
        let mut messages: Vec<Arc<Message>> = vec![Arc::new(Message::user("x"))];
        mark_in_flight(&store, "any");
        let consumed =
            try_consume_compaction_event(&mut messages, &failed_event("any", "boom"), &store);
        assert!(consumed);
        let state = store.read::<CompactionStateKey>().unwrap();
        assert!(!state.is_compacting());
        assert!(
            state.boundaries.is_empty(),
            "failure must not record a boundary"
        );
    }

    #[test]
    fn try_consume_compaction_event_passes_through_unrelated_payloads() {
        let store = store_with_compaction_plugin();
        let mut messages: Vec<Arc<Message>> = vec![Arc::new(Message::user("x"))];
        // Custom event, but not one of the compaction event types.
        let other = json!({
            "kind": "custom",
            "task_id": "bg_42",
            "event_type": "task.heartbeat",
            "payload": {"pct": 50},
        });
        assert!(!try_consume_compaction_event(&mut messages, &other, &store));
        // Not a custom event at all.
        let task_completed = json!({
            "kind": "completed",
            "task_id": "bg_43",
            "result": null,
        });
        assert!(!try_consume_compaction_event(
            &mut messages,
            &task_completed,
            &store
        ));
    }

    #[test]
    fn plan_compaction_returns_none_when_savings_below_threshold() {
        let messages: Vec<Arc<Message>> = vec![
            Arc::new(Message::user("hi")),
            Arc::new(Message::assistant("hello")),
            Arc::new(Message::user("how are you?")),
            Arc::new(Message::assistant("fine")),
        ];
        let policy = ContextWindowPolicy {
            compaction_raw_suffix_messages: 1,
            ..Default::default()
        };
        assert!(plan_compaction(&messages, &policy).is_none());
    }

    #[test]
    fn plan_compaction_captures_boundary_message_id() {
        let mut messages: Vec<Arc<Message>> = (0..6)
            .map(|i| {
                if i % 2 == 0 {
                    long_user("filler ", 600)
                } else {
                    Arc::new(Message::assistant("ack"))
                }
            })
            .collect();
        messages.push(Arc::new(Message::user("recent")));
        let policy = ContextWindowPolicy {
            compaction_raw_suffix_messages: 1,
            ..Default::default()
        };
        let plan = plan_compaction(&messages, &policy).expect("plan");
        assert!(
            messages
                .iter()
                .any(|m| m.id.as_deref() == Some(plan.boundary_message_id.as_str()))
        );
        assert!(plan.pre_tokens >= MIN_COMPACTION_GAIN_TOKENS);
        assert!(!plan.transcript.is_empty());
    }

    #[test]
    fn apply_summary_swaps_when_boundary_present() {
        let mut messages: Vec<Arc<Message>> = vec![
            Arc::new(Message::user("old1")),
            Arc::new(Message::assistant("old2")),
            Arc::new(Message::user("BOUNDARY")),
            Arc::new(Message::assistant("after-boundary")),
            Arc::new(Message::user("appended-during-window")),
        ];
        let boundary_id = messages[2].id.clone().unwrap();
        let applied = apply_summary(&mut messages, &boundary_id, "synthetic summary").unwrap();
        assert_eq!(applied.boundary_index, 2);
        assert!(applied.pre_tokens > 0);
        assert!(applied.post_tokens > 0);
        assert!(
            messages[0]
                .text()
                .contains("<conversation-summary>\nsynthetic summary"),
            "summary missing or malformed: {}",
            messages[0].text()
        );
        assert_eq!(messages[1].text(), "after-boundary");
        assert_eq!(messages[2].text(), "appended-during-window");
        assert_eq!(messages.len(), 3);
    }

    #[test]
    fn apply_summary_returns_none_when_boundary_already_gone() {
        let mut messages: Vec<Arc<Message>> = vec![
            Arc::new(Message::user("a")),
            Arc::new(Message::assistant("b")),
        ];
        let original = messages.clone();
        assert!(apply_summary(&mut messages, "non-existent-id", "any").is_none());
        assert_eq!(messages.len(), original.len());
        for (a, b) in messages.iter().zip(original.iter()) {
            assert_eq!(a.text(), b.text());
        }
    }

    #[test]
    fn find_compaction_boundary_respects_tool_pairs() {
        let messages: Vec<Arc<Message>> = vec![
            Arc::new(Message::user("start")),
            Arc::new(Message::assistant_with_tool_calls(
                "",
                vec![ToolCall::new("c1", "search", json!({}))],
            )),
            Arc::new(Message::tool("c1", "found")),
            Arc::new(Message::user("next")),
            Arc::new(Message::assistant("reply")),
        ];
        let boundary = find_compaction_boundary(&messages, 0, messages.len());
        assert!(boundary.is_some());
        let b = boundary.unwrap();
        assert!(b >= 3);
    }

    #[test]
    fn trim_to_compaction_boundary_drops_pre_summary() {
        let mut messages = vec![
            Arc::new(Message::user("old msg 1")),
            Arc::new(Message::assistant("old reply")),
            Arc::new(Message::internal_system(
                "<conversation-summary>\nSummary of old messages\n</conversation-summary>",
            )),
            Arc::new(Message::user("new msg")),
            Arc::new(Message::assistant("new reply")),
        ];
        trim_to_compaction_boundary(&mut messages);
        assert_eq!(messages.len(), 3);
        assert!(messages[0].text().contains("conversation-summary"));
        assert_eq!(messages[1].text(), "new msg");
    }

    #[test]
    fn trim_to_compaction_boundary_noop_without_summary() {
        let mut messages = vec![
            Arc::new(Message::user("hello")),
            Arc::new(Message::assistant("hi")),
        ];
        let len_before = messages.len();
        trim_to_compaction_boundary(&mut messages);
        assert_eq!(messages.len(), len_before);
    }

    #[test]
    fn find_compaction_boundary_does_not_cut_open_tool_round() {
        let messages: Vec<Arc<Message>> = vec![
            Arc::new(Message::user("start")),
            Arc::new(Message::assistant("reply")),
            Arc::new(Message::user("next")),
            Arc::new(Message::assistant_with_tool_calls(
                "",
                vec![ToolCall::new("c1", "search", json!({}))],
            )),
        ];
        let boundary = find_compaction_boundary(&messages, 0, messages.len());
        // Require a boundary: an `if let` here would let a `None` regression pass silently.
        let b = boundary.expect("messages before the open tool round form a valid boundary");
        assert!(b <= 2, "boundary should not include open tool round");
    }

    #[test]
    fn trim_to_compaction_boundary_idempotent() {
        let mut messages = vec![
            Arc::new(Message::user("old")),
            Arc::new(Message::internal_system(
                "<conversation-summary>\nSummary\n</conversation-summary>",
            )),
            Arc::new(Message::user("new")),
        ];
        trim_to_compaction_boundary(&mut messages);
        let len_after_first = messages.len();
        trim_to_compaction_boundary(&mut messages);
        assert_eq!(
            messages.len(),
            len_after_first,
            "second trim should be noop"
        );
    }

    #[test]
    fn find_boundary_skips_open_tool_rounds() {
        let messages: Vec<Arc<Message>> = vec![
            Arc::new(Message::user("start")),
            Arc::new(Message::assistant("ok")),
            Arc::new(Message::user("do something")),
            Arc::new(Message::assistant_with_tool_calls(
                "",
                vec![ToolCall::new("c1", "search", json!({}))],
            )),
        ];
        let boundary = find_compaction_boundary(&messages, 0, messages.len());
        // Require a boundary: an `if let` here would let a `None` regression pass silently.
        let b = boundary.expect("messages before the open tool call form a valid boundary");
        assert!(b < 3, "boundary {b} must be before open tool call at idx 3");
    }

    #[test]
    fn find_boundary_respects_suffix_messages() {
        let messages: Vec<Arc<Message>> = vec![
            Arc::new(Message::user("old1")),
            Arc::new(Message::assistant("reply1")),
            Arc::new(Message::user("old2")),
            Arc::new(Message::assistant("reply2")),
            Arc::new(Message::user("recent")),
            Arc::new(Message::assistant("recent_reply")),
        ];
        let suffix_count = 2;
        let search_end = messages.len().saturating_sub(suffix_count);
        let boundary = find_compaction_boundary(&messages, 0, search_end);
        // Require a boundary: an `if let` here would let a `None` regression pass silently.
        let b = boundary.expect("plain messages before the suffix form a valid boundary");
        assert!(
            b < search_end,
            "boundary {b} must be before suffix start {search_end}"
        );
    }

    #[test]
    fn find_boundary_returns_none_when_too_few_messages() {
        let messages: Vec<Arc<Message>> = vec![Arc::new(Message::user("only message"))];
        let boundary = find_compaction_boundary(&messages, 0, 0);
        assert!(boundary.is_none(), "empty range should yield no boundary");
        let messages2: Vec<Arc<Message>> = vec![Arc::new(Message::assistant_with_tool_calls(
            "",
            vec![ToolCall::new("c1", "fn", json!({}))],
        ))];
        let boundary2 = find_compaction_boundary(&messages2, 0, messages2.len());
        assert!(
            boundary2.is_none(),
            "single open tool call should yield no boundary"
        );
    }

    #[test]
    fn find_compaction_boundary_multiple_complete_tool_rounds() {
        let messages: Vec<Arc<Message>> = vec![
            Arc::new(Message::user("start")),
            Arc::new(Message::assistant_with_tool_calls(
                "",
                vec![ToolCall::new("c1", "search", json!({}))],
            )),
            Arc::new(Message::tool("c1", "found it")),
            Arc::new(Message::user("next")),
            Arc::new(Message::assistant_with_tool_calls(
                "",
                vec![ToolCall::new("c2", "read", json!({}))],
            )),
            Arc::new(Message::tool("c2", "content")),
            Arc::new(Message::user("last")),
            Arc::new(Message::assistant("done")),
        ];
        let boundary = find_compaction_boundary(&messages, 0, messages.len());
        assert!(boundary.is_some());
        let b = boundary.unwrap();
        assert!(
            b >= 6,
            "boundary should be after all tool rounds: got {}",
            b
        );
    }

    #[test]
    fn find_compaction_boundary_empty_range() {
        let messages: Vec<Arc<Message>> = vec![
            Arc::new(Message::user("hello")),
            Arc::new(Message::assistant("hi")),
        ];
        let boundary = find_compaction_boundary(&messages, 0, 0);
        assert!(boundary.is_none(), "empty range should yield no boundary");
    }

    #[test]
    fn find_compaction_boundary_range_start_equals_end() {
        let messages: Vec<Arc<Message>> = vec![Arc::new(Message::user("only"))];
        let boundary = find_compaction_boundary(&messages, 1, 1);
        assert!(boundary.is_none());
    }

    #[test]
    fn trim_to_compaction_boundary_uses_last_summary() {
        let mut messages = vec![
            Arc::new(Message::user("old msg 1")),
            Arc::new(Message::internal_system(
                "<conversation-summary>\nFirst summary\n</conversation-summary>",
            )),
            Arc::new(Message::user("mid msg")),
            Arc::new(Message::internal_system(
                "<conversation-summary>\nSecond summary\n</conversation-summary>",
            )),
            Arc::new(Message::user("new msg")),
        ];
        trim_to_compaction_boundary(&mut messages);
        assert_eq!(messages.len(), 2);
        assert!(messages[0].text().contains("Second summary"));
        assert_eq!(messages[1].text(), "new msg");
    }

    #[test]
    fn find_compaction_boundary_with_multiple_tool_calls_in_one_round() {
        let messages: Vec<Arc<Message>> = vec![
            Arc::new(Message::user("do things")),
            Arc::new(Message::assistant_with_tool_calls(
                "",
                vec![
                    ToolCall::new("c1", "search", json!({})),
                    ToolCall::new("c2", "read", json!({})),
                ],
            )),
            Arc::new(Message::tool("c1", "found")),
            Arc::new(Message::tool("c2", "content")),
            Arc::new(Message::user("thanks")),
        ];
        let boundary = find_compaction_boundary(&messages, 0, messages.len());
        assert!(boundary.is_some());
        let b = boundary.unwrap();
        assert!(
            b >= 3,
            "boundary should be after all tool results: got {}",
            b
        );
    }

    #[test]
    fn find_compaction_boundary_partial_tool_results() {
        let messages: Vec<Arc<Message>> = vec![
            Arc::new(Message::user("start")),
            Arc::new(Message::assistant_with_tool_calls(
                "",
                vec![
                    ToolCall::new("c1", "search", json!({})),
                    ToolCall::new("c2", "read", json!({})),
                ],
            )),
            Arc::new(Message::tool("c1", "found")),
        ];
        let boundary = find_compaction_boundary(&messages, 0, messages.len());
        // Require a boundary: an `if let` here would let a `None` regression pass silently.
        let b = boundary.expect("the message before the incomplete tool round is a valid boundary");
        assert!(b < 1, "boundary should not include incomplete tool round");
    }
}