entelix_session/compaction.rs
1//! `Compactor` + sealed `CompactedHistory<Turn>` — type-enforced
2//! `ToolCall` / `ToolResult` pair invariant for context compaction.
3//!
4//! Long agent runs accumulate event logs that exceed the model's
5//! context window. Operators must drop *some* events without
6//! breaking the conversation invariants vendors enforce — chiefly
7//! that every `tool_use` block has a matching `tool_result`.
8//! Mismatched pairs surface as HTTP 400s on the next call;
9//! pydantic-ai's [issue #4137](https://github.com/pydantic/pydantic-ai/issues/4137)
10//! catalogues the recurring footgun across SDKs.
11//!
//! `entelix-session` closes the footgun by exposing compaction
//! through this module. A [`Compactor`] consumes `&[GraphEvent]`
//! and returns a [`CompactedHistory`] made of [`Turn`]s. A
//! [`ToolPair`] has private fields and no public constructor — it
//! is only produced by this module's grouping code (reached via
//! [`CompactedHistory::group`]) — so operators cannot hand-build a
//! pair that is missing one half. The [`Turn`] enum
//! groups events into:
18//!
19//! - [`Turn::User`] — one `UserMessage`.
20//! - [`Turn::Assistant`] — one `AssistantMessage` plus zero or
21//! more [`ToolPair`]s, each binding a `ToolCall` to its
22//! matching `ToolResult` *by structure*.
23//!
24//! Because `ToolPair` cannot be constructed with only one half,
25//! every compaction strategy operates on whole `Turn`s — the
26//! model never receives a `tool_use` without its `tool_result`.
27//!
28//! ## Reference impl
29//!
30//! [`HeadDropCompactor`] is the canonical "drop oldest" strategy:
31//! walks turns from newest backwards, keeps turns that fit under
32//! the character budget, returns the trimmed window. Operators
33//! whose use case wants summary-style compaction (LLM-generated
34//! synopsis of dropped turns) implement [`Compactor`] directly.
35
36use std::collections::HashMap;
37
38use async_trait::async_trait;
39use chrono::Utc;
40use entelix_core::ExecutionContext;
41use entelix_core::error::{Error, Result};
42use entelix_core::ir::{ContentPart, Message, Role, ToolResultContent};
43
44use crate::event::GraphEvent;
45
46/// One matched `ToolCall` / `ToolResult` pair. Sealed: the only
47/// path to construction is [`Compactor::compact`] internal grouping,
48/// so a pair without both halves cannot exist.
49///
50/// Read-only accessors expose the call's id / name / input and the
51/// result's content / error flag for operators that inspect the
52/// compacted view (rendering, dashboards). Mutation is not
53/// supported — a `Turn` carrying a different pair set is a fresh
54/// compaction.
55#[derive(Clone, Debug)]
56pub struct ToolPair {
57 call_id: String,
58 name: String,
59 input: serde_json::Value,
60 result: ToolResultContent,
61 is_error: bool,
62}
63
64impl ToolPair {
65 /// Stable tool-use id binding the call to its result.
66 pub fn id(&self) -> &str {
67 &self.call_id
68 }
69
70 /// Tool name as registered with the dispatching `ToolRegistry`.
71 pub fn name(&self) -> &str {
72 &self.name
73 }
74
75 /// Tool input as JSON.
76 pub const fn input(&self) -> &serde_json::Value {
77 &self.input
78 }
79
80 /// Result payload returned by the tool.
81 pub const fn result(&self) -> &ToolResultContent {
82 &self.result
83 }
84
85 /// Whether the tool reported an error path.
86 pub const fn is_error(&self) -> bool {
87 self.is_error
88 }
89}
90
91/// One turn in a compacted conversation. Sealed so a `Turn::Assistant`
92/// can only carry [`ToolPair`]s constructed via the compactor's
93/// internal grouping code (paired calls + results).
94#[derive(Clone, Debug)]
95#[non_exhaustive]
96pub enum Turn {
97 /// User-authored message — opaque content, never paired.
98 User {
99 /// Multi-part content (text, image, …).
100 content: Vec<ContentPart>,
101 },
102 /// Assistant-authored message + the tool round-trips it
103 /// initiated. Empty `tools` means the assistant turn produced
104 /// final text only.
105 Assistant {
106 /// Assistant's content (may include `ContentPart::ToolUse`
107 /// blocks; the embedded tool-use ids match the
108 /// corresponding [`ToolPair::id`]s).
109 content: Vec<ContentPart>,
110 /// Matched tool round-trips initiated by this turn.
111 tools: Vec<ToolPair>,
112 },
113}
114
115/// Compacted view over a `SessionGraph`'s event log.
116///
117/// External operators implementing [`Compactor`] for a custom
118/// strategy (LLM-summary compaction, importance-weighted retention,
119/// …) construct the initial form via [`CompactedHistory::group`]
120/// and return either the same value or one rebuilt with
121/// [`CompactedHistory::from_turns`] after filtering. The
122/// `tool_call` / `tool_result` pair invariant stays type-enforced:
123/// the only path to a [`ToolPair`] is the internal grouping code,
124/// so external impls can drop or pass through tool round-trips
125/// but can't synthesize unmatched ones.
126#[derive(Clone, Debug)]
127pub struct CompactedHistory {
128 turns: Vec<Turn>,
129}
130
131impl CompactedHistory {
132 /// Group `events` into the type-enforced [`Turn`] shape and
133 /// return the un-trimmed compaction. The grouping rejects an
134 /// event log that violates the pair invariant *before*
135 /// compaction (e.g. `ToolResult` without a preceding
136 /// `ToolCall`); a well-formed `SessionGraph` never hits the
137 /// error path.
138 ///
139 /// External [`Compactor`] impls call this to get the initial
140 /// grouped form, then choose which turns to retain.
141 pub fn group(events: &[GraphEvent]) -> Result<Self> {
142 Ok(Self {
143 turns: group_into_turns(events)?,
144 })
145 }
146
147 /// Build a `CompactedHistory` from a pre-grouped `Vec<Turn>`.
148 /// External [`Compactor`] impls reach for this after filtering
149 /// or transforming the turns returned by
150 /// [`CompactedHistory::group`]. The pair invariant survives
151 /// the round-trip because the only path to a [`ToolPair`] is
152 /// still the internal grouping — operators pass them through
153 /// but can't synthesize new ones.
154 #[must_use]
155 pub const fn from_turns(turns: Vec<Turn>) -> Self {
156 Self { turns }
157 }
158
159 /// Borrow the compacted turns.
160 pub fn turns(&self) -> &[Turn] {
161 &self.turns
162 }
163
164 /// Number of turns retained.
165 pub const fn len(&self) -> usize {
166 self.turns.len()
167 }
168
169 /// Whether the compacted history is empty.
170 pub const fn is_empty(&self) -> bool {
171 self.turns.is_empty()
172 }
173
174 /// Render as `Vec<Message>` suitable for `ChatModel::complete`.
175 /// Mirrors [`crate::SessionGraph::current_branch_messages`] but
176 /// over the compacted view: every assistant turn's `tool_use`
177 /// blocks are followed by a synthetic `Role::Tool` message
178 /// per [`ToolPair`], so the wire-side codec sees the matched
179 /// pairs the vendor expects.
180 pub fn to_messages(&self) -> Vec<Message> {
181 let mut out = Vec::with_capacity(self.turns.len() * 2);
182 for turn in &self.turns {
183 match turn {
184 Turn::User { content } => {
185 out.push(Message::new(Role::User, content.clone()));
186 }
187 Turn::Assistant { content, tools } => {
188 out.push(Message::new(Role::Assistant, content.clone()));
189 for pair in tools {
190 out.push(Message::new(
191 Role::Tool,
192 vec![ContentPart::ToolResult {
193 tool_use_id: pair.call_id.clone(),
194 name: pair.name.clone(),
195 content: pair.result.clone(),
196 is_error: pair.is_error,
197 cache_control: None,
198 provider_echoes: Vec::new(),
199 }],
200 ));
201 }
202 }
203 }
204 }
205 out
206 }
207}
208
209/// Operator-supplied compaction strategy.
210///
211/// Receives the full event log plus a character-budget hint and
212/// returns the trimmed view. Async by default so summary-style
213/// implementations can dispatch a `ChatModel` call (`SummaryCompactor`
214/// in `entelix-agents` is the canonical reference); pure-retention
215/// strategies (`HeadDropCompactor`) simply ignore the future point
216/// and return synchronously inside the async fn body.
217///
218/// Implementations must preserve the `ToolCall` / `ToolResult` pair
219/// invariant — the [`CompactedHistory`] return type enforces that
220/// structurally; trait authors only need to choose *which* turns to
221/// retain.
222#[async_trait]
223pub trait Compactor: Send + Sync + 'static {
224 /// Compact `events` to fit within `budget_chars`. The budget is
225 /// approximate — implementations measure character length of
226 /// the rendered text (closest free proxy for token count
227 /// without pulling a tokenizer dependency). The
228 /// [`ExecutionContext`] carries cancellation + deadline so a
229 /// long-running summarisation respects the same lifetime as the
230 /// dispatch that triggered it. Returns [`Error::Config`] when
231 /// the event log violates the pair invariant *before*
232 /// compaction (e.g. `ToolResult` without a preceding
233 /// `ToolCall`); a well-formed `SessionGraph` never hits this
234 /// path.
235 async fn compact(
236 &self,
237 events: &[GraphEvent],
238 budget_chars: usize,
239 ctx: &ExecutionContext,
240 ) -> Result<CompactedHistory>;
241}
242
243/// Reference compactor: drop oldest turns until the rendered
244/// character count fits under `budget_chars`. Tool round-trips
245/// stay paired by construction; the strategy never partially
246/// includes a turn.
247///
248/// Synchronous in spirit — the async fn body runs to completion
249/// without awaiting any future. Operators that want LLM-generated
250/// summary compaction reach for `entelix_agents::SummaryCompactor`
251/// instead.
252#[derive(Clone, Copy, Debug, Default)]
253pub struct HeadDropCompactor;
254
255#[async_trait]
256impl Compactor for HeadDropCompactor {
257 async fn compact(
258 &self,
259 events: &[GraphEvent],
260 budget_chars: usize,
261 _ctx: &ExecutionContext,
262 ) -> Result<CompactedHistory> {
263 let mut turns = CompactedHistory::group(events)?.turns;
264 // Walk newest to oldest, keep turns that fit under budget.
265 let mut remaining = budget_chars;
266 let mut keep_index = turns.len();
267 for (idx, turn) in turns.iter().enumerate().rev() {
268 let cost = turn_char_cost(turn);
269 if cost > remaining {
270 break;
271 }
272 remaining -= cost;
273 keep_index = idx;
274 }
275 let trimmed = turns.split_off(keep_index);
276 Ok(CompactedHistory::from_turns(trimmed))
277 }
278}
279
280/// Render an in-flight `Vec<Message>` (the shape an agent's working
281/// state carries) into a `Vec<GraphEvent>` that [`Compactor::compact`]
282/// can consume. Inverse of [`CompactedHistory::to_messages`].
283///
284/// Auto-compaction wiring: an agent loop holds messages, not events,
285/// so the trigger path needs this helper to feed the existing
286/// event-shaped compaction surface — preserving the type-enforced
287/// `tool_call` / `tool_result` pair invariant end-to-end.
288///
289/// `Role::System` messages are dropped — system prompts ride outside
290/// the event log by design (configured separately on the model). All
291/// timestamps are stamped with [`Utc::now`] since per-message wall-clock
292/// is unavailable from the message representation; compaction does not
293/// rely on event ordering by timestamp (it uses positional ordering).
294///
295/// Returns [`Error::Config`] when the message sequence violates the
296/// `tool_call` / `tool_result` pair invariant before compaction (e.g.
297/// `Role::Tool` content carrying a `tool_use_id` with no preceding
298/// assistant `ToolUse` part).
299pub fn messages_to_events(messages: &[Message]) -> Result<Vec<GraphEvent>> {
300 let now = Utc::now();
301 let mut events = Vec::with_capacity(messages.len() * 2);
302 for msg in messages {
303 match msg.role {
304 Role::User => {
305 events.push(GraphEvent::UserMessage {
306 content: msg.content.clone(),
307 timestamp: now,
308 });
309 }
310 Role::Assistant => {
311 events.push(GraphEvent::AssistantMessage {
312 content: msg.content.clone(),
313 usage: None,
314 timestamp: now,
315 });
316 for part in &msg.content {
317 if let ContentPart::ToolUse {
318 id, name, input, ..
319 } = part
320 {
321 events.push(GraphEvent::ToolCall {
322 id: id.clone(),
323 name: name.clone(),
324 input: input.clone(),
325 timestamp: now,
326 });
327 }
328 }
329 }
330 Role::Tool => {
331 for part in &msg.content {
332 if let ContentPart::ToolResult {
333 tool_use_id,
334 name,
335 content,
336 is_error,
337 ..
338 } = part
339 {
340 events.push(GraphEvent::ToolResult {
341 tool_use_id: tool_use_id.clone(),
342 name: name.clone(),
343 content: content.clone(),
344 is_error: *is_error,
345 timestamp: now,
346 });
347 }
348 }
349 }
350 // `Role::System` rides outside the event log (configured
351 // separately on the model); future variants similarly do
352 // not represent appendable conversation turns.
353 _ => {}
354 }
355 }
356 Ok(events)
357}
358
359/// Character-length proxy for the token cost of a message slice. Same
360/// metric [`HeadDropCompactor`] uses to compare against `budget_chars`,
361/// so threshold-driven auto-compaction can use the same yardstick.
362#[must_use]
363pub fn messages_char_size(messages: &[Message]) -> usize {
364 messages.iter().map(|m| content_chars(&m.content)).sum()
365}
366
367/// Group events into the type-enforced [`Turn`] shape. Every
368/// `ToolCall` must have a matching `ToolResult` (paired by `id`);
369/// every `ToolResult` must follow an `AssistantMessage`. Returns
370/// [`Error::Config`] on either violation.
371fn group_into_turns(events: &[GraphEvent]) -> Result<Vec<Turn>> {
372 let mut pending_calls: HashMap<String, (String, serde_json::Value)> = HashMap::new();
373 let mut turns: Vec<Turn> = Vec::new();
374 for event in events {
375 match event {
376 GraphEvent::UserMessage { content, .. } => {
377 turns.push(Turn::User {
378 content: content.clone(),
379 });
380 }
381 GraphEvent::AssistantMessage { content, .. } => {
382 turns.push(Turn::Assistant {
383 content: content.clone(),
384 tools: Vec::new(),
385 });
386 }
387 GraphEvent::ToolCall {
388 id, name, input, ..
389 } => {
390 pending_calls.insert(id.clone(), (name.clone(), input.clone()));
391 }
392 GraphEvent::ToolResult {
393 tool_use_id,
394 name,
395 content,
396 is_error,
397 ..
398 } => {
399 let (_call_name, call_input) =
400 pending_calls.remove(tool_use_id).ok_or_else(|| {
401 Error::config(format!(
402 "Compactor: ToolResult tool_use_id={tool_use_id} \
403 has no matching ToolCall in event log"
404 ))
405 })?;
406 let pair = ToolPair {
407 call_id: tool_use_id.clone(),
408 name: name.clone(),
409 input: call_input,
410 result: content.clone(),
411 is_error: *is_error,
412 };
413 let host = turns
414 .iter_mut()
415 .rev()
416 .find(|t| matches!(t, Turn::Assistant { .. }))
417 .ok_or_else(|| {
418 Error::config("Compactor: ToolResult appeared before any AssistantMessage")
419 })?;
420 if let Turn::Assistant { tools, .. } = host {
421 tools.push(pair);
422 }
423 }
424 _ => {}
425 }
426 }
427 if !pending_calls.is_empty() {
428 return Err(Error::config(format!(
429 "Compactor: {} ToolCall(s) without matching ToolResult — pair invariant violated",
430 pending_calls.len()
431 )));
432 }
433 Ok(turns)
434}
435
436/// Character-length proxy for token cost. Walks the turn's content
437/// blocks summing text bytes (UTF-8 byte count, not grapheme count
438/// — the cheap-monotonic property is what matters for "drop until
439/// under budget"). Tool inputs / outputs contribute their JSON
440/// serialisation length.
441fn turn_char_cost(turn: &Turn) -> usize {
442 match turn {
443 Turn::User { content } => content_chars(content),
444 Turn::Assistant { content, tools } => {
445 let mut sum = content_chars(content);
446 for pair in tools {
447 sum += pair.input.to_string().len();
448 sum += match &pair.result {
449 ToolResultContent::Text(s) => s.len(),
450 ToolResultContent::Json(v) => v.to_string().len(),
451 _ => 0,
452 };
453 }
454 sum
455 }
456 }
457}
458
459fn content_chars(parts: &[ContentPart]) -> usize {
460 parts
461 .iter()
462 .map(|p| match p {
463 ContentPart::Text { text, .. } | ContentPart::Thinking { text, .. } => text.len(),
464 ContentPart::ToolUse { input, .. } => input.to_string().len(),
465 ContentPart::ToolResult { content, .. } => match content {
466 ToolResultContent::Text(s) => s.len(),
467 ToolResultContent::Json(v) => v.to_string().len(),
468 _ => 0,
469 },
470 _ => 0,
471 })
472 .sum()
473}
474
/// Unit tests for grouping, head-drop compaction and message rendering.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use chrono::Utc;
    use serde_json::json;

    use super::*;

    /// `UserMessage` event with a single text part.
    fn user(text: &str) -> GraphEvent {
        GraphEvent::UserMessage {
            content: vec![ContentPart::text(text)],
            timestamp: Utc::now(),
        }
    }

    /// `AssistantMessage` event with a single text part and no usage.
    fn assistant(text: &str) -> GraphEvent {
        GraphEvent::AssistantMessage {
            content: vec![ContentPart::text(text)],
            usage: None,
            timestamp: Utc::now(),
        }
    }

    /// `ToolCall` event.
    fn tool_call(id: &str, name: &str, input: serde_json::Value) -> GraphEvent {
        GraphEvent::ToolCall {
            id: id.to_owned(),
            name: name.to_owned(),
            input,
            timestamp: Utc::now(),
        }
    }

    /// Successful text `ToolResult` event.
    fn tool_result(id: &str, name: &str, text: &str) -> GraphEvent {
        GraphEvent::ToolResult {
            tool_use_id: id.to_owned(),
            name: name.to_owned(),
            content: ToolResultContent::Text(text.to_owned()),
            is_error: false,
            timestamp: Utc::now(),
        }
    }

    /// Text of a turn's first content part; panics when that part is
    /// not text so a malformed fixture cannot make an assertion pass
    /// vacuously.
    fn first_text(turn: &Turn) -> &str {
        let content = match turn {
            Turn::User { content } | Turn::Assistant { content, .. } => content,
        };
        match &content[0] {
            ContentPart::Text { text, .. } => text.as_str(),
            other => panic!("expected a leading text part, got: {other:?}"),
        }
    }

    #[tokio::test]
    async fn empty_event_log_compacts_to_empty_history() {
        let history = HeadDropCompactor
            .compact(&[], 1024, &ExecutionContext::new())
            .await
            .unwrap();
        assert!(history.is_empty());
    }

    #[tokio::test]
    async fn user_assistant_round_trip_preserves_both_turns() {
        let events = vec![user("hi"), assistant("hello!")];
        let history = HeadDropCompactor
            .compact(&events, 1024, &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(history.len(), 2);
        assert!(matches!(history.turns()[0], Turn::User { .. }));
        assert!(matches!(history.turns()[1], Turn::Assistant { .. }));
    }

    #[tokio::test]
    async fn tool_pair_attaches_to_preceding_assistant_turn() {
        let events = vec![
            user("compute 1+1"),
            assistant("calling calculator"),
            tool_call("call_1", "calculator", json!({"expr": "1+1"})),
            tool_result("call_1", "calculator", "2"),
            assistant("answer is 2"),
        ];
        let history = HeadDropCompactor
            .compact(&events, 1024, &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(history.len(), 3); // user + assistant + assistant
        if let Turn::Assistant { tools, .. } = &history.turns()[1] {
            assert_eq!(tools.len(), 1);
            assert_eq!(tools[0].id(), "call_1");
            assert_eq!(tools[0].name(), "calculator");
        } else {
            panic!("expected Assistant turn at index 1");
        }
    }

    #[tokio::test]
    async fn tool_result_without_matching_call_returns_config_error() {
        let events = vec![
            user("ask"),
            assistant("calling"),
            tool_result("orphan", "calc", "x"),
        ];
        let err = HeadDropCompactor
            .compact(&events, 1024, &ExecutionContext::new())
            .await
            .unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("orphan"),
            "diagnostic must name the unmatched id: {msg}"
        );
    }

    #[tokio::test]
    async fn tool_call_without_matching_result_returns_config_error() {
        let events = vec![
            user("ask"),
            assistant("calling"),
            tool_call("dangling", "calc", json!({})),
        ];
        let err = HeadDropCompactor
            .compact(&events, 1024, &ExecutionContext::new())
            .await
            .unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("pair invariant violated"), "got: {msg}");
    }

    #[tokio::test]
    async fn budget_drops_oldest_turns_keeps_newest() {
        // Three user/assistant round-trips costing 20, 20 and 28
        // characters (oldest → newest). A budget of 50 fits the two
        // newest round-trips (48) but not the oldest one as well.
        let events = vec![
            user("one one one"),
            assistant("one reply"),
            user("two two two"),
            assistant("two reply"),
            user("three three three"),
            assistant("three reply"),
        ];
        let history = HeadDropCompactor
            .compact(&events, 50, &ExecutionContext::new())
            .await
            .unwrap();
        // Exactly the oldest round-trip is gone — never a partial turn.
        assert_eq!(history.len(), 4, "only the oldest round-trip may be dropped");
        let oldest_kept = &history.turns()[0];
        assert!(matches!(oldest_kept, Turn::User { .. }));
        assert_eq!(first_text(oldest_kept), "two two two");
        let last = history.turns().last().unwrap();
        assert!(
            matches!(last, Turn::Assistant { .. }),
            "expected Assistant as last turn"
        );
        assert_eq!(first_text(last), "three reply");
    }

    #[tokio::test]
    async fn to_messages_round_trips_user_assistant_tool_sequence() {
        let events = vec![
            user("ask"),
            assistant("calling"),
            tool_call("c", "tool", json!({})),
            tool_result("c", "tool", "ok"),
        ];
        let history = HeadDropCompactor
            .compact(&events, 1024, &ExecutionContext::new())
            .await
            .unwrap();
        let msgs = history.to_messages();
        assert_eq!(msgs.len(), 3); // user, assistant, tool
        assert!(matches!(msgs[0].role, Role::User));
        assert!(matches!(msgs[1].role, Role::Assistant));
        assert!(matches!(msgs[2].role, Role::Tool));
    }

    #[tokio::test]
    async fn pair_invariant_holds_under_partial_budget_drop() {
        // Costs, oldest → newest: u1 = 2, a1 plus its tool round-trip
        // = 11 ("a1" + `{"v":1}` + "r1"), u2 = 2, a2 = 2. The two
        // budgets below sit on either side of the point where the
        // tool-carrying turn stops fitting, so the turn must be kept
        // whole or dropped whole — never split.
        let events = vec![
            user("u1"),
            assistant("a1"),
            tool_call("t1", "x", json!({"v": 1})),
            tool_result("t1", "x", "r1"),
            user("u2"),
            assistant("a2"),
        ];

        // Budget 15: only `u1` is dropped; the round-trip survives intact.
        let history = HeadDropCompactor
            .compact(&events, 15, &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(history.len(), 3);
        match &history.turns()[0] {
            Turn::Assistant { tools, .. } => {
                assert_eq!(tools.len(), 1);
                let pair = &tools[0];
                assert_eq!(pair.id(), "t1");
                assert_eq!(pair.name(), "x");
                assert_eq!(pair.input(), &json!({"v": 1}));
                assert!(matches!(
                    pair.result(),
                    ToolResultContent::Text(body) if body.as_str() == "r1"
                ));
                assert!(!pair.is_error());
            }
            other => panic!("expected the tool-carrying Assistant turn first, got: {other:?}"),
        }

        // Budget 14: the tool-carrying turn no longer fits and is
        // dropped as a unit — no stray tool message may remain.
        let history = HeadDropCompactor
            .compact(&events, 14, &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(history.len(), 2);
        assert!(history.turns().iter().all(
            |turn| !matches!(turn, Turn::Assistant { tools, .. } if !tools.is_empty())
        ));
        assert!(
            history
                .to_messages()
                .iter()
                .all(|message| !matches!(message.role, Role::Tool))
        );
    }
}