1use std::marker::PhantomData;
36use std::sync::Arc;
37use std::time::Duration;
38
39use chrono::{DateTime, Utc};
40use entelix_core::{ExecutionContext, Result};
41use serde::de::DeserializeOwned;
42use serde::{Deserialize, Serialize};
43
44use crate::namespace::Namespace;
45use crate::store::Store;
46
47#[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)]
52#[serde(transparent)]
53pub struct EpisodeId(uuid::Uuid);
54
55impl EpisodeId {
56 #[must_use]
58 pub fn new() -> Self {
59 Self(uuid::Uuid::now_v7())
60 }
61
62 #[must_use]
65 pub const fn from_uuid(uuid: uuid::Uuid) -> Self {
66 Self(uuid)
67 }
68
69 #[must_use]
71 pub const fn as_uuid(&self) -> &uuid::Uuid {
72 &self.0
73 }
74
75 #[must_use]
79 pub fn to_hyphenated_string(&self) -> String {
80 self.0.to_string()
81 }
82}
83
84impl Default for EpisodeId {
85 fn default() -> Self {
86 Self::new()
87 }
88}
89
90#[derive(Clone, Debug, Serialize, Deserialize)]
92pub struct Episode<V> {
93 pub id: EpisodeId,
95 pub timestamp: DateTime<Utc>,
97 pub payload: V,
99}
100
101const DEFAULT_KEY: &str = "episodes";
102
103pub struct EpisodicMemory<V>
105where
106 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
107{
108 store: Arc<dyn Store<Vec<Episode<V>>>>,
109 namespace: Namespace,
110 _marker: PhantomData<fn() -> V>,
111}
112
113impl<V> EpisodicMemory<V>
114where
115 V: Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
116{
117 pub fn new(store: Arc<dyn Store<Vec<Episode<V>>>>, namespace: Namespace) -> Self {
119 Self {
120 store,
121 namespace,
122 _marker: PhantomData,
123 }
124 }
125
126 #[must_use]
128 pub const fn namespace(&self) -> &Namespace {
129 &self.namespace
130 }
131
132 pub async fn append(&self, ctx: &ExecutionContext, payload: V) -> Result<EpisodeId> {
136 let episode = Episode {
137 id: EpisodeId::new(),
138 timestamp: Utc::now(),
139 payload,
140 };
141 let id = episode.id.clone();
142 self.append_record(ctx, episode).await?;
143 Ok(id)
144 }
145
146 pub async fn append_at(
151 &self,
152 ctx: &ExecutionContext,
153 payload: V,
154 timestamp: DateTime<Utc>,
155 ) -> Result<EpisodeId> {
156 let episode = Episode {
157 id: EpisodeId::new(),
158 timestamp,
159 payload,
160 };
161 let id = episode.id.clone();
162 self.append_record(ctx, episode).await?;
163 Ok(id)
164 }
165
166 pub async fn append_record(&self, ctx: &ExecutionContext, episode: Episode<V>) -> Result<()> {
171 let mut all = self
172 .store
173 .get(ctx, &self.namespace, DEFAULT_KEY)
174 .await?
175 .unwrap_or_default();
176 let pos = all.partition_point(|e| e.timestamp <= episode.timestamp);
177 all.insert(pos, episode);
178 self.store.put(ctx, &self.namespace, DEFAULT_KEY, all).await
179 }
180
181 pub async fn all(&self, ctx: &ExecutionContext) -> Result<Vec<Episode<V>>> {
184 Ok(self
185 .store
186 .get(ctx, &self.namespace, DEFAULT_KEY)
187 .await?
188 .unwrap_or_default())
189 }
190
191 pub async fn recent(&self, ctx: &ExecutionContext, n: usize) -> Result<Vec<Episode<V>>> {
194 let mut all = self.all(ctx).await?;
195 all.reverse();
196 all.truncate(n);
197 Ok(all)
198 }
199
200 pub async fn range(
206 &self,
207 ctx: &ExecutionContext,
208 start: DateTime<Utc>,
209 end: DateTime<Utc>,
210 ) -> Result<Vec<Episode<V>>> {
211 if start > end {
212 return Ok(Vec::new());
213 }
214 let all = self.all(ctx).await?;
215 let lo = all.partition_point(|e| e.timestamp < start);
216 let hi = all.partition_point(|e| e.timestamp <= end);
217 Ok(all
222 .into_iter()
223 .skip(lo)
224 .take(hi.saturating_sub(lo))
225 .collect())
226 }
227
228 pub async fn since(
231 &self,
232 ctx: &ExecutionContext,
233 start: DateTime<Utc>,
234 ) -> Result<Vec<Episode<V>>> {
235 let all = self.all(ctx).await?;
236 let lo = all.partition_point(|e| e.timestamp < start);
237 Ok(all.into_iter().skip(lo).collect())
238 }
239
240 pub async fn count(&self, ctx: &ExecutionContext) -> Result<usize> {
242 Ok(self.all(ctx).await?.len())
243 }
244
245 pub async fn prune_older_than(&self, ctx: &ExecutionContext, ttl: Duration) -> Result<usize> {
250 let Some(mut all) = self.store.get(ctx, &self.namespace, DEFAULT_KEY).await? else {
251 return Ok(0);
252 };
253 let cutoff = Utc::now() - chrono::Duration::from_std(ttl).unwrap_or(chrono::Duration::MAX);
257 let before = all.len();
258 all.retain(|e| e.timestamp >= cutoff);
259 let removed = before - all.len();
260 if removed > 0 {
261 self.store
262 .put(ctx, &self.namespace, DEFAULT_KEY, all)
263 .await?;
264 }
265 Ok(removed)
266 }
267
268 pub async fn clear(&self, ctx: &ExecutionContext) -> Result<()> {
270 self.store.delete(ctx, &self.namespace, DEFAULT_KEY).await
271 }
272}
273
274#[cfg(test)]
275#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
276mod tests {
277 use super::*;
278 use crate::store::InMemoryStore;
279 use entelix_core::TenantId;
280
281 fn ns(scope: &str) -> Namespace {
282 Namespace::new(TenantId::new("test-tenant")).with_scope(scope)
283 }
284
285 fn build() -> EpisodicMemory<String> {
286 let store: Arc<dyn Store<Vec<Episode<String>>>> = Arc::new(InMemoryStore::new());
287 EpisodicMemory::new(store, ns("conv"))
288 }
289
290 #[tokio::test]
291 async fn append_then_all_returns_chronological_payloads() {
292 let mem = build();
293 let ctx = ExecutionContext::new();
294 mem.append(&ctx, "first".to_owned()).await.unwrap();
295 mem.append(&ctx, "second".to_owned()).await.unwrap();
296 let all = mem.all(&ctx).await.unwrap();
297 assert_eq!(all.len(), 2);
298 assert_eq!(all[0].payload, "first");
299 assert_eq!(all[1].payload, "second");
300 assert!(all[0].timestamp <= all[1].timestamp);
301 }
302
303 #[tokio::test]
304 async fn recent_returns_descending_capped() {
305 let mem = build();
306 let ctx = ExecutionContext::new();
307 for i in 0..5 {
308 mem.append(&ctx, format!("ep-{i}")).await.unwrap();
309 }
310 let recent = mem.recent(&ctx, 3).await.unwrap();
311 assert_eq!(recent.len(), 3);
312 assert_eq!(recent[0].payload, "ep-4");
313 assert_eq!(recent[1].payload, "ep-3");
314 assert_eq!(recent[2].payload, "ep-2");
315 }
316
317 #[tokio::test]
318 async fn recent_zero_returns_empty() {
319 let mem = build();
320 let ctx = ExecutionContext::new();
321 mem.append(&ctx, "x".to_owned()).await.unwrap();
322 assert!(mem.recent(&ctx, 0).await.unwrap().is_empty());
323 }
324
325 #[tokio::test]
326 async fn range_filters_inclusive_endpoints() {
327 let mem = build();
328 let ctx = ExecutionContext::new();
329 let base = Utc::now();
330 for offset in [-30, -20, -10, 0, 10] {
331 mem.append_at(
332 &ctx,
333 format!("t{offset}"),
334 base + chrono::Duration::seconds(offset),
335 )
336 .await
337 .unwrap();
338 }
339 let window = mem
340 .range(
341 &ctx,
342 base + chrono::Duration::seconds(-20),
343 base + chrono::Duration::seconds(0),
344 )
345 .await
346 .unwrap();
347 assert_eq!(
348 window
349 .iter()
350 .map(|e| e.payload.as_str())
351 .collect::<Vec<_>>(),
352 vec!["t-20", "t-10", "t0"]
353 );
354 }
355
356 #[tokio::test]
357 async fn range_with_start_after_end_is_empty() {
358 let mem = build();
359 let ctx = ExecutionContext::new();
360 mem.append(&ctx, "x".to_owned()).await.unwrap();
361 let now = Utc::now();
362 let later = now + chrono::Duration::seconds(60);
363 assert!(mem.range(&ctx, later, now).await.unwrap().is_empty());
364 }
365
366 #[tokio::test]
367 async fn since_returns_episodes_at_or_after_cutoff() {
368 let mem = build();
369 let ctx = ExecutionContext::new();
370 let base = Utc::now();
371 mem.append_at(&ctx, "old".to_owned(), base - chrono::Duration::seconds(60))
372 .await
373 .unwrap();
374 mem.append_at(&ctx, "edge".to_owned(), base).await.unwrap();
375 mem.append_at(&ctx, "new".to_owned(), base + chrono::Duration::seconds(60))
376 .await
377 .unwrap();
378 let after = mem.since(&ctx, base).await.unwrap();
379 assert_eq!(
380 after.iter().map(|e| e.payload.as_str()).collect::<Vec<_>>(),
381 vec!["edge", "new"]
382 );
383 }
384
385 #[tokio::test]
386 async fn append_at_preserves_chronological_invariant() {
387 let mem = build();
388 let ctx = ExecutionContext::new();
389 let base = Utc::now();
390 mem.append_at(
392 &ctx,
393 "late".to_owned(),
394 base + chrono::Duration::seconds(60),
395 )
396 .await
397 .unwrap();
398 mem.append_at(
399 &ctx,
400 "early".to_owned(),
401 base - chrono::Duration::seconds(60),
402 )
403 .await
404 .unwrap();
405 mem.append_at(&ctx, "mid".to_owned(), base).await.unwrap();
406 let all = mem.all(&ctx).await.unwrap();
407 assert_eq!(
408 all.iter().map(|e| e.payload.as_str()).collect::<Vec<_>>(),
409 vec!["early", "mid", "late"]
410 );
411 }
412
413 #[tokio::test]
414 async fn prune_older_than_drops_stale_and_returns_count() {
415 let mem = build();
416 let ctx = ExecutionContext::new();
417 let now = Utc::now();
418 mem.append_at(&ctx, "old".to_owned(), now - chrono::Duration::seconds(120))
419 .await
420 .unwrap();
421 mem.append_at(&ctx, "fresh".to_owned(), now - chrono::Duration::seconds(5))
422 .await
423 .unwrap();
424 let removed = mem
425 .prune_older_than(&ctx, Duration::from_mins(1))
426 .await
427 .unwrap();
428 assert_eq!(removed, 1);
429 let remaining = mem.all(&ctx).await.unwrap();
430 assert_eq!(remaining.len(), 1);
431 assert_eq!(remaining[0].payload, "fresh");
432 }
433
434 #[tokio::test]
435 async fn prune_on_empty_namespace_is_noop() {
436 let mem = build();
437 let ctx = ExecutionContext::new();
438 assert_eq!(
439 mem.prune_older_than(&ctx, Duration::from_secs(0))
440 .await
441 .unwrap(),
442 0
443 );
444 }
445
446 #[tokio::test]
447 async fn count_and_clear_round_trip() {
448 let mem = build();
449 let ctx = ExecutionContext::new();
450 for i in 0..3 {
451 mem.append(&ctx, format!("e{i}")).await.unwrap();
452 }
453 assert_eq!(mem.count(&ctx).await.unwrap(), 3);
454 mem.clear(&ctx).await.unwrap();
455 assert_eq!(mem.count(&ctx).await.unwrap(), 0);
456 assert!(mem.all(&ctx).await.unwrap().is_empty());
457 }
458
459 #[tokio::test]
460 async fn namespaces_are_isolated() {
461 let store: Arc<dyn Store<Vec<Episode<String>>>> = Arc::new(InMemoryStore::new());
462 let alpha = EpisodicMemory::new(Arc::clone(&store), ns("alpha"));
463 let beta = EpisodicMemory::new(store, ns("beta"));
464 let ctx = ExecutionContext::new();
465 alpha.append(&ctx, "alpha-1".to_owned()).await.unwrap();
466 beta.append(&ctx, "beta-1".to_owned()).await.unwrap();
467 let alpha_all = alpha.all(&ctx).await.unwrap();
468 let beta_all = beta.all(&ctx).await.unwrap();
469 assert_eq!(alpha_all.len(), 1);
470 assert_eq!(beta_all.len(), 1);
471 assert_eq!(alpha_all[0].payload, "alpha-1");
472 assert_eq!(beta_all[0].payload, "beta-1");
473 }
474
475 #[tokio::test]
476 async fn append_record_with_external_id_preserves_id() {
477 let mem = build();
478 let ctx = ExecutionContext::new();
479 let id = EpisodeId::from_uuid(uuid::Uuid::now_v7());
480 mem.append_record(
481 &ctx,
482 Episode {
483 id: id.clone(),
484 timestamp: Utc::now(),
485 payload: "imported".to_owned(),
486 },
487 )
488 .await
489 .unwrap();
490 let all = mem.all(&ctx).await.unwrap();
491 assert_eq!(all[0].id, id);
492 }
493}