Skip to main content

entelix_agents/
compaction.rs

//! Auto-compaction adapter + the canonical LLM-summary compactor.
//!
//! ## [`RunnableCompacting`] — agent-orthogonal trigger
//!
//! Compaction is a *message-history concern* that is orthogonal to
//! recipe choice (ReAct / Supervisor / Chat). Wrapping the model
//! itself with [`RunnableCompacting`] composes through every recipe
//! unchanged — the planner / agent / chat node calls `model.invoke(...)`
//! and the wrapper transparently compacts before delegating. No
//! recipe code touches the trigger logic; new recipes inherit the
//! behaviour for free.
//!
//! The wrapper routes through [`messages_to_events`] +
//! [`Compactor::compact`] + [`CompactedHistory::to_messages`], so the
//! sealed `tool_call` / `tool_result` pair invariant
//! ([`entelix_session::ToolPair`]) survives the round-trip. The
//! vendor-side wire format never sees an unmatched tool block.
//!
//! ## [`SummaryCompactor`] — LLM-summary [`Compactor`] impl
//!
//! Operators wanting Claude Agent SDK's auto-compaction behaviour or
//! LangChain's `SummarizationMiddleware` reach for [`SummaryCompactor`]:
//! the oldest turns past `keep_recent_turns` are rendered, summarised
//! by an operator-supplied summariser model, and replaced with a single
//! synthetic `Turn::User` carrying the summary. Pair invariant survives
//! because dropped turns leave with their `ToolPair`s.
27
28use std::sync::Arc;
29
30use async_trait::async_trait;
31use entelix_core::ir::{ContentPart, Message, Role};
32use entelix_core::{ExecutionContext, Result};
33use entelix_runnable::Runnable;
34use entelix_session::{
35    CompactedHistory, Compactor, GraphEvent, Turn, messages_char_size, messages_to_events,
36};
37
38/// `Runnable<Vec<Message>, Message>` wrapper that compacts the input
39/// message slice through an operator-supplied [`Compactor`] when the
40/// total character count meets or exceeds `threshold_chars`. Below the
41/// threshold the wrapper is a no-op delegate — the inner runnable
42/// receives the original `Vec<Message>` unchanged.
43///
44/// Construct via [`MessageRunnableCompactionExt::with_compaction`]:
45///
46/// ```ignore
47/// use entelix::{Compactor, HeadDropCompactor, MessageRunnableCompactionExt};
48/// use std::sync::Arc;
49///
50/// let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
51/// let model = my_chat_model.with_compaction(compactor, 8_192);
52/// let agent = entelix::create_react_agent(model, tools, None)?;
53/// ```
54pub struct RunnableCompacting<R> {
55    inner: R,
56    compactor: Arc<dyn Compactor>,
57    threshold_chars: usize,
58}
59
60impl<R> RunnableCompacting<R> {
61    /// Threshold (in character count) at and above which the wrapper
62    /// invokes the [`Compactor`]. Mirrors the `budget_chars` semantic
63    /// the compactor uses to size its output.
64    #[must_use]
65    pub const fn threshold_chars(&self) -> usize {
66        self.threshold_chars
67    }
68
69    /// Borrow the wrapped runnable.
70    pub const fn inner(&self) -> &R {
71        &self.inner
72    }
73}
74
75#[async_trait]
76impl<R> Runnable<Vec<Message>, Message> for RunnableCompacting<R>
77where
78    R: Runnable<Vec<Message>, Message> + Send + Sync + 'static,
79{
80    async fn invoke(&self, input: Vec<Message>, ctx: &ExecutionContext) -> Result<Message> {
81        let input = if messages_char_size(&input) >= self.threshold_chars {
82            let dropped_size = messages_char_size(&input);
83            let events = messages_to_events(&input)?;
84            let compacted = self
85                .compactor
86                .compact(&events, self.threshold_chars, ctx)
87                .await?
88                .to_messages();
89            let retained_size = messages_char_size(&compacted);
90            if let Some(handle) = ctx.audit_sink() {
91                handle.as_sink().record_context_compacted(
92                    dropped_size.saturating_sub(retained_size),
93                    retained_size,
94                );
95            }
96            compacted
97        } else {
98            input
99        };
100        self.inner.invoke(input, ctx).await
101    }
102}
103
104/// Extension trait that attaches [`RunnableCompacting`] to any
105/// `Runnable<Vec<Message>, Message>`. Blanket-impl'd for every such
106/// runnable so a model accepting messages — including layered models
107/// (`OtelLayer`, `PolicyLayer`, `RetryService`) — can chain `.with_compaction(.)`
108/// without a separate import per concrete type.
109pub trait MessageRunnableCompactionExt: Runnable<Vec<Message>, Message> + Sized {
110    /// Wrap with auto-compaction. The wrapper is itself a
111    /// `Runnable<Vec<Message>, Message>`, so it composes back into
112    /// any recipe that takes a model.
113    fn with_compaction(
114        self,
115        compactor: Arc<dyn Compactor>,
116        threshold_chars: usize,
117    ) -> RunnableCompacting<Self> {
118        RunnableCompacting {
119            inner: self,
120            compactor,
121            threshold_chars,
122        }
123    }
124}
125
126impl<R> MessageRunnableCompactionExt for R where R: Runnable<Vec<Message>, Message> + Sized {}
127
/// System prompt a [`SummaryCompactor`] starts with; it is what
/// [`SummaryCompactor::new`] installs and stays in effect until
/// [`SummaryCompactor::with_system_prompt`] replaces it. Worded as a
/// vendor-neutral "compress the prior conversation" instruction that
/// asks for the summary text only.
pub const DEFAULT_SUMMARY_SYSTEM_PROMPT: &str = "You are a conversation summariser. Distil the conversation below into 100-200 words preserving key facts, decisions, entities, and tool outcomes. Output ONLY the summary text — no preamble, no commentary.";
133
/// Number of newest turns a [`SummaryCompactor`] leaves verbatim unless
/// overridden via [`SummaryCompactor::with_keep_recent_turns`]; anything
/// older is folded into one synthetic summary turn. Four covers the
/// latest user/assistant pair plus the pair before it — small enough
/// that summarisation kicks in early, large enough that adjacent
/// context survives.
pub const DEFAULT_SUMMARY_KEEP_RECENT_TURNS: usize = 4;
140
/// [`Compactor`] that asks an LLM to summarise old history.
///
/// Turns older than the newest `keep_recent_turns` are rendered to
/// messages, summarised by the configured model, and replaced by one
/// synthetic `Turn::User` holding the summary; the newest turns are
/// kept verbatim.
///
/// Pair invariant: turns are dropped or kept whole, so every retained
/// `Turn::Assistant` keeps its `tools` vector intact and dropped turns
/// take their `ToolPair`s with them — the wire-side codec never sees
/// an unmatched tool block.
///
/// Start from [`SummaryCompactor::new`], then tune with
/// [`SummaryCompactor::with_system_prompt`] and
/// [`SummaryCompactor::with_keep_recent_turns`]. Any
/// `Runnable<Vec<Message>, Message>` can act as the summariser — the
/// operator's `ChatModel`, a layered model, or a stub for tests.
pub struct SummaryCompactor<M> {
    /// Summariser model invoked with the system prompt plus older turns.
    model: Arc<M>,
    /// System message placed at the head of the summariser prompt.
    system_prompt: String,
    /// How many of the newest turns are retained verbatim.
    keep_recent_turns: usize,
}
160
161impl<M> SummaryCompactor<M> {
162    /// Construct with the default system prompt and keep-recent count.
163    #[must_use]
164    pub fn new(model: Arc<M>) -> Self {
165        Self {
166            model,
167            system_prompt: DEFAULT_SUMMARY_SYSTEM_PROMPT.to_owned(),
168            keep_recent_turns: DEFAULT_SUMMARY_KEEP_RECENT_TURNS,
169        }
170    }
171
172    /// Override the system prompt. Operators with a custom voice or
173    /// downstream-format requirement (e.g. JSON envelope) point the
174    /// summariser via this knob.
175    #[must_use]
176    pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
177        self.system_prompt = prompt.into();
178        self
179    }
180
181    /// Override how many newest turns are retained verbatim. Higher
182    /// values preserve more recent context at the cost of leaving more
183    /// budget pressure for the summariser to manage.
184    #[must_use]
185    pub const fn with_keep_recent_turns(mut self, n: usize) -> Self {
186        self.keep_recent_turns = n;
187        self
188    }
189}
190
191#[async_trait]
192impl<M> Compactor for SummaryCompactor<M>
193where
194    M: Runnable<Vec<Message>, Message> + Send + Sync + 'static,
195{
196    async fn compact(
197        &self,
198        events: &[GraphEvent],
199        _budget_chars: usize,
200        ctx: &ExecutionContext,
201    ) -> Result<CompactedHistory> {
202        let grouped = CompactedHistory::group(events)?;
203        let total = grouped.len();
204        if total <= self.keep_recent_turns {
205            return Ok(grouped);
206        }
207        let split_at = total - self.keep_recent_turns;
208        let mut all = grouped.turns().to_vec();
209        let recent = all.split_off(split_at);
210        let older = all;
211        if older.is_empty() {
212            return Ok(CompactedHistory::from_turns(recent));
213        }
214        let older_messages = CompactedHistory::from_turns(older).to_messages();
215        let mut prompt = Vec::with_capacity(older_messages.len() + 1);
216        prompt.push(Message::new(
217            Role::System,
218            vec![ContentPart::text(self.system_prompt.clone())],
219        ));
220        prompt.extend(older_messages);
221        let summary_msg = self.model.invoke(prompt, ctx).await?;
222        let summary_text = extract_text(&summary_msg.content);
223        let summary_turn = Turn::User {
224            content: vec![ContentPart::text(format!(
225                "[Summary of earlier conversation]\n{summary_text}"
226            ))],
227        };
228        let mut combined = Vec::with_capacity(1 + recent.len());
229        combined.push(summary_turn);
230        combined.extend(recent);
231        Ok(CompactedHistory::from_turns(combined))
232    }
233}
234
235fn extract_text(parts: &[ContentPart]) -> String {
236    let mut out = String::new();
237    for part in parts {
238        if let ContentPart::Text { text, .. } = part {
239            if !out.is_empty() {
240                out.push('\n');
241            }
242            out.push_str(text);
243        }
244    }
245    out
246}
247
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use entelix_core::ir::{ContentPart, Message, Role};
    use entelix_session::HeadDropCompactor;
    use parking_lot::Mutex;

    use super::*;

    /// Stub model that counts its invocations, remembers how many
    /// messages the last call received, and always replies `"ok"`.
    struct EchoModel {
        invocations: AtomicUsize,
        last_input_len: AtomicUsize,
    }

    impl EchoModel {
        fn new() -> Self {
            Self {
                invocations: AtomicUsize::new(0),
                last_input_len: AtomicUsize::new(0),
            }
        }
    }

    #[async_trait]
    impl Runnable<Vec<Message>, Message> for EchoModel {
        async fn invoke(&self, input: Vec<Message>, _ctx: &ExecutionContext) -> Result<Message> {
            self.invocations.fetch_add(1, Ordering::SeqCst);
            self.last_input_len.store(input.len(), Ordering::SeqCst);
            Ok(Message::new(Role::Assistant, vec![ContentPart::text("ok")]))
        }
    }

    /// Single-text-part user message.
    fn user(text: &str) -> Message {
        Message::new(Role::User, vec![ContentPart::text(text)])
    }

    /// Single-text-part assistant message.
    fn assistant(text: &str) -> Message {
        Message::new(Role::Assistant, vec![ContentPart::text(text)])
    }

    #[tokio::test]
    async fn passes_through_below_threshold() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let wrapped = EchoModel::new().with_compaction(compactor, 1024);

        let input = vec![user("short"), assistant("ok")];
        let _ = wrapped
            .invoke(input.clone(), &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(
            wrapped.inner().last_input_len.load(Ordering::SeqCst),
            input.len()
        );
        assert_eq!(
            wrapped.inner().invocations.load(Ordering::SeqCst),
            1,
            "inner model must be invoked exactly once"
        );
    }

    #[tokio::test]
    async fn compacts_when_threshold_exceeded() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let model = EchoModel::new();
        // Threshold = 30 characters. Three round-trips well above that
        // trigger compaction; the head-drop strategy retains the
        // newest turns that fit under the budget.
        let wrapped = model.with_compaction(compactor, 30);

        let input = vec![
            user("one one one one"),
            assistant("first reply long enough"),
            user("two two two two"),
            assistant("second reply long enough"),
            user("three three three three"),
            assistant("third reply"),
        ];
        let _ = wrapped
            .invoke(input.clone(), &ExecutionContext::new())
            .await
            .unwrap();
        let observed_len = wrapped.inner().last_input_len.load(Ordering::SeqCst);
        assert!(
            observed_len < input.len(),
            "compaction must trim — got {observed_len}, input had {}",
            input.len()
        );
    }

    /// `AuditSink` test impl that records every `record_*` call so
    /// the compaction-emit assertion can verify the audit channel is
    /// actually crossed (invariant 18).
    struct CapturingAuditSink {
        compactions: Mutex<Vec<(usize, usize)>>,
    }

    impl CapturingAuditSink {
        fn new() -> Self {
            Self {
                compactions: Mutex::new(Vec::new()),
            }
        }
    }

    impl entelix_core::AuditSink for CapturingAuditSink {
        fn record_sub_agent_invoked(&self, _agent_id: &str, _sub_thread_id: &str) {}
        fn record_agent_handoff(&self, _from: Option<&str>, _to: &str) {}
        fn record_interrupted(
            &self,
            _kind: &entelix_core::InterruptionKind,
            _payload: &serde_json::Value,
        ) {
        }
        fn record_resumed(&self, _from_checkpoint: &str) {}
        fn record_memory_recall(&self, _tier: &str, _namespace_key: &str, _hits: usize) {}
        fn record_usage_limit_exceeded(&self, _breach: &entelix_core::UsageLimitBreach) {}
        fn record_context_compacted(&self, dropped_chars: usize, retained_chars: usize) {
            self.compactions
                .lock()
                .push((dropped_chars, retained_chars));
        }
        fn record_tool_error_terminal(&self, _kind: entelix_core::ToolErrorKind, _tool_name: &str) {
        }
    }

    #[tokio::test]
    async fn compaction_records_audit_event_when_threshold_exceeded() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let model = EchoModel::new();
        let wrapped = model.with_compaction(compactor, 30);
        let sink = Arc::new(CapturingAuditSink::new());
        let ctx = ExecutionContext::new()
            .with_audit_sink(entelix_core::AuditSinkHandle::new(sink.clone()));

        let input = vec![
            user("padding to force compaction one one one one"),
            assistant("more padding to force compaction"),
            user("trailing turn"),
            assistant("ok"),
        ];
        let _ = wrapped.invoke(input, &ctx).await.unwrap();

        let captured = sink.compactions.lock().clone();
        assert_eq!(captured.len(), 1, "exactly one compaction event expected");
        let (dropped, _retained) = captured[0];
        assert!(dropped > 0, "must report some dropped characters");
    }

    #[tokio::test]
    async fn compaction_records_no_audit_event_below_threshold() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let model = EchoModel::new();
        let wrapped = model.with_compaction(compactor, 1024);
        let sink = Arc::new(CapturingAuditSink::new());
        let ctx = ExecutionContext::new()
            .with_audit_sink(entelix_core::AuditSinkHandle::new(sink.clone()));

        let input = vec![user("short"), assistant("ok")];
        let _ = wrapped.invoke(input, &ctx).await.unwrap();

        assert!(
            sink.compactions.lock().is_empty(),
            "no audit event expected when threshold is not crossed"
        );
    }

    #[tokio::test]
    async fn empty_messages_pass_through() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let model = EchoModel::new();
        let wrapped = model.with_compaction(compactor, 1024);
        let _ = wrapped
            .invoke(Vec::new(), &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(wrapped.inner().last_input_len.load(Ordering::SeqCst), 0);
        assert_eq!(
            wrapped.inner().invocations.load(Ordering::SeqCst),
            1,
            "inner model must still be invoked for an empty history"
        );
    }

    /// Stub summariser model that records the prompt it received and
    /// always replies with a fixed summary text. Lets the
    /// `SummaryCompactor` tests assert exactly which turns were sent
    /// to the summariser.
    struct StubSummariser {
        captured_prompt: Mutex<Vec<Message>>,
        reply: String,
    }

    impl StubSummariser {
        fn new(reply: impl Into<String>) -> Self {
            Self {
                captured_prompt: Mutex::new(Vec::new()),
                reply: reply.into(),
            }
        }
    }

    #[async_trait]
    impl Runnable<Vec<Message>, Message> for StubSummariser {
        async fn invoke(&self, input: Vec<Message>, _ctx: &ExecutionContext) -> Result<Message> {
            *self.captured_prompt.lock() = input;
            Ok(Message::new(
                Role::Assistant,
                vec![ContentPart::text(self.reply.clone())],
            ))
        }
    }

    /// User-message event with a single text part, stamped "now".
    fn user_event(text: &str) -> entelix_session::GraphEvent {
        entelix_session::GraphEvent::UserMessage {
            content: vec![ContentPart::text(text)],
            timestamp: chrono::Utc::now(),
        }
    }

    /// Assistant-message event with a single text part and no usage,
    /// stamped "now".
    fn assistant_event(text: &str) -> entelix_session::GraphEvent {
        entelix_session::GraphEvent::AssistantMessage {
            content: vec![ContentPart::text(text)],
            usage: None,
            timestamp: chrono::Utc::now(),
        }
    }

    #[tokio::test]
    async fn summary_compactor_skips_when_under_keep_recent_threshold() {
        let summariser = Arc::new(StubSummariser::new("never invoked"));
        let compactor = SummaryCompactor::new(summariser.clone()).with_keep_recent_turns(8);
        let events = vec![
            user_event("u1"),
            assistant_event("a1"),
            user_event("u2"),
            assistant_event("a2"),
        ];
        let history = compactor
            .compact(&events, 0, &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(history.len(), 4);
        assert!(
            summariser.captured_prompt.lock().is_empty(),
            "summariser must NOT be invoked when total <= keep_recent_turns"
        );
    }

    #[tokio::test]
    async fn summary_compactor_replaces_older_turns_with_summary() {
        let summariser = Arc::new(StubSummariser::new("brief recap"));
        let compactor = SummaryCompactor::new(summariser.clone()).with_keep_recent_turns(2);
        let events = vec![
            user_event("oldest user"),
            assistant_event("oldest assistant"),
            user_event("middle user"),
            assistant_event("middle assistant"),
            user_event("newest user"),
            assistant_event("newest assistant"),
        ];
        let history = compactor
            .compact(&events, 0, &ExecutionContext::new())
            .await
            .unwrap();
        // Summary turn (1) + retained newest turns (2) = 3
        assert_eq!(history.len(), 3);
        // Head is the synthetic User summary. Both shape checks panic on
        // mismatch so the content assertions can never be skipped.
        let Turn::User { content } = &history.turns()[0] else {
            panic!("expected User turn at head");
        };
        let ContentPart::Text { text, .. } = &content[0] else {
            panic!("expected Text part at head of summary turn");
        };
        assert!(text.contains("Summary"), "summary marker missing: {text}");
        assert!(
            text.contains("brief recap"),
            "summariser reply missing: {text}"
        );
        // Summariser was invoked with system + 4 older turns rendered as messages.
        // The lock guard is released at the end of the block, before asserting.
        let (captured_len, captured_role) = {
            let captured = summariser.captured_prompt.lock();
            (captured.len(), captured[0].role)
        };
        assert!(
            captured_len >= 5,
            "expected system + ≥4 older messages, got {captured_len}"
        );
        assert!(matches!(captured_role, Role::System));
    }

    #[tokio::test]
    async fn summary_compactor_with_system_prompt_overrides_default() {
        let summariser = Arc::new(StubSummariser::new("ok"));
        let compactor = SummaryCompactor::new(summariser.clone())
            .with_keep_recent_turns(0)
            .with_system_prompt("CUSTOM PROMPT MARKER");
        let events = vec![user_event("hi"), assistant_event("hello")];
        let _ = compactor
            .compact(&events, 0, &ExecutionContext::new())
            .await
            .unwrap();
        let prompt_text = {
            let captured = summariser.captured_prompt.lock();
            if let ContentPart::Text { text, .. } = &captured[0].content[0] {
                text.clone()
            } else {
                panic!("expected Text part at system position");
            }
        };
        assert!(
            prompt_text.contains("CUSTOM PROMPT MARKER"),
            "operator-supplied prompt must reach the summariser, got: {prompt_text}"
        );
    }
}