Skip to main content

entelix_memory/
buffer.rs

1//! `BufferMemory` — keep the last *N* turns of a conversation in a
2//! `Store<Vec<Message>>`. Simplest of the LangChain-style memory
3//! patterns.
4
5use std::sync::Arc;
6
7use chrono::{DateTime, Utc};
8use entelix_core::ir::Message;
9use entelix_core::{ExecutionContext, Result};
10use parking_lot::Mutex;
11
12use crate::consolidation::{ConsolidationContext, ConsolidationPolicy};
13use crate::namespace::Namespace;
14use crate::store::Store;
15
/// Store key under which the retained message list is persisted. Every
/// read, write and delete performed by `BufferMemory` targets this single
/// key inside the buffer's bound namespace, so the whole history is one
/// store entry.
const DEFAULT_KEY: &str = "buffer";
17
18/// Bounded conversation buffer.
19///
20/// Each call to `append` pushes one message and (if the buffer exceeds
21/// `max_turns`) drops the oldest. `messages()` returns the full retained
22/// list.
23///
24/// An optional [`ConsolidationPolicy`] can be attached via
25/// [`Self::with_consolidation_policy`]. When attached,
26/// [`Self::should_consolidate`] surfaces the policy's decision so the
27/// caller (typically an agent recipe) can run an LLM-driven
28/// summarisation pass and write the result into a sibling
29/// [`crate::SummaryMemory`] before clearing the buffer. The buffer
30/// tracks `last_consolidated_at` itself — recipes call
31/// [`Self::mark_consolidated`] after a successful summarisation pass
32/// and the time-aware policies see the updated timestamp on the next
33/// check, so the consolidation loop reduces to:
34///
35/// ```ignore
36/// if buffer.should_consolidate(&ctx).await? {
37///     run_summariser(&ctx, &buffer, &summary).await?;
38///     buffer.clear(&ctx).await?;
39///     buffer.mark_consolidated_now();
40/// }
41/// ```
42pub struct BufferMemory {
43    store: Arc<dyn Store<Vec<Message>>>,
44    namespace: Namespace,
45    max_turns: usize,
46    consolidation: Option<Arc<dyn ConsolidationPolicy>>,
47    last_consolidated_at: Mutex<Option<DateTime<Utc>>>,
48}
49
50impl BufferMemory {
51    /// Build a buffer over `store` scoped to `namespace`, retaining at
52    /// most `max_turns` messages.
53    pub fn new(
54        store: Arc<dyn Store<Vec<Message>>>,
55        namespace: Namespace,
56        max_turns: usize,
57    ) -> Self {
58        Self {
59            store,
60            namespace,
61            max_turns,
62            consolidation: None,
63            last_consolidated_at: Mutex::new(None),
64        }
65    }
66
67    /// Attach a [`ConsolidationPolicy`]. The buffer itself never
68    /// performs the summarisation — the policy only *decides* when
69    /// the caller should — but having the policy bound here means
70    /// agent recipes can ask the buffer directly via
71    /// [`Self::should_consolidate`] without threading the policy
72    /// through every call site.
73    #[must_use]
74    pub fn with_consolidation_policy(mut self, policy: Arc<dyn ConsolidationPolicy>) -> Self {
75        self.consolidation = Some(policy);
76        self
77    }
78
79    /// Effective retention cap.
80    pub const fn max_turns(&self) -> usize {
81        self.max_turns
82    }
83
84    /// Borrow the bound namespace.
85    pub const fn namespace(&self) -> &Namespace {
86        &self.namespace
87    }
88
89    /// Wall-clock time of the most recent successful consolidation,
90    /// as recorded via [`Self::mark_consolidated`]. Returns `None`
91    /// before the first consolidation has been marked.
92    pub fn last_consolidated_at(&self) -> Option<DateTime<Utc>> {
93        *self.last_consolidated_at.lock()
94    }
95
96    /// Record that a consolidation pass completed at `at`. Recipes
97    /// call this after a successful summarise-and-clear cycle so
98    /// time-aware policies (throttling, "at most once per hour")
99    /// observe the new floor on the next check.
100    pub fn mark_consolidated(&self, at: DateTime<Utc>) {
101        *self.last_consolidated_at.lock() = Some(at);
102    }
103
104    /// Convenience over [`Self::mark_consolidated`] using
105    /// [`chrono::Utc::now`]. Use when the caller doesn't already
106    /// have a timestamp in hand.
107    pub fn mark_consolidated_now(&self) {
108        self.mark_consolidated(Utc::now());
109    }
110
111    /// Append `message`, dropping the oldest entries when over capacity.
112    pub async fn append(&self, ctx: &ExecutionContext, message: Message) -> Result<()> {
113        let mut messages = self
114            .store
115            .get(ctx, &self.namespace, DEFAULT_KEY)
116            .await?
117            .unwrap_or_default();
118        messages.push(message);
119        // Drop oldest while over budget.
120        while messages.len() > self.max_turns {
121            messages.remove(0);
122        }
123        self.store
124            .put(ctx, &self.namespace, DEFAULT_KEY, messages)
125            .await
126    }
127
128    /// Read the retained messages oldest-first.
129    pub async fn messages(&self, ctx: &ExecutionContext) -> Result<Vec<Message>> {
130        Ok(self
131            .store
132            .get(ctx, &self.namespace, DEFAULT_KEY)
133            .await?
134            .unwrap_or_default())
135    }
136
137    /// Clear the buffer.
138    pub async fn clear(&self, ctx: &ExecutionContext) -> Result<()> {
139        self.store.delete(ctx, &self.namespace, DEFAULT_KEY).await
140    }
141
142    /// Consult the bound [`ConsolidationPolicy`] (if any) against the
143    /// current buffer and the buffer's tracked `last_consolidated_at`.
144    /// Returns `Ok(false)` when no policy is bound — that path is the
145    /// explicit "consolidation disabled" answer.
146    ///
147    /// For token-budget policies that need the active model's usage,
148    /// use [`Self::should_consolidate_with`] and supply the values
149    /// in [`PolicyExtras`].
150    pub async fn should_consolidate(&self, ctx: &ExecutionContext) -> Result<bool> {
151        self.should_consolidate_with(ctx, PolicyExtras::default())
152            .await
153    }
154
155    /// As [`Self::should_consolidate`], but lets the caller layer on
156    /// model-specific token usage signals or override the buffer's
157    /// internally-tracked `last_consolidated_at` (rare — useful when
158    /// a downstream system has more authoritative state).
159    pub async fn should_consolidate_with(
160        &self,
161        ctx: &ExecutionContext,
162        extras: PolicyExtras,
163    ) -> Result<bool> {
164        let Some(policy) = self.consolidation.as_ref() else {
165            return Ok(false);
166        };
167        let buffer = self.messages(ctx).await?;
168        let mut consolidation_ctx = ConsolidationContext::new(&buffer);
169        let effective_last = extras
170            .last_consolidated_at
171            .or_else(|| *self.last_consolidated_at.lock());
172        if let Some(at) = effective_last {
173            consolidation_ctx = consolidation_ctx.with_last_consolidated_at(at);
174        }
175        if let (Some(used), Some(available)) =
176            (extras.context_tokens_used, extras.context_tokens_available)
177        {
178            consolidation_ctx = consolidation_ctx.with_context_tokens(used, available);
179        }
180        Ok(policy.should_consolidate(&consolidation_ctx))
181    }
182}
183
184/// Optional signals fed into [`BufferMemory::should_consolidate_with`].
185/// Operators that don't track tokens or last-consolidated time can
186/// pass [`Self::default`] and the policy will see the buffer plus
187/// whatever the buffer itself tracks via [`BufferMemory::mark_consolidated`].
188#[derive(Clone, Copy, Debug, Default)]
189pub struct PolicyExtras {
190    /// Wall-clock time of the most recent consolidation. When
191    /// supplied, overrides the buffer's internally-tracked value
192    /// for this single check.
193    pub last_consolidated_at: Option<DateTime<Utc>>,
194    /// Tokens currently consumed in the model's context window.
195    pub context_tokens_used: Option<usize>,
196    /// Total context-window capacity for the active model.
197    pub context_tokens_available: Option<usize>,
198}