Skip to main content

entelix_memory/
consolidating.rs

1//! `ConsolidatingBufferMemory` — opinionated layered memory that
2//! ties a `BufferMemory`, a `SummaryMemory`, and a [`Summarizer`]
3//! into a single `append`-driven loop.
4//!
5//! Without this adapter every agent recipe rewrites the same
6//! sequence: ask the buffer's policy, run an LLM summarisation pass,
7//! append the result to the summary, clear the buffer, mark
8//! consolidated. The wrapper bakes that loop into one type so the
9//! recipe code reduces to `mem.append(ctx, msg).await?`.
10//!
11//! ## Provider-agnostic
12//!
13//! [`Summarizer`] is a trait over `(messages, ctx) -> String` —
14//! independent of which LLM provider runs the summarisation. The
15//! `entelix-agents` crate provides `RunnableToSummarizerAdapter`, which
16//! adapts any `Runnable<Vec<Message>, Message>` (Anthropic codec,
17//! `OpenAI` codec, a stubbed test runnable, …) into this trait by
18//! prepending a configurable system instruction and extracting the
19//! response's text content. Memory itself stays decoupled from the
20//! `Runnable` abstraction so backends can plug in non-LLM
21//! summarisers (heuristic compression, cached templates) without
22//! pulling in the runnable dependency.
23//!
24//! ## Failure semantics
25//!
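//! When the summariser fails, the underlying buffer is **not**
//! cleared and `last_consolidated_at` is **not** updated — the next
//! call to `append` re-checks the policy and, if it still fires,
//! re-attempts consolidation. This keeps a transient summariser
//! outage from silently dropping conversation history. Note that the
//! message passed to the failing `append` has already been written
//! to the buffer by the time the error is returned.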
31
32use std::sync::Arc;
33
34use async_trait::async_trait;
35use entelix_core::ir::Message;
36use entelix_core::{ExecutionContext, Result};
37
38use crate::buffer::BufferMemory;
39use crate::summary::SummaryMemory;
40
41/// Reduces a buffer of conversation messages to a summary string.
42///
43/// Implementations decide *how* to summarise — typically by calling
44/// an LLM. The trait stays provider-agnostic; concrete impls (such
45/// as `entelix_agents::RunnableToSummarizerAdapter`) wire in the model.
46#[async_trait]
47pub trait Summarizer: Send + Sync + 'static {
48    /// Summarise `messages` into a single string. Returning `Err`
49    /// signals a transient failure — the consolidating buffer
50    /// keeps the original messages and re-attempts next call.
51    async fn summarize(&self, messages: Vec<Message>, ctx: &ExecutionContext) -> Result<String>;
52}
53
54/// Layered memory: a [`BufferMemory`] for recent turns, a
55/// [`SummaryMemory`] for the running summary, and a [`Summarizer`]
56/// that bridges the two when the buffer's policy fires.
57///
58/// `append` drives the full loop:
59///
60/// ```ignore
61/// let mem = ConsolidatingBufferMemory::new(buffer, summary, summariser);
62/// mem.append(&ctx, Message::user("hi")).await?;
63/// // — buffer now has the new message; if the policy fires,
64/// //   the previous buffer has already been summarised into
65/// //   `summary` and the buffer cleared.
66/// ```
67pub struct ConsolidatingBufferMemory {
68    buffer: Arc<BufferMemory>,
69    summary: Arc<SummaryMemory>,
70    summarizer: Arc<dyn Summarizer>,
71}
72
73impl ConsolidatingBufferMemory {
74    /// Build a layered memory from an existing buffer, summary, and
75    /// summariser. The buffer must already have a
76    /// [`crate::ConsolidationPolicy`] attached via
77    /// [`BufferMemory::with_consolidation_policy`] — without one the
78    /// adapter never consolidates and behaves as a thin
79    /// `BufferMemory` proxy.
80    pub fn new(
81        buffer: Arc<BufferMemory>,
82        summary: Arc<SummaryMemory>,
83        summarizer: Arc<dyn Summarizer>,
84    ) -> Self {
85        Self {
86            buffer,
87            summary,
88            summarizer,
89        }
90    }
91
92    /// Borrow the underlying buffer (for direct queries that bypass
93    /// the consolidation loop, such as size accounting).
94    pub const fn buffer(&self) -> &Arc<BufferMemory> {
95        &self.buffer
96    }
97
98    /// Borrow the underlying summary memory.
99    pub const fn summary(&self) -> &Arc<SummaryMemory> {
100        &self.summary
101    }
102
103    /// Append `message` to the buffer, then check the bound
104    /// consolidation policy. When it fires, summarise the buffered
105    /// messages, append the summary to [`SummaryMemory`], clear the
106    /// buffer, and mark the buffer's `last_consolidated_at`.
107    pub async fn append(&self, ctx: &ExecutionContext, message: Message) -> Result<()> {
108        self.buffer.append(ctx, message).await?;
109        if !self.buffer.should_consolidate(ctx).await? {
110            return Ok(());
111        }
112        let messages = self.buffer.messages(ctx).await?;
113        // Summarise BEFORE mutating either store. If summarise
114        // fails, we surface the error and leave the buffer intact;
115        // the caller can retry on the next append.
116        let summary_text = self.summarizer.summarize(messages, ctx).await?;
117        self.summary.append(ctx, &summary_text).await?;
118        self.buffer.clear(ctx).await?;
119        self.buffer.mark_consolidated_now();
120        Ok(())
121    }
122
123    /// Fetch the current buffered messages.
124    pub async fn messages(&self, ctx: &ExecutionContext) -> Result<Vec<Message>> {
125        self.buffer.messages(ctx).await
126    }
127
128    /// Fetch the current running summary.
129    pub async fn current_summary(&self, ctx: &ExecutionContext) -> Result<Option<String>> {
130        self.summary.get(ctx).await
131    }
132
133    /// Reset both layers — buffer and summary.
134    pub async fn clear(&self, ctx: &ExecutionContext) -> Result<()> {
135        self.buffer.clear(ctx).await?;
136        self.summary.clear(ctx).await
137    }
138}
139
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::consolidation::{ConsolidationPolicy, OnMessageCount};
    use crate::namespace::Namespace;
    use crate::store::InMemoryStore;
    use entelix_core::TenantId;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Test double for [`Summarizer`]: counts invocations and hands
    /// back a canned reply (success or error), so these tests need no
    /// LLM runnable.
    struct StubSummarizer {
        calls: Arc<AtomicUsize>,
        reply: Result<String>,
    }

    impl StubSummarizer {
        /// Stub that always succeeds with `reply`; also returns the
        /// shared call counter so the test can inspect it.
        fn ok(reply: &str) -> (Self, Arc<AtomicUsize>) {
            let counter = Arc::new(AtomicUsize::new(0));
            let stub = Self {
                calls: Arc::clone(&counter),
                reply: Ok(reply.to_owned()),
            };
            (stub, counter)
        }

        /// Stub that always fails with a config error carrying `msg`.
        fn err(msg: &str) -> Self {
            let failure = entelix_core::Error::config(msg.to_owned());
            Self {
                calls: Arc::new(AtomicUsize::new(0)),
                reply: Err(failure),
            }
        }
    }

    #[async_trait]
    impl Summarizer for StubSummarizer {
        async fn summarize(
            &self,
            _messages: Vec<Message>,
            _ctx: &ExecutionContext,
        ) -> Result<String> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            self.reply.as_ref().cloned().map_err(clone_error)
        }
    }

    /// Rebuild an error as a config error with the same rendered text.
    fn clone_error(e: &entelix_core::Error) -> entelix_core::Error {
        // Only Config errors are ever cloned by these tests; every
        // other variant is flattened to its display text because a
        // faithful clone would need more machinery than a stub
        // deserves.
        let text = match e {
            entelix_core::Error::Config(c) => c.to_string(),
            other => other.to_string(),
        };
        entelix_core::Error::config(text)
    }

    /// In-memory buffer scoped to tenant `t` / scope `conv`, with
    /// `policy` attached.
    fn make_buffer(max_turns: usize, policy: Arc<dyn ConsolidationPolicy>) -> Arc<BufferMemory> {
        let store = Arc::new(InMemoryStore::<Vec<Message>>::new());
        let namespace = Namespace::new(TenantId::new("t")).with_scope("conv");
        let memory = BufferMemory::new(store, namespace, max_turns);
        Arc::new(memory.with_consolidation_policy(policy))
    }

    /// In-memory summary store scoped to tenant `t` / scope `conv`.
    fn make_summary() -> Arc<SummaryMemory> {
        let store = Arc::new(InMemoryStore::<String>::new());
        let namespace = Namespace::new(TenantId::new("t")).with_scope("conv");
        Arc::new(SummaryMemory::new(store, namespace))
    }

    #[tokio::test]
    async fn append_does_not_consolidate_below_threshold() {
        let (stub, calls) = StubSummarizer::ok("summary");
        let mem = ConsolidatingBufferMemory::new(
            make_buffer(10, Arc::new(OnMessageCount::new(5))),
            make_summary(),
            Arc::new(stub),
        );
        let ctx = ExecutionContext::new();

        for i in 0..3 {
            mem.append(&ctx, Message::user(format!("m{i}")))
                .await
                .unwrap();
        }

        // Three messages against a threshold of five: nothing fires.
        assert_eq!(calls.load(Ordering::SeqCst), 0);
        assert_eq!(mem.messages(&ctx).await.unwrap().len(), 3);
        assert!(mem.current_summary(&ctx).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn append_consolidates_when_threshold_reached() {
        let (stub, calls) = StubSummarizer::ok("compressed");
        let mem = ConsolidatingBufferMemory::new(
            make_buffer(10, Arc::new(OnMessageCount::new(3))),
            make_summary(),
            Arc::new(stub),
        );
        let ctx = ExecutionContext::new();

        for i in 0..3 {
            mem.append(&ctx, Message::user(format!("m{i}")))
                .await
                .unwrap();
        }

        // The third append hits the threshold and consolidates once.
        assert_eq!(calls.load(Ordering::SeqCst), 1);
        // The buffer has been emptied and the summary holds the
        // stub's reply.
        assert!(mem.messages(&ctx).await.unwrap().is_empty());
        assert_eq!(
            mem.current_summary(&ctx).await.unwrap().as_deref(),
            Some("compressed")
        );
        assert!(mem.buffer().last_consolidated_at().is_some());
    }

    #[tokio::test]
    async fn summariser_failure_preserves_buffer() {
        let mem = ConsolidatingBufferMemory::new(
            make_buffer(10, Arc::new(OnMessageCount::new(2))),
            make_summary(),
            Arc::new(StubSummarizer::err("summariser down")),
        );
        let ctx = ExecutionContext::new();

        mem.append(&ctx, Message::user("a")).await.unwrap();
        let err = mem.append(&ctx, Message::user("b")).await.unwrap_err();
        assert!(matches!(err, entelix_core::Error::Config(_)));

        // Both messages are still buffered, so a later turn can retry.
        assert_eq!(mem.messages(&ctx).await.unwrap().len(), 2);
        // No summary was written: no partial state.
        assert!(mem.current_summary(&ctx).await.unwrap().is_none());
        // The consolidation timestamp did not move.
        assert!(mem.buffer().last_consolidated_at().is_none());
    }
}