Skip to main content

entelix_agents/
compaction.rs

//! Auto-compaction adapter + the canonical LLM-summary compactor.
//!
//! ## [`RunnableCompacting`] — agent-orthogonal trigger
//!
//! Compaction is a *message-history concern* that lives orthogonal to
//! recipe choice (ReAct / Supervisor / Chat). Wrapping the model
//! itself with [`RunnableCompacting`] composes through every recipe
//! unchanged — the planner / agent / chat node calls `model.invoke(.)`
//! and the wrapper transparently compacts before delegating. No
//! recipe code touches the trigger logic; new recipes inherit the
//! behaviour for free.
//!
//! The wrapper routes through [`messages_to_events`] +
//! [`Compactor::compact`] + [`CompactedHistory::to_messages`], so the
//! sealed `tool_call` / `tool_result` pair invariant
//! ([`entelix_session::ToolPair`]) survives the round-trip. The
//! vendor-side wire format never sees an unmatched tool block.
//!
//! ## [`SummaryCompactor`] — LLM-summary [`Compactor`] impl
//!
//! Operators wanting Claude Agent SDK's auto-compaction behaviour or
//! LangChain's `SummarizationMiddleware` reach for [`SummaryCompactor`]:
//! the oldest turns past `keep_recent_turns` are rendered, summarised
//! by an operator-supplied summariser model, and replaced with a single
//! synthetic `Turn::User` carrying the summary. Pair invariant survives
//! because dropped turns leave with their `ToolPair`s.
28use std::sync::Arc;
29
30use async_trait::async_trait;
31use entelix_core::ir::{ContentPart, Message, Role};
32use entelix_core::{ExecutionContext, Result};
33use entelix_runnable::Runnable;
34use entelix_session::{
35    CompactedHistory, Compactor, GraphEvent, Turn, messages_char_size, messages_to_events,
36};
37
38/// `Runnable<Vec<Message>, Message>` wrapper that compacts the input
39/// message slice through an operator-supplied [`Compactor`] when the
40/// total character count meets or exceeds `threshold_chars`. Below the
41/// threshold the wrapper is a no-op delegate — the inner runnable
42/// receives the original `Vec<Message>` unchanged.
43///
44/// Construct via [`MessageRunnableCompactionExt::with_compaction`]:
45///
46/// ```ignore
47/// use entelix::{Compactor, HeadDropCompactor, MessageRunnableCompactionExt};
48/// use std::sync::Arc;
49///
50/// let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
51/// let model = my_chat_model.with_compaction(compactor, 8_192);
52/// let agent = entelix::create_react_agent(model, tools, None)?;
53/// ```
54pub struct RunnableCompacting<R> {
55    inner: R,
56    compactor: Arc<dyn Compactor>,
57    threshold_chars: usize,
58}
59
60impl<R> RunnableCompacting<R> {
61    /// Threshold (in character count) at and above which the wrapper
62    /// invokes the [`Compactor`]. Mirrors the `budget_chars` semantic
63    /// the compactor uses to size its output.
64    #[must_use]
65    pub const fn threshold_chars(&self) -> usize {
66        self.threshold_chars
67    }
68
69    /// Borrow the wrapped runnable.
70    pub const fn inner(&self) -> &R {
71        &self.inner
72    }
73}
74
75#[async_trait]
76impl<R> Runnable<Vec<Message>, Message> for RunnableCompacting<R>
77where
78    R: Runnable<Vec<Message>, Message> + Send + Sync + 'static,
79{
80    async fn invoke(&self, input: Vec<Message>, ctx: &ExecutionContext) -> Result<Message> {
81        let input = if messages_char_size(&input) >= self.threshold_chars {
82            let dropped_size = messages_char_size(&input);
83            let events = messages_to_events(&input)?;
84            let compacted = self
85                .compactor
86                .compact(&events, self.threshold_chars, ctx)
87                .await?
88                .to_messages();
89            let retained_size = messages_char_size(&compacted);
90            if let Some(handle) = ctx.audit_sink() {
91                handle.as_sink().record_context_compacted(
92                    dropped_size.saturating_sub(retained_size),
93                    retained_size,
94                );
95            }
96            compacted
97        } else {
98            input
99        };
100        self.inner.invoke(input, ctx).await
101    }
102}
103
104/// Extension trait that attaches [`RunnableCompacting`] to any
105/// `Runnable<Vec<Message>, Message>`. Blanket-impl'd for every such
106/// runnable so a model accepting messages — including layered models
107/// (`OtelLayer`, `PolicyLayer`, `RetryService`) — can chain `.with_compaction(.)`
108/// without a separate import per concrete type.
109pub trait MessageRunnableCompactionExt: Runnable<Vec<Message>, Message> + Sized {
110    /// Wrap with auto-compaction. The wrapper is itself a
111    /// `Runnable<Vec<Message>, Message>`, so it composes back into
112    /// any recipe that takes a model.
113    fn with_compaction(
114        self,
115        compactor: Arc<dyn Compactor>,
116        threshold_chars: usize,
117    ) -> RunnableCompacting<Self> {
118        RunnableCompacting {
119            inner: self,
120            compactor,
121            threshold_chars,
122        }
123    }
124}
125
126impl<R> MessageRunnableCompactionExt for R where R: Runnable<Vec<Message>, Message> + Sized {}
127
/// System prompt the [`SummaryCompactor`] hands its summariser model
/// unless the operator overrides it. Deliberately vendor-neutral — a
/// plain compress-the-prior-conversation instruction that behaves the
/// same wherever system prompts are routed identically.
pub const DEFAULT_SUMMARY_SYSTEM_PROMPT: &str = "You are a conversation summariser. Distil the conversation below into 100-200 words preserving key facts, decisions, entities, and tool outcomes. Output ONLY the summary text — no preamble, no commentary.";

/// How many of the newest turns the [`SummaryCompactor`] keeps verbatim
/// by default before folding everything older into one synthetic turn.
/// Four matches the usual LLM-agent cadence (latest user/assistant pair
/// plus the pair before it): small enough that summarisation starts
/// early, large enough that adjacent context survives.
pub const DEFAULT_SUMMARY_KEEP_RECENT_TURNS: usize = 4;

/// LLM-summary [`Compactor`]: folds every turn older than the newest
/// `keep_recent_turns` into a single summarised `Turn::User`, keeping
/// the recent turns verbatim.
///
/// Pair invariant: a dropped turn takes its `ToolPair`s with it, and a
/// retained `Turn::Assistant` keeps its `tools` vector intact — the
/// wire-side codec therefore never encounters an unmatched tool block.
///
/// Build with [`SummaryCompactor::new`], then tune via
/// [`SummaryCompactor::with_keep_recent_turns`] and
/// [`SummaryCompactor::with_system_prompt`]. The summariser can be any
/// `Runnable<Vec<Message>, Message>` — a `ChatModel`, a layered model,
/// or a stub in tests.
pub struct SummaryCompactor<M> {
    // Summariser invoked on the rendered older turns.
    model: Arc<M>,
    // System instruction prepended to the summarisation prompt.
    system_prompt: String,
    // Count of newest turns retained verbatim.
    keep_recent_turns: usize,
}

impl<M> SummaryCompactor<M> {
    /// Construct with [`DEFAULT_SUMMARY_SYSTEM_PROMPT`] and
    /// [`DEFAULT_SUMMARY_KEEP_RECENT_TURNS`].
    #[must_use]
    pub fn new(model: Arc<M>) -> Self {
        Self {
            keep_recent_turns: DEFAULT_SUMMARY_KEEP_RECENT_TURNS,
            system_prompt: DEFAULT_SUMMARY_SYSTEM_PROMPT.to_owned(),
            model,
        }
    }

    /// Override how many newest turns survive verbatim. Larger values
    /// keep more recent context, leaving the summariser more budget
    /// pressure to absorb.
    #[must_use]
    pub const fn with_keep_recent_turns(mut self, n: usize) -> Self {
        self.keep_recent_turns = n;
        self
    }

    /// Override the system prompt — the knob for operators with a
    /// custom voice or a downstream format requirement (e.g. a JSON
    /// envelope).
    #[must_use]
    pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
        self.system_prompt = prompt.into();
        self
    }
}
190
191#[async_trait]
192impl<M> Compactor for SummaryCompactor<M>
193where
194    M: Runnable<Vec<Message>, Message> + Send + Sync + 'static,
195{
196    async fn compact(
197        &self,
198        events: &[GraphEvent],
199        _budget_chars: usize,
200        ctx: &ExecutionContext,
201    ) -> Result<CompactedHistory> {
202        let grouped = CompactedHistory::group(events)?;
203        let total = grouped.len();
204        if total <= self.keep_recent_turns {
205            return Ok(grouped);
206        }
207        let split_at = total - self.keep_recent_turns;
208        let mut all = grouped.turns().to_vec();
209        let recent = all.split_off(split_at);
210        let older = all;
211        if older.is_empty() {
212            return Ok(CompactedHistory::from_turns(recent));
213        }
214        let older_messages = CompactedHistory::from_turns(older).to_messages();
215        let mut prompt = Vec::with_capacity(older_messages.len() + 1);
216        prompt.push(Message::new(
217            Role::System,
218            vec![ContentPart::text(self.system_prompt.clone())],
219        ));
220        prompt.extend(older_messages);
221        let summary_msg = self.model.invoke(prompt, ctx).await?;
222        let summary_text = extract_text(&summary_msg.content);
223        let summary_turn = Turn::User {
224            content: vec![ContentPart::text(format!(
225                "[Summary of earlier conversation]\n{summary_text}"
226            ))],
227        };
228        let mut combined = Vec::with_capacity(1 + recent.len());
229        combined.push(summary_turn);
230        combined.extend(recent);
231        Ok(CompactedHistory::from_turns(combined))
232    }
233}
234
235fn extract_text(parts: &[ContentPart]) -> String {
236    let mut out = String::new();
237    for part in parts {
238        if let ContentPart::Text { text, .. } = part {
239            if !out.is_empty() {
240                out.push('\n');
241            }
242            out.push_str(text);
243        }
244    }
245    out
246}
247
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use entelix_core::ir::{ContentPart, Message, Role};
    use entelix_session::HeadDropCompactor;
    use parking_lot::Mutex;

    use super::*;

    /// Inner-runnable probe: counts invocations and records how many
    /// messages the most recent call received, so wrapper tests can
    /// observe whether compaction trimmed the slice before delegation.
    struct EchoModel {
        invocations: AtomicUsize,
        last_input_len: AtomicUsize,
    }

    impl EchoModel {
        fn new() -> Self {
            Self {
                invocations: AtomicUsize::new(0),
                last_input_len: AtomicUsize::new(0),
            }
        }
    }

    #[async_trait]
    impl Runnable<Vec<Message>, Message> for EchoModel {
        async fn invoke(&self, input: Vec<Message>, _ctx: &ExecutionContext) -> Result<Message> {
            self.invocations.fetch_add(1, Ordering::SeqCst);
            self.last_input_len.store(input.len(), Ordering::SeqCst);
            Ok(Message::new(Role::Assistant, vec![ContentPart::text("ok")]))
        }
    }

    /// Shorthand: a single-text-part user message.
    fn user(text: &str) -> Message {
        Message::new(Role::User, vec![ContentPart::text(text)])
    }

    /// Shorthand: a single-text-part assistant message.
    fn assistant(text: &str) -> Message {
        Message::new(Role::Assistant, vec![ContentPart::text(text)])
    }

    #[tokio::test]
    async fn passes_through_below_threshold() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let wrapped = EchoModel::new().with_compaction(compactor, 1024);

        let input = vec![user("short"), assistant("ok")];
        let _ = wrapped
            .invoke(input.clone(), &ExecutionContext::new())
            .await
            .unwrap();
        // Below threshold the wrapper must delegate the slice unchanged.
        assert_eq!(
            wrapped.inner().last_input_len.load(Ordering::SeqCst),
            input.len()
        );
    }

    #[tokio::test]
    async fn compacts_when_threshold_exceeded() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let model = EchoModel::new();
        // Threshold = 30 characters. Three round-trips well above that
        // trigger compaction; the head-drop strategy retains the
        // newest turns that fit under the budget.
        let wrapped = model.with_compaction(compactor, 30);

        let input = vec![
            user("one one one one"),
            assistant("first reply long enough"),
            user("two two two two"),
            assistant("second reply long enough"),
            user("three three three three"),
            assistant("third reply"),
        ];
        let _ = wrapped
            .invoke(input.clone(), &ExecutionContext::new())
            .await
            .unwrap();
        // Only a weak "strictly fewer messages" bound is asserted —
        // exactly which turns survive is the compactor's business.
        let observed_len = wrapped.inner().last_input_len.load(Ordering::SeqCst);
        assert!(
            observed_len < input.len(),
            "compaction must trim — got {observed_len}, input had {}",
            input.len()
        );
    }

    /// `AuditSink` test impl that records every `record_*` call so
    /// the compaction-emit assertion can verify the audit channel is
    /// actually crossed (invariant 18).
    struct CapturingAuditSink {
        compactions: Mutex<Vec<(usize, usize)>>,
    }

    impl CapturingAuditSink {
        fn new() -> Self {
            Self {
                compactions: Mutex::new(Vec::new()),
            }
        }
    }

    impl entelix_core::AuditSink for CapturingAuditSink {
        // Only the compaction channel matters here; every other hook is
        // a deliberate no-op.
        fn record_sub_agent_invoked(&self, _agent_id: &str, _sub_thread_id: &str) {}
        fn record_agent_handoff(&self, _from: Option<&str>, _to: &str) {}
        fn record_resumed(&self, _from_checkpoint: &str) {}
        fn record_memory_recall(&self, _tier: &str, _namespace_key: &str, _hits: usize) {}
        fn record_usage_limit_exceeded(&self, _breach: &entelix_core::UsageLimitBreach) {}
        fn record_context_compacted(&self, dropped_chars: usize, retained_chars: usize) {
            self.compactions
                .lock()
                .push((dropped_chars, retained_chars));
        }
    }

    #[tokio::test]
    async fn compaction_records_audit_event_when_threshold_exceeded() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let model = EchoModel::new();
        let wrapped = model.with_compaction(compactor, 30);
        let sink = Arc::new(CapturingAuditSink::new());
        let ctx = ExecutionContext::new()
            .with_audit_sink(entelix_core::AuditSinkHandle::new(sink.clone()));

        let input = vec![
            user("padding to force compaction one one one one"),
            assistant("more padding to force compaction"),
            user("trailing turn"),
            assistant("ok"),
        ];
        let _ = wrapped.invoke(input, &ctx).await.unwrap();

        let captured = sink.compactions.lock().clone();
        assert_eq!(captured.len(), 1, "exactly one compaction event expected");
        let (dropped, _retained) = captured[0];
        assert!(dropped > 0, "must report some dropped characters");
    }

    #[tokio::test]
    async fn compaction_records_no_audit_event_below_threshold() {
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let model = EchoModel::new();
        let wrapped = model.with_compaction(compactor, 1024);
        let sink = Arc::new(CapturingAuditSink::new());
        let ctx = ExecutionContext::new()
            .with_audit_sink(entelix_core::AuditSinkHandle::new(sink.clone()));

        let input = vec![user("short"), assistant("ok")];
        let _ = wrapped.invoke(input, &ctx).await.unwrap();

        assert!(
            sink.compactions.lock().is_empty(),
            "no audit event expected when threshold is not crossed"
        );
    }

    #[tokio::test]
    async fn empty_messages_pass_through() {
        // Edge case: zero messages have size 0, below any positive
        // threshold, so the wrapper must delegate the empty slice as-is.
        let compactor: Arc<dyn Compactor> = Arc::new(HeadDropCompactor);
        let model = EchoModel::new();
        let wrapped = model.with_compaction(compactor, 1024);
        let _ = wrapped
            .invoke(Vec::new(), &ExecutionContext::new())
            .await
            .unwrap();
        assert_eq!(wrapped.inner().last_input_len.load(Ordering::SeqCst), 0);
    }

    /// Stub summariser model that records the prompt it received and
    /// always replies with a fixed summary text. Lets the
    /// `SummaryCompactor` tests assert exactly which turns were sent
    /// to the summariser.
    struct StubSummariser {
        captured_prompt: Mutex<Vec<Message>>,
        reply: String,
    }

    impl StubSummariser {
        fn new(reply: impl Into<String>) -> Self {
            Self {
                captured_prompt: Mutex::new(Vec::new()),
                reply: reply.into(),
            }
        }
    }

    #[async_trait]
    impl Runnable<Vec<Message>, Message> for StubSummariser {
        async fn invoke(&self, input: Vec<Message>, _ctx: &ExecutionContext) -> Result<Message> {
            *self.captured_prompt.lock() = input;
            Ok(Message::new(
                Role::Assistant,
                vec![ContentPart::text(self.reply.clone())],
            ))
        }
    }

    /// Shorthand: a user-message graph event timestamped "now".
    fn user_event(text: &str) -> entelix_session::GraphEvent {
        entelix_session::GraphEvent::UserMessage {
            content: vec![ContentPart::text(text)],
            timestamp: chrono::Utc::now(),
        }
    }

    /// Shorthand: an assistant-message graph event timestamped "now".
    fn assistant_event(text: &str) -> entelix_session::GraphEvent {
        entelix_session::GraphEvent::AssistantMessage {
            content: vec![ContentPart::text(text)],
            usage: None,
            timestamp: chrono::Utc::now(),
        }
    }

    #[tokio::test]
    async fn summary_compactor_skips_when_under_keep_recent_threshold() {
        let summariser = Arc::new(StubSummariser::new("never invoked"));
        let compactor = SummaryCompactor::new(summariser.clone()).with_keep_recent_turns(8);
        let events = vec![
            user_event("u1"),
            assistant_event("a1"),
            user_event("u2"),
            assistant_event("a2"),
        ];
        let history = compactor
            .compact(&events, 0, &ExecutionContext::new())
            .await
            .unwrap();
        // 4 turns <= keep_recent_turns(8): history returned untouched.
        assert_eq!(history.len(), 4);
        assert!(
            summariser.captured_prompt.lock().is_empty(),
            "summariser must NOT be invoked when total <= keep_recent_turns"
        );
    }

    #[tokio::test]
    async fn summary_compactor_replaces_older_turns_with_summary() {
        let summariser = Arc::new(StubSummariser::new("brief recap"));
        let compactor = SummaryCompactor::new(summariser.clone()).with_keep_recent_turns(2);
        let events = vec![
            user_event("oldest user"),
            assistant_event("oldest assistant"),
            user_event("middle user"),
            assistant_event("middle assistant"),
            user_event("newest user"),
            assistant_event("newest assistant"),
        ];
        let history = compactor
            .compact(&events, 0, &ExecutionContext::new())
            .await
            .unwrap();
        // Summary turn (1) + retained newest turns (2) = 3
        assert_eq!(history.len(), 3);
        // Head is the synthetic User summary.
        if let Turn::User { content } = &history.turns()[0] {
            if let ContentPart::Text { text, .. } = &content[0] {
                assert!(text.contains("Summary"), "summary marker missing: {text}");
                assert!(
                    text.contains("brief recap"),
                    "summariser reply missing: {text}"
                );
            }
        } else {
            panic!("expected User turn at head");
        }
        // Summariser was invoked with system + 4 older turns rendered as messages.
        let captured_len;
        let captured_role;
        {
            // Scope the lock guard so it is released before the asserts
            // below can panic while it is held.
            let captured = summariser.captured_prompt.lock();
            captured_len = captured.len();
            captured_role = captured[0].role;
        }
        assert!(
            captured_len >= 5,
            "expected system + ≥4 older messages, got {captured_len}"
        );
        assert!(matches!(captured_role, Role::System));
    }

    #[tokio::test]
    async fn summary_compactor_with_system_prompt_overrides_default() {
        let summariser = Arc::new(StubSummariser::new("ok"));
        // keep_recent_turns(0): every turn is "older", so the summariser
        // always runs and the custom prompt is guaranteed to be sent.
        let compactor = SummaryCompactor::new(summariser.clone())
            .with_keep_recent_turns(0)
            .with_system_prompt("CUSTOM PROMPT MARKER");
        let events = vec![user_event("hi"), assistant_event("hello")];
        let _ = compactor
            .compact(&events, 0, &ExecutionContext::new())
            .await
            .unwrap();
        let prompt_text = {
            let captured = summariser.captured_prompt.lock();
            if let ContentPart::Text { text, .. } = &captured[0].content[0] {
                text.clone()
            } else {
                panic!("expected Text part at system position");
            }
        };
        assert!(
            prompt_text.contains("CUSTOM PROMPT MARKER"),
            "operator-supplied prompt must reach the summariser, got: {prompt_text}"
        );
    }
}