entelix_memory/buffer.rs
//! `BufferMemory` — keep the last *N* messages of a conversation in a
//! `Store<Vec<Message>>` (the `max_turns` cap counts individual
//! messages). Simplest of the LangChain-style memory patterns.
4
5use std::sync::Arc;
6
7use chrono::{DateTime, Utc};
8use entelix_core::ir::Message;
9use entelix_core::{ExecutionContext, Result};
10use parking_lot::Mutex;
11
12use crate::consolidation::{ConsolidationContext, ConsolidationPolicy};
13use crate::namespace::Namespace;
14use crate::store::Store;
15
/// Store key under which the buffer's whole message list is read,
/// written and deleted within the bound namespace.
const DEFAULT_KEY: &str = "buffer";
17
/// Bounded conversation buffer.
///
/// Each call to `append` pushes one message and (if the buffer exceeds
/// `max_turns`) drops the oldest. `messages()` returns the full retained
/// list.
///
/// An optional [`ConsolidationPolicy`] can be attached via
/// [`Self::with_consolidation_policy`]. When attached,
/// [`Self::should_consolidate`] surfaces the policy's decision so the
/// caller (typically an agent recipe) can run an LLM-driven
/// summarisation pass and write the result into a sibling
/// [`crate::SummaryMemory`] before clearing the buffer. The buffer
/// tracks `last_consolidated_at` itself — recipes call
/// [`Self::mark_consolidated`] (or [`Self::mark_consolidated_now`])
/// after a successful summarisation pass and the time-aware policies
/// see the updated timestamp on the next check, so the consolidation
/// loop reduces to:
///
/// ```ignore
/// if buffer.should_consolidate(&ctx).await? {
///     run_summariser(&ctx, &buffer, &summary).await?;
///     buffer.clear(&ctx).await?;
///     buffer.mark_consolidated_now();
/// }
/// ```
pub struct BufferMemory {
    /// Backing store; the entire message list lives under a single key
    /// inside `namespace`.
    store: Arc<dyn Store<Vec<Message>>>,
    /// Scope handed to every store call this buffer makes.
    namespace: Namespace,
    /// Maximum number of messages left in the list after an `append`.
    max_turns: usize,
    /// Policy consulted by the `should_consolidate*` methods; `None`
    /// means those methods always answer `false`.
    consolidation: Option<Arc<dyn ConsolidationPolicy>>,
    /// Timestamp last recorded through `mark_consolidated`; `None` until
    /// the first mark. Kept in this struct (not in the store) behind a
    /// mutex so it can be updated through `&self`.
    last_consolidated_at: Mutex<Option<DateTime<Utc>>>,
}
49
50impl BufferMemory {
51 /// Build a buffer over `store` scoped to `namespace`, retaining at
52 /// most `max_turns` messages.
53 pub fn new(
54 store: Arc<dyn Store<Vec<Message>>>,
55 namespace: Namespace,
56 max_turns: usize,
57 ) -> Self {
58 Self {
59 store,
60 namespace,
61 max_turns,
62 consolidation: None,
63 last_consolidated_at: Mutex::new(None),
64 }
65 }
66
67 /// Attach a [`ConsolidationPolicy`]. The buffer itself never
68 /// performs the summarisation — the policy only *decides* when
69 /// the caller should — but having the policy bound here means
70 /// agent recipes can ask the buffer directly via
71 /// [`Self::should_consolidate`] without threading the policy
72 /// through every call site.
73 #[must_use]
74 pub fn with_consolidation_policy(mut self, policy: Arc<dyn ConsolidationPolicy>) -> Self {
75 self.consolidation = Some(policy);
76 self
77 }
78
79 /// Effective retention cap.
80 pub const fn max_turns(&self) -> usize {
81 self.max_turns
82 }
83
84 /// Borrow the bound namespace.
85 pub const fn namespace(&self) -> &Namespace {
86 &self.namespace
87 }
88
89 /// Wall-clock time of the most recent successful consolidation,
90 /// as recorded via [`Self::mark_consolidated`]. Returns `None`
91 /// before the first consolidation has been marked.
92 pub fn last_consolidated_at(&self) -> Option<DateTime<Utc>> {
93 *self.last_consolidated_at.lock()
94 }
95
96 /// Record that a consolidation pass completed at `at`. Recipes
97 /// call this after a successful summarise-and-clear cycle so
98 /// time-aware policies (throttling, "at most once per hour")
99 /// observe the new floor on the next check.
100 pub fn mark_consolidated(&self, at: DateTime<Utc>) {
101 *self.last_consolidated_at.lock() = Some(at);
102 }
103
104 /// Convenience over [`Self::mark_consolidated`] using
105 /// [`chrono::Utc::now`]. Use when the caller doesn't already
106 /// have a timestamp in hand.
107 pub fn mark_consolidated_now(&self) {
108 self.mark_consolidated(Utc::now());
109 }
110
111 /// Append `message`, dropping the oldest entries when over capacity.
112 pub async fn append(&self, ctx: &ExecutionContext, message: Message) -> Result<()> {
113 let mut messages = self
114 .store
115 .get(ctx, &self.namespace, DEFAULT_KEY)
116 .await?
117 .unwrap_or_default();
118 messages.push(message);
119 // Drop oldest while over budget.
120 while messages.len() > self.max_turns {
121 messages.remove(0);
122 }
123 self.store
124 .put(ctx, &self.namespace, DEFAULT_KEY, messages)
125 .await
126 }
127
128 /// Read the retained messages oldest-first.
129 pub async fn messages(&self, ctx: &ExecutionContext) -> Result<Vec<Message>> {
130 Ok(self
131 .store
132 .get(ctx, &self.namespace, DEFAULT_KEY)
133 .await?
134 .unwrap_or_default())
135 }
136
137 /// Clear the buffer.
138 pub async fn clear(&self, ctx: &ExecutionContext) -> Result<()> {
139 self.store.delete(ctx, &self.namespace, DEFAULT_KEY).await
140 }
141
142 /// Consult the bound [`ConsolidationPolicy`] (if any) against the
143 /// current buffer and the buffer's tracked `last_consolidated_at`.
144 /// Returns `Ok(false)` when no policy is bound — that path is the
145 /// explicit "consolidation disabled" answer.
146 ///
147 /// For token-budget policies that need the active model's usage,
148 /// use [`Self::should_consolidate_with`] and supply the values
149 /// in [`PolicyExtras`].
150 pub async fn should_consolidate(&self, ctx: &ExecutionContext) -> Result<bool> {
151 self.should_consolidate_with(ctx, PolicyExtras::default())
152 .await
153 }
154
155 /// As [`Self::should_consolidate`], but lets the caller layer on
156 /// model-specific token usage signals or override the buffer's
157 /// internally-tracked `last_consolidated_at` (rare — useful when
158 /// a downstream system has more authoritative state).
159 pub async fn should_consolidate_with(
160 &self,
161 ctx: &ExecutionContext,
162 extras: PolicyExtras,
163 ) -> Result<bool> {
164 let Some(policy) = self.consolidation.as_ref() else {
165 return Ok(false);
166 };
167 let buffer = self.messages(ctx).await?;
168 let mut consolidation_ctx = ConsolidationContext::new(&buffer);
169 let effective_last = extras
170 .last_consolidated_at
171 .or_else(|| *self.last_consolidated_at.lock());
172 if let Some(at) = effective_last {
173 consolidation_ctx = consolidation_ctx.with_last_consolidated_at(at);
174 }
175 if let (Some(used), Some(available)) =
176 (extras.context_tokens_used, extras.context_tokens_available)
177 {
178 consolidation_ctx = consolidation_ctx.with_context_tokens(used, available);
179 }
180 Ok(policy.should_consolidate(&consolidation_ctx))
181 }
182}
183
/// Optional signals fed into [`BufferMemory::should_consolidate_with`].
/// Operators that don't track tokens or last-consolidated time can
/// pass [`Self::default`] (every field `None`) and the policy will see
/// the buffer plus whatever the buffer itself tracks via
/// [`BufferMemory::mark_consolidated`].
///
/// The two token fields act as a pair: they are forwarded to the policy
/// only when both are `Some`.
#[derive(Clone, Copy, Debug, Default)]
pub struct PolicyExtras {
    /// Wall-clock time of the most recent consolidation. When
    /// supplied, overrides the buffer's internally-tracked value
    /// for this single check; the tracked value itself is not changed.
    pub last_consolidated_at: Option<DateTime<Utc>>,
    /// Tokens currently consumed in the model's context window.
    /// Ignored unless `context_tokens_available` is also set.
    pub context_tokens_used: Option<usize>,
    /// Total context-window capacity for the active model.
    /// Ignored unless `context_tokens_used` is also set.
    pub context_tokens_available: Option<usize>,
}