Skip to main content

entelix_session/
compaction.rs

1//! `Compactor` + sealed `CompactedHistory<Turn>` — type-enforced
2//! `ToolCall` / `ToolResult` pair invariant for context compaction.
3//!
4//! Long agent runs accumulate event logs that exceed the model's
5//! context window. Operators must drop *some* events without
6//! breaking the conversation invariants vendors enforce — chiefly
7//! that every `tool_use` block has a matching `tool_result`.
8//! Mismatched pairs surface as HTTP 400s on the next call;
9//! pydantic-ai's [issue #4137](https://github.com/pydantic/pydantic-ai/issues/4137)
10//! catalogues the recurring footgun across SDKs.
11//!
//! `entelix-session` closes the footgun by exposing compaction
//! through this module. A [`Compactor`] consumes `&[GraphEvent]`
//! and returns a [`CompactedHistory`] whose tool round-trips are
//! [`ToolPair`]s — and a `ToolPair` can only be built by this
//! module's grouping code, so operators cannot synthesize a
//! `ToolCall` without its `ToolResult` (or vice versa). The
//! [`Turn`] enum groups events into:
18//!
19//! - [`Turn::User`] — one `UserMessage`.
20//! - [`Turn::Assistant`] — one `AssistantMessage` plus zero or
21//!   more [`ToolPair`]s, each binding a `ToolCall` to its
22//!   matching `ToolResult` *by structure*.
23//!
24//! Because `ToolPair` cannot be constructed with only one half,
25//! every compaction strategy operates on whole `Turn`s — the
26//! model never receives a `tool_use` without its `tool_result`.
27//!
28//! ## Reference impl
29//!
30//! [`HeadDropCompactor`] is the canonical "drop oldest" strategy:
//! walks turns from newest to oldest, keeps them until the next one
//! would exceed the character budget, and returns the trimmed window. Operators
33//! whose use case wants summary-style compaction (LLM-generated
34//! synopsis of dropped turns) implement [`Compactor`] directly.
35
use std::collections::{HashMap, HashSet};

use async_trait::async_trait;
use chrono::Utc;
use entelix_core::ExecutionContext;
use entelix_core::error::{Error, Result};
use entelix_core::ir::{ContentPart, Message, Role, ToolResultContent};
43
44use crate::event::GraphEvent;
45
46/// One matched `ToolCall` / `ToolResult` pair. Sealed: the only
47/// path to construction is [`Compactor::compact`] internal grouping,
48/// so a pair without both halves cannot exist.
49///
50/// Read-only accessors expose the call's id / name / input and the
51/// result's content / error flag for operators that inspect the
52/// compacted view (rendering, dashboards). Mutation is not
53/// supported — a `Turn` carrying a different pair set is a fresh
54/// compaction.
55#[derive(Clone, Debug)]
56pub struct ToolPair {
57    call_id: String,
58    name: String,
59    input: serde_json::Value,
60    result: ToolResultContent,
61    is_error: bool,
62}
63
64impl ToolPair {
65    /// Stable tool-use id binding the call to its result.
66    pub fn id(&self) -> &str {
67        &self.call_id
68    }
69
70    /// Tool name as registered with the dispatching `ToolRegistry`.
71    pub fn name(&self) -> &str {
72        &self.name
73    }
74
75    /// Tool input as JSON.
76    pub const fn input(&self) -> &serde_json::Value {
77        &self.input
78    }
79
80    /// Result payload returned by the tool.
81    pub const fn result(&self) -> &ToolResultContent {
82        &self.result
83    }
84
85    /// Whether the tool reported an error path.
86    pub const fn is_error(&self) -> bool {
87        self.is_error
88    }
89}
90
91/// One turn in a compacted conversation. Sealed so a `Turn::Assistant`
92/// can only carry [`ToolPair`]s constructed via the compactor's
93/// internal grouping code (paired calls + results).
94#[derive(Clone, Debug)]
95#[non_exhaustive]
96pub enum Turn {
97    /// User-authored message — opaque content, never paired.
98    User {
99        /// Multi-part content (text, image, …).
100        content: Vec<ContentPart>,
101    },
102    /// Assistant-authored message + the tool round-trips it
103    /// initiated. Empty `tools` means the assistant turn produced
104    /// final text only.
105    Assistant {
106        /// Assistant's content (may include `ContentPart::ToolUse`
107        /// blocks; the embedded tool-use ids match the
108        /// corresponding [`ToolPair::id`]s).
109        content: Vec<ContentPart>,
110        /// Matched tool round-trips initiated by this turn.
111        tools: Vec<ToolPair>,
112    },
113}
114
115/// Compacted view over a `SessionGraph`'s event log.
116///
117/// External operators implementing [`Compactor`] for a custom
118/// strategy (LLM-summary compaction, importance-weighted retention,
119/// …) construct the initial form via [`CompactedHistory::group`]
120/// and return either the same value or one rebuilt with
121/// [`CompactedHistory::from_turns`] after filtering. The
122/// `tool_call` / `tool_result` pair invariant stays type-enforced:
123/// the only path to a [`ToolPair`] is the internal grouping code,
124/// so external impls can drop or pass through tool round-trips
125/// but can't synthesize unmatched ones.
126#[derive(Clone, Debug)]
127pub struct CompactedHistory {
128    turns: Vec<Turn>,
129}
130
131impl CompactedHistory {
132    /// Group `events` into the type-enforced [`Turn`] shape and
133    /// return the un-trimmed compaction. The grouping rejects an
134    /// event log that violates the pair invariant *before*
135    /// compaction (e.g. `ToolResult` without a preceding
136    /// `ToolCall`); a well-formed `SessionGraph` never hits the
137    /// error path.
138    ///
139    /// External [`Compactor`] impls call this to get the initial
140    /// grouped form, then choose which turns to retain.
141    pub fn group(events: &[GraphEvent]) -> Result<Self> {
142        Ok(Self {
143            turns: group_into_turns(events)?,
144        })
145    }
146
147    /// Build a `CompactedHistory` from a pre-grouped `Vec<Turn>`.
148    /// External [`Compactor`] impls reach for this after filtering
149    /// or transforming the turns returned by
150    /// [`CompactedHistory::group`]. The pair invariant survives
151    /// the round-trip because the only path to a [`ToolPair`] is
152    /// still the internal grouping — operators pass them through
153    /// but can't synthesize new ones.
154    #[must_use]
155    pub const fn from_turns(turns: Vec<Turn>) -> Self {
156        Self { turns }
157    }
158
159    /// Borrow the compacted turns.
160    pub fn turns(&self) -> &[Turn] {
161        &self.turns
162    }
163
164    /// Number of turns retained.
165    pub const fn len(&self) -> usize {
166        self.turns.len()
167    }
168
169    /// Whether the compacted history is empty.
170    pub const fn is_empty(&self) -> bool {
171        self.turns.is_empty()
172    }
173
174    /// Render as `Vec<Message>` suitable for `ChatModel::complete`.
175    /// Mirrors [`crate::SessionGraph::current_branch_messages`] but
176    /// over the compacted view: every assistant turn's `tool_use`
177    /// blocks are followed by a synthetic `Role::Tool` message
178    /// per [`ToolPair`], so the wire-side codec sees the matched
179    /// pairs the vendor expects.
180    pub fn to_messages(&self) -> Vec<Message> {
181        let mut out = Vec::with_capacity(self.turns.len() * 2);
182        for turn in &self.turns {
183            match turn {
184                Turn::User { content } => {
185                    out.push(Message::new(Role::User, content.clone()));
186                }
187                Turn::Assistant { content, tools } => {
188                    out.push(Message::new(Role::Assistant, content.clone()));
189                    for pair in tools {
190                        out.push(Message::new(
191                            Role::Tool,
192                            vec![ContentPart::ToolResult {
193                                tool_use_id: pair.call_id.clone(),
194                                name: pair.name.clone(),
195                                content: pair.result.clone(),
196                                is_error: pair.is_error,
197                                cache_control: None,
198                                provider_echoes: Vec::new(),
199                            }],
200                        ));
201                    }
202                }
203            }
204        }
205        out
206    }
207}
208
209/// Operator-supplied compaction strategy.
210///
211/// Receives the full event log plus a character-budget hint and
212/// returns the trimmed view. Async by default so summary-style
213/// implementations can dispatch a `ChatModel` call (`SummaryCompactor`
214/// in `entelix-agents` is the canonical reference); pure-retention
215/// strategies (`HeadDropCompactor`) simply ignore the future point
216/// and return synchronously inside the async fn body.
217///
218/// Implementations must preserve the `ToolCall` / `ToolResult` pair
219/// invariant — the [`CompactedHistory`] return type enforces that
220/// structurally; trait authors only need to choose *which* turns to
221/// retain.
222#[async_trait]
223pub trait Compactor: Send + Sync + 'static {
224    /// Compact `events` to fit within `budget_chars`. The budget is
225    /// approximate — implementations measure character length of
226    /// the rendered text (closest free proxy for token count
227    /// without pulling a tokenizer dependency). The
228    /// [`ExecutionContext`] carries cancellation + deadline so a
229    /// long-running summarisation respects the same lifetime as the
230    /// dispatch that triggered it. Returns [`Error::Config`] when
231    /// the event log violates the pair invariant *before*
232    /// compaction (e.g. `ToolResult` without a preceding
233    /// `ToolCall`); a well-formed `SessionGraph` never hits this
234    /// path.
235    async fn compact(
236        &self,
237        events: &[GraphEvent],
238        budget_chars: usize,
239        ctx: &ExecutionContext,
240    ) -> Result<CompactedHistory>;
241}
242
/// Reference [`Compactor`]: discard the oldest turns until the rendered
/// character count fits within `budget_chars`.
///
/// Turns are kept or dropped whole, never partially, so tool
/// round-trips stay paired by construction.
///
/// The async body runs to completion without awaiting anything. For
/// LLM-generated summary compaction, use
/// `entelix_agents::SummaryCompactor` instead.
#[derive(Clone, Copy, Debug, Default)]
pub struct HeadDropCompactor;
254
255#[async_trait]
256impl Compactor for HeadDropCompactor {
257    async fn compact(
258        &self,
259        events: &[GraphEvent],
260        budget_chars: usize,
261        _ctx: &ExecutionContext,
262    ) -> Result<CompactedHistory> {
263        let mut turns = CompactedHistory::group(events)?.turns;
264        // Walk newest to oldest, keep turns that fit under budget.
265        let mut remaining = budget_chars;
266        let mut keep_index = turns.len();
267        for (idx, turn) in turns.iter().enumerate().rev() {
268            let cost = turn_char_cost(turn);
269            if cost > remaining {
270                break;
271            }
272            remaining -= cost;
273            keep_index = idx;
274        }
275        let trimmed = turns.split_off(keep_index);
276        Ok(CompactedHistory::from_turns(trimmed))
277    }
278}
279
280/// Render an in-flight `Vec<Message>` (the shape an agent's working
281/// state carries) into a `Vec<GraphEvent>` that [`Compactor::compact`]
282/// can consume. Inverse of [`CompactedHistory::to_messages`].
283///
284/// Auto-compaction wiring: an agent loop holds messages, not events,
285/// so the trigger path needs this helper to feed the existing
286/// event-shaped compaction surface — preserving the type-enforced
287/// `tool_call` / `tool_result` pair invariant end-to-end.
288///
289/// `Role::System` messages are dropped — system prompts ride outside
290/// the event log by design (configured separately on the model). All
291/// timestamps are stamped with [`Utc::now`] since per-message wall-clock
292/// is unavailable from the message representation; compaction does not
293/// rely on event ordering by timestamp (it uses positional ordering).
294///
295/// Returns [`Error::Config`] when the message sequence violates the
296/// `tool_call` / `tool_result` pair invariant before compaction (e.g.
297/// `Role::Tool` content carrying a `tool_use_id` with no preceding
298/// assistant `ToolUse` part).
299pub fn messages_to_events(messages: &[Message]) -> Result<Vec<GraphEvent>> {
300    let now = Utc::now();
301    let mut events = Vec::with_capacity(messages.len() * 2);
302    for msg in messages {
303        match msg.role {
304            Role::User => {
305                events.push(GraphEvent::UserMessage {
306                    content: msg.content.clone(),
307                    timestamp: now,
308                });
309            }
310            Role::Assistant => {
311                events.push(GraphEvent::AssistantMessage {
312                    content: msg.content.clone(),
313                    usage: None,
314                    timestamp: now,
315                });
316                for part in &msg.content {
317                    if let ContentPart::ToolUse {
318                        id, name, input, ..
319                    } = part
320                    {
321                        events.push(GraphEvent::ToolCall {
322                            id: id.clone(),
323                            name: name.clone(),
324                            input: input.clone(),
325                            timestamp: now,
326                        });
327                    }
328                }
329            }
330            Role::Tool => {
331                for part in &msg.content {
332                    if let ContentPart::ToolResult {
333                        tool_use_id,
334                        name,
335                        content,
336                        is_error,
337                        ..
338                    } = part
339                    {
340                        events.push(GraphEvent::ToolResult {
341                            tool_use_id: tool_use_id.clone(),
342                            name: name.clone(),
343                            content: content.clone(),
344                            is_error: *is_error,
345                            timestamp: now,
346                        });
347                    }
348                }
349            }
350            // `Role::System` rides outside the event log (configured
351            // separately on the model); future variants similarly do
352            // not represent appendable conversation turns.
353            _ => {}
354        }
355    }
356    Ok(events)
357}
358
359/// Character-length proxy for the token cost of a message slice. Same
360/// metric [`HeadDropCompactor`] uses to compare against `budget_chars`,
361/// so threshold-driven auto-compaction can use the same yardstick.
362#[must_use]
363pub fn messages_char_size(messages: &[Message]) -> usize {
364    messages.iter().map(|m| content_chars(&m.content)).sum()
365}
366
367/// Group events into the type-enforced [`Turn`] shape. Every
368/// `ToolCall` must have a matching `ToolResult` (paired by `id`);
369/// every `ToolResult` must follow an `AssistantMessage`. Returns
370/// [`Error::Config`] on either violation.
371fn group_into_turns(events: &[GraphEvent]) -> Result<Vec<Turn>> {
372    let mut pending_calls: HashMap<String, (String, serde_json::Value)> = HashMap::new();
373    let mut turns: Vec<Turn> = Vec::new();
374    for event in events {
375        match event {
376            GraphEvent::UserMessage { content, .. } => {
377                turns.push(Turn::User {
378                    content: content.clone(),
379                });
380            }
381            GraphEvent::AssistantMessage { content, .. } => {
382                turns.push(Turn::Assistant {
383                    content: content.clone(),
384                    tools: Vec::new(),
385                });
386            }
387            GraphEvent::ToolCall {
388                id, name, input, ..
389            } => {
390                pending_calls.insert(id.clone(), (name.clone(), input.clone()));
391            }
392            GraphEvent::ToolResult {
393                tool_use_id,
394                name,
395                content,
396                is_error,
397                ..
398            } => {
399                let (_call_name, call_input) =
400                    pending_calls.remove(tool_use_id).ok_or_else(|| {
401                        Error::config(format!(
402                            "Compactor: ToolResult tool_use_id={tool_use_id} \
403                             has no matching ToolCall in event log"
404                        ))
405                    })?;
406                let pair = ToolPair {
407                    call_id: tool_use_id.clone(),
408                    name: name.clone(),
409                    input: call_input,
410                    result: content.clone(),
411                    is_error: *is_error,
412                };
413                let host = turns
414                    .iter_mut()
415                    .rev()
416                    .find(|t| matches!(t, Turn::Assistant { .. }))
417                    .ok_or_else(|| {
418                        Error::config("Compactor: ToolResult appeared before any AssistantMessage")
419                    })?;
420                if let Turn::Assistant { tools, .. } = host {
421                    tools.push(pair);
422                }
423            }
424            _ => {}
425        }
426    }
427    if !pending_calls.is_empty() {
428        return Err(Error::config(format!(
429            "Compactor: {} ToolCall(s) without matching ToolResult — pair invariant violated",
430            pending_calls.len()
431        )));
432    }
433    Ok(turns)
434}
435
436/// Character-length proxy for token cost. Walks the turn's content
437/// blocks summing text bytes (UTF-8 byte count, not grapheme count
438/// — the cheap-monotonic property is what matters for "drop until
439/// under budget"). Tool inputs / outputs contribute their JSON
440/// serialisation length.
441fn turn_char_cost(turn: &Turn) -> usize {
442    match turn {
443        Turn::User { content } => content_chars(content),
444        Turn::Assistant { content, tools } => {
445            let mut sum = content_chars(content);
446            for pair in tools {
447                sum += pair.input.to_string().len();
448                sum += match &pair.result {
449                    ToolResultContent::Text(s) => s.len(),
450                    ToolResultContent::Json(v) => v.to_string().len(),
451                    _ => 0,
452                };
453            }
454            sum
455        }
456    }
457}
458
459fn content_chars(parts: &[ContentPart]) -> usize {
460    parts
461        .iter()
462        .map(|p| match p {
463            ContentPart::Text { text, .. } | ContentPart::Thinking { text, .. } => text.len(),
464            ContentPart::ToolUse { input, .. } => input.to_string().len(),
465            ContentPart::ToolResult { content, .. } => match content {
466                ToolResultContent::Text(s) => s.len(),
467                ToolResultContent::Json(v) => v.to_string().len(),
468                _ => 0,
469            },
470            _ => 0,
471        })
472        .sum()
473}
474
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use chrono::Utc;
    use serde_json::json;

    use super::*;

    /// Run the reference compactor with a fresh execution context.
    async fn head_drop(events: &[GraphEvent], budget_chars: usize) -> Result<CompactedHistory> {
        HeadDropCompactor
            .compact(events, budget_chars, &ExecutionContext::new())
            .await
    }

    fn user(text: &str) -> GraphEvent {
        let content = vec![ContentPart::text(text)];
        GraphEvent::UserMessage {
            content,
            timestamp: Utc::now(),
        }
    }

    fn assistant(text: &str) -> GraphEvent {
        let content = vec![ContentPart::text(text)];
        GraphEvent::AssistantMessage {
            content,
            usage: None,
            timestamp: Utc::now(),
        }
    }

    fn tool_call(id: &str, name: &str, input: serde_json::Value) -> GraphEvent {
        GraphEvent::ToolCall {
            id: id.to_owned(),
            name: name.to_owned(),
            input,
            timestamp: Utc::now(),
        }
    }

    fn tool_result(id: &str, name: &str, text: &str) -> GraphEvent {
        let content = ToolResultContent::Text(text.to_owned());
        GraphEvent::ToolResult {
            tool_use_id: id.to_owned(),
            name: name.to_owned(),
            content,
            is_error: false,
            timestamp: Utc::now(),
        }
    }

    #[tokio::test]
    async fn empty_event_log_compacts_to_empty_history() {
        let history = head_drop(&[], 1024).await.unwrap();
        assert!(history.is_empty());
    }

    #[tokio::test]
    async fn user_assistant_round_trip_preserves_both_turns() {
        let history = head_drop(&[user("hi"), assistant("hello!")], 1024)
            .await
            .unwrap();
        assert_eq!(history.len(), 2);
        let turns = history.turns();
        assert!(matches!(turns[0], Turn::User { .. }));
        assert!(matches!(turns[1], Turn::Assistant { .. }));
    }

    #[tokio::test]
    async fn tool_pair_attaches_to_preceding_assistant_turn() {
        let events = [
            user("compute 1+1"),
            assistant("calling calculator"),
            tool_call("call_1", "calculator", json!({"expr": "1+1"})),
            tool_result("call_1", "calculator", "2"),
            assistant("answer is 2"),
        ];
        let history = head_drop(&events, 1024).await.unwrap();
        assert_eq!(history.len(), 3); // user + assistant + assistant
        let Turn::Assistant { tools, .. } = &history.turns()[1] else {
            panic!("expected Assistant turn at index 1");
        };
        assert_eq!(tools.len(), 1);
        let pair = &tools[0];
        assert_eq!(pair.id(), "call_1");
        assert_eq!(pair.name(), "calculator");
    }

    #[tokio::test]
    async fn tool_result_without_matching_call_returns_config_error() {
        let events = [
            user("ask"),
            assistant("calling"),
            tool_result("orphan", "calc", "x"),
        ];
        let msg = head_drop(&events, 1024).await.unwrap_err().to_string();
        assert!(
            msg.contains("orphan"),
            "diagnostic must name the unmatched id: {msg}"
        );
    }

    #[tokio::test]
    async fn tool_call_without_matching_result_returns_config_error() {
        let events = [
            user("ask"),
            assistant("calling"),
            tool_call("dangling", "calc", json!({})),
        ];
        let msg = head_drop(&events, 1024).await.unwrap_err().to_string();
        assert!(msg.contains("pair invariant violated"), "got: {msg}");
    }

    #[tokio::test]
    async fn budget_drops_oldest_turns_keeps_newest() {
        // Three user/assistant round-trips; a 50-char budget only has
        // room for the most recent turns.
        let events = [
            user("one one one"),
            assistant("one reply"),
            user("two two two"),
            assistant("two reply"),
            user("three three three"),
            assistant("three reply"),
        ];
        let history = head_drop(&events, 50).await.unwrap();
        // The retained window must end with the newest turn — never partial.
        assert!(!history.is_empty());
        let Turn::Assistant { content, .. } = history.turns().last().unwrap() else {
            panic!("expected Assistant as last turn");
        };
        if let ContentPart::Text { text, .. } = &content[0] {
            assert!(
                text.contains("three"),
                "newest turn must be retained, got: {text}"
            );
        }
    }

    #[tokio::test]
    async fn to_messages_round_trips_user_assistant_tool_sequence() {
        let events = [
            user("ask"),
            assistant("calling"),
            tool_call("c", "tool", json!({})),
            tool_result("c", "tool", "ok"),
        ];
        let msgs = head_drop(&events, 1024).await.unwrap().to_messages();
        assert_eq!(msgs.len(), 3); // user, assistant, tool
        assert!(matches!(msgs[0].role, Role::User));
        assert!(matches!(msgs[1].role, Role::Assistant));
        assert!(matches!(msgs[2].role, Role::Tool));
    }

    #[tokio::test]
    async fn pair_invariant_holds_under_partial_budget_drop() {
        // Even when the budget forces turns out, the retained set must
        // never hold an unpaired tool. `Turn::Assistant`'s
        // `tools: Vec<ToolPair>` guarantees this structurally; the test
        // pins the round-trip so a future refactor can't loosen it.
        let events = [
            user("u1"),
            assistant("a1"),
            tool_call("t1", "x", json!({"v": 1})),
            tool_result("t1", "x", "r1"),
            user("u2"),
            assistant("a2"),
        ];
        let history = head_drop(&events, 30).await.unwrap();
        let pairs = history.turns().iter().filter_map(|turn| match turn {
            Turn::Assistant { tools, .. } => Some(tools),
            Turn::User { .. } => None,
        });
        for pair in pairs.flatten() {
            // Both halves are reachable — proves neither is missing.
            let _ = (pair.id(), pair.name(), pair.input(), pair.result());
        }
    }
}