Skip to main content

entelix_memory/
episodic.rs

1//! `EpisodicMemory<V>` — append-only, time-ordered store of
2//! domain-shaped `Episode<V>` records keyed by [`Namespace`].
3//!
4//! Where [`crate::EntityMemory`] answers "what is the current fact
5//! about X?" and [`crate::SemanticMemory`] answers "what stored
6//! content resembles this query?", `EpisodicMemory` answers
7//! questions about *time* — "what happened in this thread between
8//! Tuesday and Friday?", "what were the last five things this
9//! agent did?". The payload `V` stays operator-domain-shaped so
10//! the same memory pattern serves conversation episodes,
11//! task-completion records, decision logs, or any other
12//! time-stamped event the agent wants to revisit.
13//!
14//! ## Storage shape
15//!
16//! Every namespace holds a single `Vec<Episode<V>>` under one
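//! store key. The vector is maintained in non-decreasing
//! `timestamp` order: every append path — including
//! [`EpisodicMemory::append`], which stamps `Utc::now()` — goes
//! through [`EpisodicMemory::append_record`], which inserts at the
//! position found by binary search. The invariant therefore holds
//! even when the wall clock steps backwards or an earlier
//! [`EpisodicMemory::append_at`] call stored a future timestamp.
//! The single-key design mirrors [`crate::EntityMemory`]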
22//! so any [`crate::Store`] backend works unchanged. Companion
23//! crates that need per-row indexing for very long histories
24//! ship a dedicated backend without changing the surface here.
25//!
26//! ## Episode identity
27//!
28//! Each [`Episode`] carries an [`EpisodeId`] (UUID v7 — time
29//! ordered) the operator can quote in audit trails or use to
30//! correlate with external systems. The id is generated at
31//! `append` time; callers backfilling from an external source can
32//! call [`EpisodicMemory::append_record`] with a pre-built
33//! [`Episode`] instead.
34
35use std::marker::PhantomData;
36use std::sync::Arc;
37use std::time::Duration;
38
39use chrono::{DateTime, Utc};
40use entelix_core::{ExecutionContext, Result};
41use serde::de::DeserializeOwned;
42use serde::{Deserialize, Serialize};
43
44use crate::namespace::Namespace;
45use crate::store::Store;
46
47/// Stable identifier for one episode. Backed by UUID v7 so two ids
48/// minted in order compare in the same order — the audit trail and
49/// external correlation paths stay consistent without a separate
50/// sequence column.
51#[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)]
52#[serde(transparent)]
53pub struct EpisodeId(uuid::Uuid);
54
55impl EpisodeId {
56    /// Generate a fresh time-ordered id.
57    #[must_use]
58    pub fn new() -> Self {
59        Self(uuid::Uuid::now_v7())
60    }
61
62    /// Reconstruct an id from a `uuid::Uuid` — used by persistence
63    /// backends decoding stored rows.
64    #[must_use]
65    pub const fn from_uuid(uuid: uuid::Uuid) -> Self {
66        Self(uuid)
67    }
68
69    /// Borrow the underlying UUID.
70    #[must_use]
71    pub const fn as_uuid(&self) -> &uuid::Uuid {
72        &self.0
73    }
74
75    /// Render as a hyphenated string. Mirrors
76    /// `CheckpointId::to_hyphenated_string` (entelix-graph) so id
77    /// surfaces line up across audit channels.
78    #[must_use]
79    pub fn to_hyphenated_string(&self) -> String {
80        self.0.to_string()
81    }
82}
83
84impl Default for EpisodeId {
85    fn default() -> Self {
86        Self::new()
87    }
88}
89
90/// One time-stamped episode of operator-shaped payload.
91#[derive(Clone, Debug, Serialize, Deserialize)]
92pub struct Episode<V> {
93    /// Unique identifier (UUID v7).
94    pub id: EpisodeId,
95    /// Wall-clock time the episode was recorded.
96    pub timestamp: DateTime<Utc>,
97    /// Operator-supplied payload.
98    pub payload: V,
99}
100
101const DEFAULT_KEY: &str = "episodes";
102
103/// Time-ordered append-only episode store keyed by [`Namespace`].
104pub struct EpisodicMemory<V>
105where
106    V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
107{
108    store: Arc<dyn Store<Vec<Episode<V>>>>,
109    namespace: Namespace,
110    _marker: PhantomData<fn() -> V>,
111}
112
113impl<V> EpisodicMemory<V>
114where
115    V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
116{
117    /// Build an episodic memory over `store` scoped to `namespace`.
118    pub fn new(store: Arc<dyn Store<Vec<Episode<V>>>>, namespace: Namespace) -> Self {
119        Self {
120            store,
121            namespace,
122            _marker: PhantomData,
123        }
124    }
125
126    /// Borrow the bound namespace.
127    #[must_use]
128    pub const fn namespace(&self) -> &Namespace {
129        &self.namespace
130    }
131
132    /// Append a fresh episode timestamped at `Utc::now()`. Returns
133    /// the id so the caller can correlate with audit / external
134    /// systems.
135    pub async fn append(&self, ctx: &ExecutionContext, payload: V) -> Result<EpisodeId> {
136        let episode = Episode {
137            id: EpisodeId::new(),
138            timestamp: Utc::now(),
139            payload,
140        };
141        let id = episode.id.clone();
142        self.append_record(ctx, episode).await?;
143        Ok(id)
144    }
145
146    /// Append at a caller-supplied timestamp. Use when backfilling
147    /// from an external ledger or replaying historical events. The
148    /// new entry is binary-inserted so the stored vector stays in
149    /// non-decreasing `timestamp` order.
150    pub async fn append_at(
151        &self,
152        ctx: &ExecutionContext,
153        payload: V,
154        timestamp: DateTime<Utc>,
155    ) -> Result<EpisodeId> {
156        let episode = Episode {
157            id: EpisodeId::new(),
158            timestamp,
159            payload,
160        };
161        let id = episode.id.clone();
162        self.append_record(ctx, episode).await?;
163        Ok(id)
164    }
165
166    /// Append a fully-formed [`Episode`]. Use when the caller is
167    /// migrating records minted elsewhere (a UUID + timestamp pair
168    /// already exists). The entry is inserted at the correct
169    /// position to preserve chronological order.
170    pub async fn append_record(&self, ctx: &ExecutionContext, episode: Episode<V>) -> Result<()> {
171        let mut all = self
172            .store
173            .get(ctx, &self.namespace, DEFAULT_KEY)
174            .await?
175            .unwrap_or_default();
176        let pos = all.partition_point(|e| e.timestamp <= episode.timestamp);
177        all.insert(pos, episode);
178        self.store.put(ctx, &self.namespace, DEFAULT_KEY, all).await
179    }
180
181    /// Read every episode in chronological order. Empty namespaces
182    /// return `Ok(vec![])`.
183    pub async fn all(&self, ctx: &ExecutionContext) -> Result<Vec<Episode<V>>> {
184        Ok(self
185            .store
186            .get(ctx, &self.namespace, DEFAULT_KEY)
187            .await?
188            .unwrap_or_default())
189    }
190
191    /// Most-recent-first slice of up to `n` episodes. `n = 0`
192    /// returns an empty vector.
193    pub async fn recent(&self, ctx: &ExecutionContext, n: usize) -> Result<Vec<Episode<V>>> {
194        let mut all = self.all(ctx).await?;
195        all.reverse();
196        all.truncate(n);
197        Ok(all)
198    }
199
200    /// Episodes whose `timestamp` falls in the inclusive range
201    /// `[start, end]`. Order is chronological. `start > end`
202    /// returns an empty vector rather than erroring — the question
203    /// "what happened between two timestamps?" is well-defined
204    /// even when the answer is empty.
205    pub async fn range(
206        &self,
207        ctx: &ExecutionContext,
208        start: DateTime<Utc>,
209        end: DateTime<Utc>,
210    ) -> Result<Vec<Episode<V>>> {
211        if start > end {
212            return Ok(Vec::new());
213        }
214        let all = self.all(ctx).await?;
215        let lo = all.partition_point(|e| e.timestamp < start);
216        let hi = all.partition_point(|e| e.timestamp <= end);
217        // partition_point returns 0..=len; `lo <= hi` holds because the
218        // first predicate (`< start`) implies the second (`<= end`).
219        // `into_iter().skip(lo).take(hi - lo)` avoids the indexing-
220        // slicing lint without sacrificing clarity.
221        Ok(all
222            .into_iter()
223            .skip(lo)
224            .take(hi.saturating_sub(lo))
225            .collect())
226    }
227
228    /// Episodes whose `timestamp` is greater than or equal to
229    /// `start`. Order is chronological.
230    pub async fn since(
231        &self,
232        ctx: &ExecutionContext,
233        start: DateTime<Utc>,
234    ) -> Result<Vec<Episode<V>>> {
235        let all = self.all(ctx).await?;
236        let lo = all.partition_point(|e| e.timestamp < start);
237        Ok(all.into_iter().skip(lo).collect())
238    }
239
240    /// Total stored episode count.
241    pub async fn count(&self, ctx: &ExecutionContext) -> Result<usize> {
242        Ok(self.all(ctx).await?.len())
243    }
244
245    /// Drop every episode older than `ttl`. Returns the removal
246    /// count so callers can log or expose pruning metrics. Atomic
247    /// per-thread (single read-modify-write under one store key,
248    /// matching [`crate::EntityMemory::prune_older_than`]).
249    pub async fn prune_older_than(&self, ctx: &ExecutionContext, ttl: Duration) -> Result<usize> {
250        let Some(mut all) = self.store.get(ctx, &self.namespace, DEFAULT_KEY).await? else {
251            return Ok(0);
252        };
253        // chrono::Duration is signed and uses i64 nanoseconds; for
254        // pathological ttls (above i64::MAX seconds) saturate to
255        // chrono::Duration::MAX so the cutoff stays in the past.
256        let cutoff = Utc::now() - chrono::Duration::from_std(ttl).unwrap_or(chrono::Duration::MAX);
257        let before = all.len();
258        all.retain(|e| e.timestamp >= cutoff);
259        let removed = before - all.len();
260        if removed > 0 {
261            self.store
262                .put(ctx, &self.namespace, DEFAULT_KEY, all)
263                .await?;
264        }
265        Ok(removed)
266    }
267
268    /// Drop every episode in this namespace.
269    pub async fn clear(&self, ctx: &ExecutionContext) -> Result<()> {
270        self.store.delete(ctx, &self.namespace, DEFAULT_KEY).await
271    }
272}
273
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use crate::store::InMemoryStore;
    use entelix_core::TenantId;

    // Namespace under the shared test tenant, narrowed to `scope`.
    fn ns(scope: &str) -> Namespace {
        let tenant = TenantId::new("test-tenant");
        Namespace::new(tenant).with_scope(scope)
    }

    // String-payload memory over a fresh in-memory store.
    fn build() -> EpisodicMemory<String> {
        let backend: Arc<dyn Store<Vec<Episode<String>>>> = Arc::new(InMemoryStore::new());
        EpisodicMemory::new(backend, ns("conv"))
    }

    // Payloads of `episodes`, in the order given.
    fn payloads(episodes: &[Episode<String>]) -> Vec<&str> {
        episodes.iter().map(|ep| ep.payload.as_str()).collect()
    }

    // Shorthand for a signed chrono offset in whole seconds.
    fn secs(n: i64) -> chrono::Duration {
        chrono::Duration::seconds(n)
    }

    #[tokio::test]
    async fn append_then_all_returns_chronological_payloads() {
        let ctx = ExecutionContext::new();
        let mem = build();
        for text in ["first", "second"] {
            mem.append(&ctx, text.to_owned()).await.unwrap();
        }
        let stored = mem.all(&ctx).await.unwrap();
        assert_eq!(payloads(&stored), ["first", "second"]);
        assert!(stored[0].timestamp <= stored[1].timestamp);
    }

    #[tokio::test]
    async fn recent_returns_descending_capped() {
        let ctx = ExecutionContext::new();
        let mem = build();
        for i in 0..5 {
            mem.append(&ctx, format!("ep-{i}")).await.unwrap();
        }
        let newest = mem.recent(&ctx, 3).await.unwrap();
        assert_eq!(payloads(&newest), ["ep-4", "ep-3", "ep-2"]);
    }

    #[tokio::test]
    async fn recent_zero_returns_empty() {
        let ctx = ExecutionContext::new();
        let mem = build();
        mem.append(&ctx, "x".to_owned()).await.unwrap();
        let none = mem.recent(&ctx, 0).await.unwrap();
        assert!(none.is_empty());
    }

    #[tokio::test]
    async fn range_filters_inclusive_endpoints() {
        let ctx = ExecutionContext::new();
        let mem = build();
        let base = Utc::now();
        for offset in [-30, -20, -10, 0, 10] {
            let stamp = base + secs(offset);
            mem.append_at(&ctx, format!("t{offset}"), stamp)
                .await
                .unwrap();
        }
        let (from, to) = (base + secs(-20), base + secs(0));
        let window = mem.range(&ctx, from, to).await.unwrap();
        assert_eq!(payloads(&window), ["t-20", "t-10", "t0"]);
    }

    #[tokio::test]
    async fn range_with_start_after_end_is_empty() {
        let ctx = ExecutionContext::new();
        let mem = build();
        mem.append(&ctx, "x".to_owned()).await.unwrap();
        let now = Utc::now();
        let later = now + secs(60);
        let inverted = mem.range(&ctx, later, now).await.unwrap();
        assert!(inverted.is_empty());
    }

    #[tokio::test]
    async fn since_returns_episodes_at_or_after_cutoff() {
        let ctx = ExecutionContext::new();
        let mem = build();
        let base = Utc::now();
        let seeded = [
            ("old", base - secs(60)),
            ("edge", base),
            ("new", base + secs(60)),
        ];
        for (label, stamp) in seeded {
            mem.append_at(&ctx, label.to_owned(), stamp).await.unwrap();
        }
        let after = mem.since(&ctx, base).await.unwrap();
        assert_eq!(payloads(&after), ["edge", "new"]);
    }

    #[tokio::test]
    async fn append_at_preserves_chronological_invariant() {
        let ctx = ExecutionContext::new();
        let mem = build();
        let base = Utc::now();
        // Deliberately out-of-order arrivals: the store has to place
        // each one by timestamp, not by arrival.
        let arrivals = [
            ("late", base + secs(60)),
            ("early", base - secs(60)),
            ("mid", base),
        ];
        for (label, stamp) in arrivals {
            mem.append_at(&ctx, label.to_owned(), stamp).await.unwrap();
        }
        let stored = mem.all(&ctx).await.unwrap();
        assert_eq!(payloads(&stored), ["early", "mid", "late"]);
    }

    #[tokio::test]
    async fn prune_older_than_drops_stale_and_returns_count() {
        let ctx = ExecutionContext::new();
        let mem = build();
        let now = Utc::now();
        for (label, age) in [("old", 120), ("fresh", 5)] {
            mem.append_at(&ctx, label.to_owned(), now - secs(age))
                .await
                .unwrap();
        }
        let removed = mem
            .prune_older_than(&ctx, Duration::from_mins(1))
            .await
            .unwrap();
        assert_eq!(removed, 1);
        let remaining = mem.all(&ctx).await.unwrap();
        assert_eq!(payloads(&remaining), ["fresh"]);
    }

    #[tokio::test]
    async fn prune_on_empty_namespace_is_noop() {
        let ctx = ExecutionContext::new();
        let mem = build();
        let removed = mem
            .prune_older_than(&ctx, Duration::from_secs(0))
            .await
            .unwrap();
        assert_eq!(removed, 0);
    }

    #[tokio::test]
    async fn count_and_clear_round_trip() {
        let ctx = ExecutionContext::new();
        let mem = build();
        for i in 0..3 {
            mem.append(&ctx, format!("e{i}")).await.unwrap();
        }
        assert_eq!(mem.count(&ctx).await.unwrap(), 3);
        mem.clear(&ctx).await.unwrap();
        assert_eq!(mem.count(&ctx).await.unwrap(), 0);
        assert!(mem.all(&ctx).await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn namespaces_are_isolated() {
        let shared: Arc<dyn Store<Vec<Episode<String>>>> = Arc::new(InMemoryStore::new());
        let alpha = EpisodicMemory::new(Arc::clone(&shared), ns("alpha"));
        let beta = EpisodicMemory::new(shared, ns("beta"));
        let ctx = ExecutionContext::new();
        alpha.append(&ctx, "alpha-1".to_owned()).await.unwrap();
        beta.append(&ctx, "beta-1".to_owned()).await.unwrap();
        let alpha_all = alpha.all(&ctx).await.unwrap();
        let beta_all = beta.all(&ctx).await.unwrap();
        assert_eq!(payloads(&alpha_all), ["alpha-1"]);
        assert_eq!(payloads(&beta_all), ["beta-1"]);
    }

    #[tokio::test]
    async fn append_record_with_external_id_preserves_id() {
        let ctx = ExecutionContext::new();
        let mem = build();
        let id = EpisodeId::from_uuid(uuid::Uuid::now_v7());
        let imported = Episode {
            id: id.clone(),
            timestamp: Utc::now(),
            payload: "imported".to_owned(),
        };
        mem.append_record(&ctx, imported).await.unwrap();
        let stored = mem.all(&ctx).await.unwrap();
        assert_eq!(stored[0].id, id);
    }
}