entelix_memory/
consolidating.rs1use std::sync::Arc;
33
34use async_trait::async_trait;
35use entelix_core::ir::Message;
36use entelix_core::{ExecutionContext, Result};
37
38use crate::buffer::BufferMemory;
39use crate::summary::SummaryMemory;
40
41#[async_trait]
47pub trait Summarizer: Send + Sync + 'static {
48 async fn summarize(&self, messages: Vec<Message>, ctx: &ExecutionContext) -> Result<String>;
52}
53
54pub struct ConsolidatingBufferMemory {
68 buffer: Arc<BufferMemory>,
69 summary: Arc<SummaryMemory>,
70 summarizer: Arc<dyn Summarizer>,
71}
72
73impl ConsolidatingBufferMemory {
74 pub fn new(
81 buffer: Arc<BufferMemory>,
82 summary: Arc<SummaryMemory>,
83 summarizer: Arc<dyn Summarizer>,
84 ) -> Self {
85 Self {
86 buffer,
87 summary,
88 summarizer,
89 }
90 }
91
92 pub const fn buffer(&self) -> &Arc<BufferMemory> {
95 &self.buffer
96 }
97
98 pub const fn summary(&self) -> &Arc<SummaryMemory> {
100 &self.summary
101 }
102
103 pub async fn append(&self, ctx: &ExecutionContext, message: Message) -> Result<()> {
108 self.buffer.append(ctx, message).await?;
109 if !self.buffer.should_consolidate(ctx).await? {
110 return Ok(());
111 }
112 let messages = self.buffer.messages(ctx).await?;
113 let summary_text = self.summarizer.summarize(messages, ctx).await?;
117 self.summary.append(ctx, &summary_text).await?;
118 self.buffer.clear(ctx).await?;
119 self.buffer.mark_consolidated_now();
120 Ok(())
121 }
122
123 pub async fn messages(&self, ctx: &ExecutionContext) -> Result<Vec<Message>> {
125 self.buffer.messages(ctx).await
126 }
127
128 pub async fn current_summary(&self, ctx: &ExecutionContext) -> Result<Option<String>> {
130 self.summary.get(ctx).await
131 }
132
133 pub async fn clear(&self, ctx: &ExecutionContext) -> Result<()> {
135 self.buffer.clear(ctx).await?;
136 self.summary.clear(ctx).await
137 }
138}
139
140#[cfg(test)]
141#[allow(clippy::unwrap_used)]
142mod tests {
143 use super::*;
144 use crate::consolidation::{ConsolidationPolicy, OnMessageCount};
145 use crate::namespace::Namespace;
146 use crate::store::InMemoryStore;
147 use entelix_core::TenantId;
148 use std::sync::atomic::{AtomicUsize, Ordering};
149
150 struct StubSummarizer {
154 calls: Arc<AtomicUsize>,
155 reply: Result<String>,
156 }
157
158 impl StubSummarizer {
159 fn ok(reply: &str) -> (Self, Arc<AtomicUsize>) {
160 let calls = Arc::new(AtomicUsize::new(0));
161 (
162 Self {
163 calls: calls.clone(),
164 reply: Ok(reply.to_owned()),
165 },
166 calls,
167 )
168 }
169
170 fn err(msg: &str) -> Self {
171 Self {
172 calls: Arc::new(AtomicUsize::new(0)),
173 reply: Err(entelix_core::Error::config(msg.to_owned())),
174 }
175 }
176 }
177
178 #[async_trait]
179 impl Summarizer for StubSummarizer {
180 async fn summarize(
181 &self,
182 _messages: Vec<Message>,
183 _ctx: &ExecutionContext,
184 ) -> Result<String> {
185 self.calls.fetch_add(1, Ordering::SeqCst);
186 match &self.reply {
187 Ok(s) => Ok(s.clone()),
188 Err(e) => Err(clone_error(e)),
189 }
190 }
191 }
192
193 fn clone_error(e: &entelix_core::Error) -> entelix_core::Error {
194 match e {
197 entelix_core::Error::Config(c) => entelix_core::Error::config(c.to_string()),
198 other => entelix_core::Error::config(format!("{other}")),
199 }
200 }
201
202 fn make_buffer(max_turns: usize, policy: Arc<dyn ConsolidationPolicy>) -> Arc<BufferMemory> {
203 Arc::new(
204 BufferMemory::new(
205 Arc::new(InMemoryStore::<Vec<Message>>::new()),
206 Namespace::new(TenantId::new("t")).with_scope("conv"),
207 max_turns,
208 )
209 .with_consolidation_policy(policy),
210 )
211 }
212
213 fn make_summary() -> Arc<SummaryMemory> {
214 Arc::new(SummaryMemory::new(
215 Arc::new(InMemoryStore::<String>::new()),
216 Namespace::new(TenantId::new("t")).with_scope("conv"),
217 ))
218 }
219
220 #[tokio::test]
221 async fn append_does_not_consolidate_below_threshold() {
222 let buf = make_buffer(10, Arc::new(OnMessageCount::new(5)));
223 let sum = make_summary();
224 let (summariser, calls) = StubSummarizer::ok("summary");
225 let mem = ConsolidatingBufferMemory::new(buf, sum.clone(), Arc::new(summariser));
226 let ctx = ExecutionContext::new();
227 for i in 0..3 {
228 mem.append(&ctx, Message::user(format!("m{i}")))
229 .await
230 .unwrap();
231 }
232 assert_eq!(calls.load(Ordering::SeqCst), 0);
233 assert_eq!(mem.messages(&ctx).await.unwrap().len(), 3);
234 assert!(mem.current_summary(&ctx).await.unwrap().is_none());
235 }
236
237 #[tokio::test]
238 async fn append_consolidates_when_threshold_reached() {
239 let buf = make_buffer(10, Arc::new(OnMessageCount::new(3)));
240 let sum = make_summary();
241 let (summariser, calls) = StubSummarizer::ok("compressed");
242 let mem = ConsolidatingBufferMemory::new(
243 Arc::clone(&buf),
244 Arc::clone(&sum),
245 Arc::new(summariser),
246 );
247 let ctx = ExecutionContext::new();
248 for i in 0..3 {
249 mem.append(&ctx, Message::user(format!("m{i}")))
250 .await
251 .unwrap();
252 }
253 assert_eq!(calls.load(Ordering::SeqCst), 1);
255 assert_eq!(mem.messages(&ctx).await.unwrap().len(), 0);
257 let summary = mem.current_summary(&ctx).await.unwrap().unwrap();
258 assert_eq!(summary, "compressed");
259 assert!(buf.last_consolidated_at().is_some());
260 }
261
262 #[tokio::test]
263 async fn summariser_failure_preserves_buffer() {
264 let buf = make_buffer(10, Arc::new(OnMessageCount::new(2)));
265 let sum = make_summary();
266 let summariser = StubSummarizer::err("summariser down");
267 let mem = ConsolidatingBufferMemory::new(
268 Arc::clone(&buf),
269 Arc::clone(&sum),
270 Arc::new(summariser),
271 );
272 let ctx = ExecutionContext::new();
273 mem.append(&ctx, Message::user("a")).await.unwrap();
274 let err = mem.append(&ctx, Message::user("b")).await.unwrap_err();
275 assert!(matches!(err, entelix_core::Error::Config(_)));
276 assert_eq!(mem.messages(&ctx).await.unwrap().len(), 2);
278 assert!(mem.current_summary(&ctx).await.unwrap().is_none());
280 assert!(buf.last_consolidated_at().is_none());
282 }
283}