use std::collections::HashMap;
use async_trait::async_trait;
use chrono::Utc;
use entelix_core::ExecutionContext;
use entelix_core::error::{Error, Result};
use entelix_core::ir::{ContentPart, Message, Role, ToolResultContent};
use crate::event::GraphEvent;
/// One tool invocation together with its outcome.
///
/// Values are assembled by `group_into_turns` when a `GraphEvent::ToolResult`
/// is matched against the earlier `GraphEvent::ToolCall` carrying the same id,
/// so a `ToolPair` always holds both halves of the exchange.
#[derive(Clone, Debug)]
pub struct ToolPair {
/// Identifier shared by the call and its result (the result's `tool_use_id`).
call_id: String,
/// Tool name as reported on the result event.
name: String,
/// Arguments taken from the matching call event.
input: serde_json::Value,
/// Payload the tool produced.
result: ToolResultContent,
/// Error flag copied from the result event.
is_error: bool,
}
impl ToolPair {
    /// Identifier that ties the tool call to its result.
    pub fn id(&self) -> &str {
        self.call_id.as_str()
    }

    /// Name of the tool that was invoked.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// JSON arguments the tool was called with.
    pub const fn input(&self) -> &serde_json::Value {
        &self.input
    }

    /// Payload returned by the tool.
    pub const fn result(&self) -> &ToolResultContent {
        &self.result
    }

    /// Whether the result was flagged as an error.
    pub const fn is_error(&self) -> bool {
        self.is_error
    }
}
/// One conversational turn in a compacted history.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Turn {
/// A message authored by the user.
User {
/// Content parts of the user message.
content: Vec<ContentPart>,
},
/// A message authored by the assistant, plus the tool exchanges grouped
/// under it.
Assistant {
/// Content parts of the assistant message.
content: Vec<ContentPart>,
/// Completed call/result pairs attached to this turn, in the order their
/// results appeared in the event log.
tools: Vec<ToolPair>,
},
}
/// An ordered list of [`Turn`]s, oldest first, as produced by grouping an
/// event log or by a [`Compactor`].
#[derive(Clone, Debug)]
pub struct CompactedHistory {
/// Turns in chronological order.
turns: Vec<Turn>,
}
impl CompactedHistory {
    /// Groups a raw event log into turns without dropping anything.
    ///
    /// # Errors
    ///
    /// Propagates the error from `group_into_turns` when tool calls and tool
    /// results in `events` cannot be paired.
    pub fn group(events: &[GraphEvent]) -> Result<Self> {
        group_into_turns(events).map(Self::from_turns)
    }

    /// Wraps an already-built list of turns.
    #[must_use]
    pub const fn from_turns(turns: Vec<Turn>) -> Self {
        Self { turns }
    }

    /// Borrows the turns, oldest first.
    pub fn turns(&self) -> &[Turn] {
        self.turns.as_slice()
    }

    /// Number of turns held.
    pub const fn len(&self) -> usize {
        self.turns.len()
    }

    /// `true` when the history holds no turns.
    pub const fn is_empty(&self) -> bool {
        self.turns.is_empty()
    }

    /// Flattens the turns back into a message list.
    ///
    /// A user turn yields one `Role::User` message. An assistant turn yields
    /// one `Role::Assistant` message followed by one `Role::Tool` message per
    /// attached [`ToolPair`], each carrying a single `ContentPart::ToolResult`.
    /// The pair's `input` is not emitted separately.
    pub fn to_messages(&self) -> Vec<Message> {
        let mut messages = Vec::with_capacity(self.turns.len() * 2);
        for turn in &self.turns {
            let (content, tools) = match turn {
                Turn::User { content } => {
                    messages.push(Message::new(Role::User, content.clone()));
                    continue;
                }
                Turn::Assistant { content, tools } => (content, tools),
            };
            messages.push(Message::new(Role::Assistant, content.clone()));
            messages.extend(tools.iter().map(|pair| {
                let result_part = ContentPart::ToolResult {
                    tool_use_id: pair.call_id.clone(),
                    name: pair.name.clone(),
                    content: pair.result.clone(),
                    is_error: pair.is_error,
                    cache_control: None,
                    provider_echoes: Vec::new(),
                };
                Message::new(Role::Tool, vec![result_part])
            }));
        }
        messages
    }
}
/// Strategy for reducing an event log to a [`CompactedHistory`] under a size
/// budget.
///
/// Implementors must be `Send + Sync + 'static`.
#[async_trait]
pub trait Compactor: Send + Sync + 'static {
/// Compacts `events` into a history intended to fit within `budget_chars`.
///
/// How size is measured against `budget_chars` is up to the implementor;
/// [`HeadDropCompactor`] sums `len()` of text and serialized JSON, i.e.
/// UTF-8 bytes rather than Unicode scalar values.
///
/// # Errors
///
/// Implementation-defined. [`HeadDropCompactor`] fails when tool calls and
/// tool results in `events` cannot be paired.
async fn compact(
&self,
events: &[GraphEvent],
budget_chars: usize,
ctx: &ExecutionContext,
) -> Result<CompactedHistory>;
}
/// Stateless [`Compactor`] that keeps the newest turns fitting the budget and
/// drops everything older.
#[derive(Clone, Copy, Debug, Default)]
pub struct HeadDropCompactor;
#[async_trait]
impl Compactor for HeadDropCompactor {
    /// Keeps the longest suffix of turns whose combined cost fits the budget.
    ///
    /// Turns are visited newest-first. Each turn's cost (`turn_char_cost`) is
    /// deducted from `budget_chars`, and the first turn that does not fit
    /// stops the scan, so it and every older turn are dropped. If the newest
    /// turn alone exceeds the budget, the result is empty. The execution
    /// context is not used.
    ///
    /// # Errors
    ///
    /// Propagates the grouping error from [`CompactedHistory::group`].
    async fn compact(
        &self,
        events: &[GraphEvent],
        budget_chars: usize,
        _ctx: &ExecutionContext,
    ) -> Result<CompactedHistory> {
        let mut grouped = CompactedHistory::group(events)?.turns;

        // Count how many trailing turns can be afforded, spending the budget
        // as we go; `checked_sub` yields `None` exactly when a turn is too big.
        let mut budget_left = budget_chars;
        let kept = grouped
            .iter()
            .rev()
            .take_while(|turn| match budget_left.checked_sub(turn_char_cost(turn)) {
                Some(rest) => {
                    budget_left = rest;
                    true
                }
                None => false,
            })
            .count();

        let first_kept = grouped.len() - kept;
        Ok(CompactedHistory::from_turns(grouped.split_off(first_kept)))
    }
}
/// Expands a message list into the equivalent sequence of graph events.
///
/// * A `Role::User` message becomes one `GraphEvent::UserMessage`.
/// * A `Role::Assistant` message becomes one `GraphEvent::AssistantMessage`
///   (with `usage: None`), followed by one `GraphEvent::ToolCall` per
///   `ContentPart::ToolUse` in its content.
/// * A `Role::Tool` message becomes one `GraphEvent::ToolResult` per
///   `ContentPart::ToolResult` in its content; other parts are skipped.
/// * Messages with any other role produce no events.
///
/// Every event is stamped with the same timestamp, captured once on entry.
///
/// # Errors
///
/// The current body always returns `Ok`.
pub fn messages_to_events(messages: &[Message]) -> Result<Vec<GraphEvent>> {
    let stamp = Utc::now();
    let mut out = Vec::with_capacity(messages.len() * 2);

    for message in messages {
        match message.role {
            Role::User => out.push(GraphEvent::UserMessage {
                content: message.content.clone(),
                timestamp: stamp,
            }),
            Role::Assistant => {
                out.push(GraphEvent::AssistantMessage {
                    content: message.content.clone(),
                    usage: None,
                    timestamp: stamp,
                });
                // Surface each embedded tool-use part as its own call event.
                out.extend(message.content.iter().filter_map(|part| match part {
                    ContentPart::ToolUse {
                        id, name, input, ..
                    } => Some(GraphEvent::ToolCall {
                        id: id.clone(),
                        name: name.clone(),
                        input: input.clone(),
                        timestamp: stamp,
                    }),
                    _ => None,
                }));
            }
            Role::Tool => {
                out.extend(message.content.iter().filter_map(|part| match part {
                    ContentPart::ToolResult {
                        tool_use_id,
                        name,
                        content,
                        is_error,
                        ..
                    } => Some(GraphEvent::ToolResult {
                        tool_use_id: tool_use_id.clone(),
                        name: name.clone(),
                        content: content.clone(),
                        is_error: *is_error,
                        timestamp: stamp,
                    }),
                    _ => None,
                }));
            }
            _ => {}
        }
    }

    Ok(out)
}
/// Total size of all messages' content, as measured by `content_chars`
/// (byte lengths of text and of serialized JSON payloads).
#[must_use]
pub fn messages_char_size(messages: &[Message]) -> usize {
    let mut total = 0usize;
    for message in messages {
        total += content_chars(&message.content);
    }
    total
}
/// Folds a flat event log into turns, pairing every tool call with its result.
///
/// * `UserMessage` and `AssistantMessage` events each open a new turn.
/// * `ToolCall` events are parked until a `ToolResult` with the same id shows
///   up; a later call reusing a still-pending id replaces the parked one.
/// * `ToolResult` events are combined with their parked call into a
///   [`ToolPair`], which is attached to the most recent assistant turn seen
///   so far. The pair's name comes from the result event.
/// * Any other event kind is ignored.
///
/// # Errors
///
/// Returns a config error when
/// * a `ToolResult` has no pending `ToolCall` with the same id,
/// * a `ToolResult` arrives before any assistant turn exists, or
/// * the log ends with calls that never received a result; the message lists
///   the unmatched ids in sorted order.
fn group_into_turns(events: &[GraphEvent]) -> Result<Vec<Turn>> {
    // Parked calls borrow straight from `events`; the input is cloned only
    // once a pair is actually formed.
    let mut pending_calls: HashMap<&str, &serde_json::Value> = HashMap::new();
    let mut turns: Vec<Turn> = Vec::new();

    for event in events {
        match event {
            GraphEvent::UserMessage { content, .. } => {
                turns.push(Turn::User {
                    content: content.clone(),
                });
            }
            GraphEvent::AssistantMessage { content, .. } => {
                turns.push(Turn::Assistant {
                    content: content.clone(),
                    tools: Vec::new(),
                });
            }
            GraphEvent::ToolCall { id, input, .. } => {
                pending_calls.insert(id.as_str(), input);
            }
            GraphEvent::ToolResult {
                tool_use_id,
                name,
                content,
                is_error,
                ..
            } => {
                // Check for the matching call first so an orphan result is
                // reported as such even when no assistant turn exists yet.
                let call_input = pending_calls
                    .remove(tool_use_id.as_str())
                    .ok_or_else(|| {
                        Error::config(format!(
                            "Compactor: ToolResult tool_use_id={tool_use_id} \
                             has no matching ToolCall in event log"
                        ))
                    })?;
                let pair = ToolPair {
                    call_id: tool_use_id.clone(),
                    name: name.clone(),
                    input: call_input.clone(),
                    result: content.clone(),
                    is_error: *is_error,
                };
                let host_tools = turns
                    .iter_mut()
                    .rev()
                    .find_map(|turn| match turn {
                        Turn::Assistant { tools, .. } => Some(tools),
                        Turn::User { .. } => None,
                    })
                    .ok_or_else(|| {
                        Error::config("Compactor: ToolResult appeared before any AssistantMessage")
                    })?;
                host_tools.push(pair);
            }
            _ => {}
        }
    }

    if !pending_calls.is_empty() {
        // Sort so the diagnostic is stable despite hash-map iteration order.
        let mut unmatched: Vec<&str> = pending_calls.keys().copied().collect();
        unmatched.sort_unstable();
        return Err(Error::config(format!(
            "Compactor: {} ToolCall(s) without matching ToolResult — pair invariant violated \
             (unmatched ids: {})",
            unmatched.len(),
            unmatched.join(", ")
        )));
    }
    Ok(turns)
}
/// Size of one turn for budgeting purposes.
///
/// A user turn costs the size of its content. An assistant turn costs the
/// size of its content plus, for every attached tool pair, the length of the
/// serialized input and of the result (text length, or serialized JSON
/// length; other result kinds count as zero). Lengths are `len()` values,
/// i.e. UTF-8 bytes.
fn turn_char_cost(turn: &Turn) -> usize {
    let (content, tools) = match turn {
        Turn::User { content } => return content_chars(content),
        Turn::Assistant { content, tools } => (content, tools),
    };

    let tool_cost: usize = tools
        .iter()
        .map(|pair| {
            let result_len = match &pair.result {
                ToolResultContent::Text(text) => text.len(),
                ToolResultContent::Json(value) => value.to_string().len(),
                _ => 0,
            };
            pair.input.to_string().len() + result_len
        })
        .sum();

    content_chars(content) + tool_cost
}
/// Sums the size of a list of content parts.
///
/// Text and thinking parts contribute their text length; tool-use parts the
/// length of their serialized input; tool-result parts the length of their
/// text or serialized JSON. Every other part, and every other tool-result
/// payload kind, contributes zero. Lengths are `len()` values (UTF-8 bytes).
fn content_chars(parts: &[ContentPart]) -> usize {
    let mut total = 0usize;
    for part in parts {
        total += match part {
            ContentPart::Text { text, .. } | ContentPart::Thinking { text, .. } => text.len(),
            ContentPart::ToolUse { input, .. } => input.to_string().len(),
            ContentPart::ToolResult {
                content: ToolResultContent::Text(text),
                ..
            } => text.len(),
            ContentPart::ToolResult {
                content: ToolResultContent::Json(value),
                ..
            } => value.to_string().len(),
            _ => 0,
        };
    }
    total
}
// Unit tests for event grouping, head-drop compaction and message round-trips.
// `unwrap` and direct indexing are allowed here because a panic is the
// desired failure mode inside a test.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use chrono::Utc;
    use serde_json::json;

    use super::*;

    /// Builds a user-message event holding a single text part.
    fn user(text: &str) -> GraphEvent {
        GraphEvent::UserMessage {
            content: vec![ContentPart::text(text)],
            timestamp: Utc::now(),
        }
    }

    /// Builds an assistant-message event holding a single text part.
    fn assistant(text: &str) -> GraphEvent {
        GraphEvent::AssistantMessage {
            content: vec![ContentPart::text(text)],
            usage: None,
            timestamp: Utc::now(),
        }
    }

    /// Builds a tool-call event.
    fn tool_call(id: &str, name: &str, input: serde_json::Value) -> GraphEvent {
        GraphEvent::ToolCall {
            id: id.to_owned(),
            name: name.to_owned(),
            input,
            timestamp: Utc::now(),
        }
    }

    /// Builds a successful, text-valued tool-result event.
    fn tool_result(id: &str, name: &str, text: &str) -> GraphEvent {
        GraphEvent::ToolResult {
            tool_use_id: id.to_owned(),
            name: name.to_owned(),
            content: ToolResultContent::Text(text.to_owned()),
            is_error: false,
            timestamp: Utc::now(),
        }
    }

    #[tokio::test]
    async fn empty_event_log_compacts_to_empty_history() {
        let history = HeadDropCompactor
            .compact(&[], 1024, &ExecutionContext::new())
            .await
            .unwrap();
        assert!(history.is_empty());
    }

    #[tokio::test]
    async fn user_assistant_round_trip_preserves_both_turns() {
        let events = vec![user("hi"), assistant("hello!")];
        let history = HeadDropCompactor
            .compact(&events, 1024, &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(history.len(), 2);
        assert!(matches!(history.turns()[0], Turn::User { .. }));
        assert!(matches!(history.turns()[1], Turn::Assistant { .. }));
    }

    #[tokio::test]
    async fn tool_pair_attaches_to_preceding_assistant_turn() {
        let events = vec![
            user("compute 1+1"),
            assistant("calling calculator"),
            tool_call("call_1", "calculator", json!({"expr": "1+1"})),
            tool_result("call_1", "calculator", "2"),
            assistant("answer is 2"),
        ];
        let history = HeadDropCompactor
            .compact(&events, 1024, &ExecutionContext::new())
            .await
            .unwrap();
        // One user turn plus two assistant turns; the tool events fold into
        // the first assistant turn instead of forming turns of their own.
        assert_eq!(history.len(), 3);
        if let Turn::Assistant { tools, .. } = &history.turns()[1] {
            assert_eq!(tools.len(), 1);
            assert_eq!(tools[0].id(), "call_1");
            assert_eq!(tools[0].name(), "calculator");
        } else {
            panic!("expected Assistant turn at index 1");
        }
    }

    #[tokio::test]
    async fn tool_result_without_matching_call_returns_config_error() {
        let events = vec![
            user("ask"),
            assistant("calling"),
            tool_result("orphan", "calc", "x"),
        ];
        let err = HeadDropCompactor
            .compact(&events, 1024, &ExecutionContext::new())
            .await
            .unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("orphan"),
            "diagnostic must name the unmatched id: {msg}"
        );
    }

    #[tokio::test]
    async fn tool_call_without_matching_result_returns_config_error() {
        let events = vec![
            user("ask"),
            assistant("calling"),
            tool_call("dangling", "calc", json!({})),
        ];
        let err = HeadDropCompactor
            .compact(&events, 1024, &ExecutionContext::new())
            .await
            .unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("pair invariant violated"), "got: {msg}");
    }

    #[tokio::test]
    async fn budget_drops_oldest_turns_keeps_newest() {
        let events = vec![
            user("one one one"),
            assistant("one reply"),
            user("two two two"),
            assistant("two reply"),
            user("three three three"),
            assistant("three reply"),
        ];
        let history = HeadDropCompactor
            .compact(&events, 50, &ExecutionContext::new())
            .await
            .unwrap();
        assert!(!history.is_empty());
        // The six texts total 68 bytes, so a budget of 50 must drop something.
        assert!(
            history.len() < events.len(),
            "budget should have forced at least one turn to be dropped"
        );
        let Some(Turn::Assistant { content, .. }) = history.turns().last() else {
            panic!("expected Assistant as last turn");
        };
        let Some(ContentPart::Text { text, .. }) = content.first() else {
            panic!("expected a text part in the last turn");
        };
        assert!(
            text.contains("three"),
            "newest turn must be retained, got: {text}"
        );
    }

    #[tokio::test]
    async fn to_messages_round_trips_user_assistant_tool_sequence() {
        let events = vec![
            user("ask"),
            assistant("calling"),
            tool_call("c", "tool", json!({})),
            tool_result("c", "tool", "ok"),
        ];
        let history = HeadDropCompactor
            .compact(&events, 1024, &ExecutionContext::new())
            .await
            .unwrap();
        let msgs = history.to_messages();
        // User message, assistant message, and one tool message for the pair.
        assert_eq!(msgs.len(), 3);
        assert!(matches!(msgs[0].role, Role::User));
        assert!(matches!(msgs[1].role, Role::Assistant));
        assert!(matches!(msgs[2].role, Role::Tool));
    }

    #[tokio::test]
    async fn pair_invariant_holds_under_partial_budget_drop() {
        // A bulky tool result makes the first assistant turn far too
        // expensive for a small budget while the two newest turns still fit.
        let bulky = "r".repeat(200);
        let events = vec![
            user("u1"),
            assistant("a1"),
            tool_call("t1", "x", json!({"v": 1})),
            tool_result("t1", "x", &bulky),
            user("u2"),
            assistant("a2"),
        ];

        // Tight budget: the tool-bearing turn goes away as a whole, so no
        // half of the pair may survive.
        let trimmed = HeadDropCompactor
            .compact(&events, 30, &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(trimmed.len(), 2, "only the two newest turns fit");
        assert!(matches!(trimmed.turns()[0], Turn::User { .. }));
        for turn in trimmed.turns() {
            if let Turn::Assistant { tools, .. } = turn {
                assert!(tools.is_empty(), "dropped pair must not leak into kept turns");
            }
        }
        assert!(
            trimmed
                .to_messages()
                .iter()
                .all(|m| !matches!(m.role, Role::Tool)),
            "no tool message may survive without its assistant turn"
        );

        // Generous budget: the pair is retained with both halves intact.
        let full = HeadDropCompactor
            .compact(&events, 1024, &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(full.len(), 4);
        let Turn::Assistant { tools, .. } = &full.turns()[1] else {
            panic!("expected Assistant turn at index 1");
        };
        assert_eq!(tools.len(), 1);
        let pair = &tools[0];
        assert_eq!(pair.id(), "t1");
        assert_eq!(pair.name(), "x");
        assert_eq!(pair.input(), &json!({"v": 1}));
        assert!(matches!(pair.result(), ToolResultContent::Text(text) if *text == bulky));
        assert!(!pair.is_error());
    }
}