Skip to main content

zeph_memory/
eviction.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Memory eviction subsystem.
5//!
6//! Provides a trait-based eviction policy framework with an Ebbinghaus
7//! forgetting curve implementation. The background sweep loop runs
8//! periodically, scoring entries and soft-deleting the lowest-scoring ones
9//! from `SQLite` before removing their `Qdrant` vectors in a second phase.
10//!
11//! Two-phase design ensures crash safety: soft-deleted `SQLite` rows are
12//! invisible to the application immediately, and `Qdrant` cleanup is retried
13//! on the next sweep if the agent crashes between phases.
14
15use std::sync::Arc;
16
17use tokio::time::{Duration, interval};
18use tokio_util::sync::CancellationToken;
19
20use crate::embedding_store::EmbeddingStore;
21use crate::error::MemoryError;
22use crate::store::SqliteStore;
23use crate::types::MessageId;
24
25// ── Public types ──────────────────────────────────────────────────────────────
26
27/// Metadata for a single memory entry evaluated by [`EvictionPolicy::score`].
28#[derive(Debug, Clone)]
29pub struct EvictionEntry {
30    /// `SQLite` row ID of the message.
31    pub id: MessageId,
32    /// ISO 8601 creation timestamp (TEXT column from `SQLite`, UTC).
33    pub created_at: String,
34    /// ISO 8601 last-accessed timestamp, or `None` if never accessed after creation.
35    pub last_accessed: Option<String>,
36    /// Number of times this message has been retrieved via recall.
37    pub access_count: u32,
38}
39
40/// Trait for eviction scoring strategies.
41///
42/// Implementations must be `Send + Sync` so they can be shared across threads.
43pub trait EvictionPolicy: Send + Sync {
44    /// Compute a retention score for the given entry.
45    ///
46    /// Higher scores mean the entry is more likely to be retained.
47    /// Lower scores mean the entry is a candidate for eviction.
48    fn score(&self, entry: &EvictionEntry) -> f64;
49}
50
51use zeph_config::memory::EvictionConfig;
52
53// ── Ebbinghaus policy ─────────────────────────────────────────────────────────
54
/// Eviction policy modelled on the Ebbinghaus forgetting curve.
///
/// Retention score:
///   `score = exp(-t / (S * ln(1 + n)))`
///
/// with:
/// - `t`: seconds elapsed since `last_accessed` (since `created_at` when never accessed)
/// - `S`: `retention_strength`; larger values make the score decay more slowly
/// - `n`: `access_count`
///
/// Frequently or recently accessed entries therefore score higher and are
/// evicted last.
pub struct EbbinghausPolicy {
    retention_strength: f64,
}

impl EbbinghausPolicy {
    /// One day expressed in seconds; the retention strength used by `Default`.
    const ONE_DAY_SECS: f64 = 86_400.0;

    /// Build a policy using `retention_strength` as the decay constant `S`.
    ///
    /// `86400.0` (one day in seconds) is a good default.
    #[must_use]
    pub fn new(retention_strength: f64) -> Self {
        Self { retention_strength }
    }
}

impl Default for EbbinghausPolicy {
    /// A policy whose retention strength is one day (`86400.0` seconds).
    fn default() -> Self {
        Self {
            retention_strength: Self::ONE_DAY_SECS,
        }
    }
}
86
87impl EvictionPolicy for EbbinghausPolicy {
88    fn score(&self, entry: &EvictionEntry) -> f64 {
89        let now_secs = unix_now_secs();
90
91        let reference_secs = entry
92            .last_accessed
93            .as_deref()
94            .and_then(parse_sqlite_timestamp_secs)
95            .unwrap_or_else(|| parse_sqlite_timestamp_secs(&entry.created_at).unwrap_or(now_secs));
96
97        // Clamp t >= 0 to handle clock skew or future timestamps.
98        #[allow(clippy::cast_precision_loss)]
99        let t = now_secs.saturating_sub(reference_secs) as f64;
100        let n = f64::from(entry.access_count);
101
102        // ln(1 + 0) = 0 which would divide by zero — use 1.0 as minimum denominator.
103        let denominator = (self.retention_strength * (1.0_f64 + n).ln()).max(1.0);
104        (-t / denominator).exp()
105    }
106}
107
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch.
fn unix_now_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
113
/// Parse a `SQLite` TEXT timestamp (`"YYYY-MM-DD HH:MM:SS"`, UTC) into Unix seconds.
///
/// Only the first 19 bytes of the trimmed input are interpreted, so trailing
/// content such as fractional seconds or a `Z` suffix is ignored. The separator
/// bytes between fields are not checked, which means the ISO 8601 `T` form is
/// accepted as well.
///
/// Returns `None` when:
/// - the input is shorter than 19 bytes,
/// - a field is not made up solely of ASCII digits (including non-ASCII input),
/// - the year is before 1970, or
/// - the month, day, hour, minute or second is outside its calendar range.
///
/// This function never panics.
///
/// Does not use `chrono` to avoid adding a dependency to `zeph-memory`.
fn parse_sqlite_timestamp_secs(s: &str) -> Option<u64> {
    let s = s.trim();

    // `str::get` yields `None` for ranges that are out of bounds or that do not
    // fall on UTF-8 character boundaries, so malformed input cannot panic here.
    let field = |range: std::ops::Range<usize>| -> Option<u64> {
        let digits = s.get(range)?;
        if digits.bytes().all(|b| b.is_ascii_digit()) {
            digits.parse().ok()
        } else {
            None
        }
    };

    let year = field(0..4)?;
    let month = field(5..7)?;
    let day = field(8..10)?;
    let hour = field(11..13)?;
    let min = field(14..16)?;
    let sec = field(17..19)?;

    if year < 1970 || !(1..=12).contains(&month) || hour > 23 || min > 59 || sec > 59 {
        return None;
    }

    // Gregorian leap-year rule: divisible by 4 and not 100, or divisible by 400.
    let is_leap = |y: u64| (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400);
    let leap_year = is_leap(year);
    // Length of month `m` (already known to be in 1..=12) within `year`.
    let days_in_month = |m: u64| -> u64 {
        match m {
            4 | 6 | 9 | 11 => 30,
            2 => {
                if leap_year {
                    29
                } else {
                    28
                }
            }
            _ => 31,
        }
    };

    if day == 0 || day > days_in_month(month) {
        return None;
    }

    // Whole days between 1970-01-01 and the parsed date.
    let days_before_year: u64 = (1970..year)
        .map(|y| if is_leap(y) { 366 } else { 365 })
        .sum();
    let days_before_month: u64 = (1..month).map(&days_in_month).sum();
    let days = days_before_year + days_before_month + (day - 1);

    Some(days * 86400 + hour * 3600 + min * 60 + sec)
}
159
160// ── Sweep loop ────────────────────────────────────────────────────────────────
161
162/// Start the background eviction loop.
163///
164/// The loop runs every `config.sweep_interval_secs` seconds. Each iteration:
165/// 1. Queries `SQLite` for all non-deleted entries and their eviction metadata.
166/// 2. Scores each entry using `policy`.
167/// 3. If the count exceeds `config.max_entries`, soft-deletes the excess lowest-scoring rows.
168/// 4. Queries for all soft-deleted rows, deletes their Qdrant vectors via `embedding` (when
169///    `Some`), then marks the rows clean. If deletion fails, clean-up is retried next sweep.
170///
171/// If `config.max_entries == 0`, the loop exits immediately without doing anything.
172///
173/// Pass `embedding: None` when Qdrant is disabled — the loop will still clean `SQLite`
174/// bookkeeping without attempting any vector deletion.
175///
176/// # Errors (non-fatal)
177///
178/// Database and Qdrant errors are logged but do not stop the loop.
179pub async fn start_eviction_loop(
180    store: Arc<SqliteStore>,
181    embedding: Option<Arc<EmbeddingStore>>,
182    config: EvictionConfig,
183    policy: Arc<dyn EvictionPolicy + 'static>,
184    cancel: CancellationToken,
185) {
186    if config.max_entries == 0 {
187        tracing::debug!("eviction disabled (max_entries = 0)");
188        return;
189    }
190
191    let mut ticker = interval(Duration::from_secs(config.sweep_interval_secs));
192    // Skip the first immediate tick so the loop doesn't run at startup.
193    ticker.tick().await;
194
195    loop {
196        tokio::select! {
197            () = cancel.cancelled() => {
198                tracing::debug!("eviction loop shutting down");
199                return;
200            }
201            _ = ticker.tick() => {}
202        }
203
204        tracing::debug!(max_entries = config.max_entries, "running eviction sweep");
205
206        // Phase 1: score and soft-delete excess entries.
207        match run_eviction_phase1(&store, &*policy, config.max_entries).await {
208            Ok(deleted) => {
209                if deleted > 0 {
210                    tracing::info!(deleted, "eviction phase 1: soft-deleted entries");
211                }
212            }
213            Err(e) => {
214                tracing::warn!(error = %e, "eviction phase 1 failed, will retry next sweep");
215            }
216        }
217
218        // Phase 2: delete Qdrant vectors for soft-deleted entries, then mark them clean.
219        // On startup or after a crash this also cleans up any orphaned vectors.
220        match run_eviction_phase2(&store, embedding.as_deref()).await {
221            Ok(cleaned) => {
222                if cleaned > 0 {
223                    tracing::info!(cleaned, "eviction phase 2: removed Qdrant vectors");
224                }
225            }
226            Err(e) => {
227                tracing::warn!(error = %e, "eviction phase 2 failed, will retry next sweep");
228            }
229        }
230    }
231}
232
233#[cfg_attr(
234    feature = "profiling",
235    tracing::instrument(name = "memory.eviction_phase1", skip_all)
236)]
237async fn run_eviction_phase1(
238    store: &SqliteStore,
239    policy: &dyn EvictionPolicy,
240    max_entries: usize,
241) -> Result<usize, MemoryError> {
242    let candidates = store.get_eviction_candidates().await?;
243    let total = candidates.len();
244
245    if total <= max_entries {
246        return Ok(0);
247    }
248
249    let excess = total - max_entries;
250    let mut scored: Vec<(f64, MessageId)> = candidates
251        .into_iter()
252        .map(|e| (policy.score(&e), e.id))
253        .collect();
254
255    // Sort ascending by score — lowest scores (most forgettable) first.
256    scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
257
258    let candidates_to_delete: Vec<MessageId> =
259        scored.into_iter().take(excess).map(|(_, id)| id).collect();
260    let candidate_count = candidates_to_delete.len();
261    // MM-F4: always protect messages referenced by summaries (data-integrity invariant, #3341).
262    // `ids_to_delete` is the post-filter set; it may be smaller than `candidate_count` when
263    // summary-anchored messages are present. The return value reflects **actually soft-deleted**
264    // messages, not the original candidate count.
265    let ids_to_delete = store
266        .filter_out_preserved_episode_ids(&candidates_to_delete)
267        .await?;
268    let preserved = candidate_count - ids_to_delete.len();
269    if preserved > 0 {
270        tracing::debug!(
271            preserved,
272            deleted = ids_to_delete.len(),
273            "eviction phase 1: {preserved} candidate(s) preserved by summary anchor"
274        );
275    }
276    store.soft_delete_messages(&ids_to_delete).await?;
277
278    // Returns the number of messages actually soft-deleted (post-filter), not the candidate count.
279    Ok(ids_to_delete.len())
280}
281
282#[cfg_attr(
283    feature = "profiling",
284    tracing::instrument(name = "memory.eviction_phase2", skip_all)
285)]
286async fn run_eviction_phase2(
287    store: &SqliteStore,
288    embedding: Option<&EmbeddingStore>,
289) -> Result<usize, MemoryError> {
290    // Find all soft-deleted entries that haven't been cleaned from Qdrant yet.
291    let ids = store.get_soft_deleted_message_ids().await?;
292    if ids.is_empty() {
293        return Ok(0);
294    }
295
296    if let Some(emb) = embedding {
297        // Delete vectors before marking clean — crash-safe: if this fails, the next
298        // sweep retries (ids are still soft-deleted and not yet marked clean).
299        emb.delete_by_message_ids(&ids).await?;
300    } else {
301        tracing::debug!(
302            count = ids.len(),
303            "eviction phase 2: Qdrant disabled, cleaning SQLite bookkeeping only"
304        );
305    }
306
307    store.mark_qdrant_cleaned(&ids).await?;
308    Ok(ids.len())
309}
310
311// ── Tests ─────────────────────────────────────────────────────────────────────
312
313#[cfg(test)]
314mod tests {
315    use super::*;
316
317    /// Build a timestamp string for a time N seconds ago from now.
318    ///
319    /// Returns a string parseable by `parse_sqlite_timestamp_secs`.
320    fn ts_ago(seconds_ago: u64) -> String {
321        let ts = unix_now_secs().saturating_sub(seconds_ago);
322        // Convert back to "YYYY-MM-DD HH:MM:SS" using the same logic as parse_sqlite_timestamp_secs
323        let sec = ts % 60;
324        let min = (ts / 60) % 60;
325        let hour = (ts / 3600) % 24;
326        let mut total_days = ts / 86400;
327        let is_leap =
328            |y: u64| (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400);
329        let mut year = 1970u64;
330        loop {
331            let days_in_year = if is_leap(year) { 366 } else { 365 };
332            if total_days < days_in_year {
333                break;
334            }
335            total_days -= days_in_year;
336            year += 1;
337        }
338        let month_days = [
339            0u64,
340            31,
341            28 + u64::from(is_leap(year)),
342            31,
343            30,
344            31,
345            30,
346            31,
347            31,
348            30,
349            31,
350            30,
351            31,
352        ];
353        let mut month = 1u64;
354        while month <= 12 {
355            let month_idx = usize::try_from(month).unwrap_or_else(|_| unreachable!());
356            if total_days < month_days[month_idx] {
357                break;
358            }
359            total_days -= month_days[month_idx];
360            month += 1;
361        }
362        let day = total_days + 1;
363        format!("{year:04}-{month:02}-{day:02} {hour:02}:{min:02}:{sec:02}")
364    }
365
366    fn make_entry(access_count: u32, seconds_ago: u64) -> EvictionEntry {
367        let ts = ts_ago(seconds_ago);
368        EvictionEntry {
369            id: MessageId(1),
370            created_at: ts.clone(),
371            last_accessed: Some(ts),
372            access_count,
373        }
374    }
375
376    #[test]
377    fn ebbinghaus_recent_high_access_scores_near_one() {
378        let policy = EbbinghausPolicy::default();
379        // Use 1 second ago to ensure t is close to 0
380        let entry = make_entry(10, 1);
381        let score = policy.score(&entry);
382        // t = 1, n = 10, denominator = 86400 * ln(11) ≈ 207_946; exp(-1/207_946) ≈ 1.0
383        assert!(
384            score > 0.99,
385            "score should be near 1.0 for recently accessed entry, got {score}"
386        );
387    }
388
389    #[test]
390    fn ebbinghaus_old_zero_access_scores_lower() {
391        let policy = EbbinghausPolicy::default();
392        let old = make_entry(0, 7 * 24 * 3600); // 7 days ago, never accessed
393        let recent = make_entry(0, 60); // 1 minute ago
394        assert!(
395            policy.score(&old) < policy.score(&recent),
396            "old entry must score lower than recent"
397        );
398    }
399
400    #[test]
401    fn ebbinghaus_high_access_decays_slower() {
402        let policy = EbbinghausPolicy::default();
403        let low = make_entry(1, 3600); // accessed 1 hour ago, 1 time
404        let high = make_entry(20, 3600); // accessed 1 hour ago, 20 times
405        assert!(
406            policy.score(&high) > policy.score(&low),
407            "high access count should yield higher score"
408        );
409    }
410
411    #[test]
412    fn ebbinghaus_never_accessed_uses_created_at_as_reference() {
413        let policy = EbbinghausPolicy::default();
414        // An old entry (7 days ago) with last_accessed = None.
415        // Score should be the same as make_entry(0, 7 days) because both use created_at.
416        let old_with_no_last_accessed = EvictionEntry {
417            id: MessageId(2),
418            created_at: ts_ago(7 * 24 * 3600),
419            last_accessed: None,
420            access_count: 0,
421        };
422        let old_with_same_last_accessed = make_entry(0, 7 * 24 * 3600);
423        let score_no_access = policy.score(&old_with_no_last_accessed);
424        let score_same = policy.score(&old_with_same_last_accessed);
425        // Both reference the same time; scores should be approximately equal
426        let diff = (score_no_access - score_same).abs();
427        assert!(diff < 1e-6, "scores should match; diff = {diff}");
428    }
429
430    #[test]
431    fn eviction_config_default_is_disabled() {
432        let config = EvictionConfig::default();
433        assert_eq!(
434            config.max_entries, 0,
435            "eviction must be disabled by default"
436        );
437    }
438
439    // ── MM-F4: eviction preserves summary-anchored messages ───────────────────
440
441    #[tokio::test]
442    async fn test_eviction_preserves_summary_anchored_messages() {
443        use crate::store::SqliteStore;
444
445        let store = SqliteStore::new(":memory:").await.unwrap();
446        let cid = store.create_conversation().await.unwrap();
447
448        // Insert 6 messages (max_entries=3 → 3 excess candidates).
449        let ids: Vec<_> = (0..6)
450            .map(|_| async { store.save_message(cid, "user", "msg").await.unwrap() })
451            .collect();
452        let mut msg_ids = Vec::new();
453        for f in ids {
454            msg_ids.push(f.await);
455        }
456
457        // Anchor messages 1–3 inside a summary range.
458        store
459            .save_summary(cid, "summary", Some(msg_ids[0]), Some(msg_ids[2]), 30)
460            .await
461            .unwrap();
462
463        let policy = EbbinghausPolicy::default();
464        // Trigger eviction: 6 messages, max_entries=3.
465        let deleted = run_eviction_phase1(&store, &policy, 3).await.unwrap();
466
467        // At most 3 were eligible for deletion; summary-anchored (0–2) must survive.
468        // Only messages 3–5 (outside summary range) may be deleted.
469        assert!(
470            deleted <= 3,
471            "at most 3 messages can be deleted, got {deleted}"
472        );
473
474        for &anchored in &msg_ids[0..=2] {
475            let is_deleted: Option<String> =
476                sqlx::query_scalar("SELECT deleted_at FROM messages WHERE id = ?")
477                    .bind(anchored)
478                    .fetch_one(store.pool())
479                    .await
480                    .unwrap();
481            assert!(
482                is_deleted.is_none(),
483                "summary-anchored message {anchored:?} must not be soft-deleted"
484            );
485        }
486    }
487
488    #[test]
489    fn parse_sqlite_timestamp_known_value() {
490        // 2024-01-01 00:00:00 UTC
491        let ts = parse_sqlite_timestamp_secs("2024-01-01 00:00:00").unwrap();
492        // Days from 1970 to 2024: 54 years, roughly
493        // Reference: 2024-01-01 00:00:00 UTC = 1704067200
494        assert_eq!(
495            ts, 1_704_067_200,
496            "2024-01-01 must parse to known timestamp"
497        );
498    }
499
500    // ── Phase-2 Qdrant cleanup tests ─────────────────────────────────────────
501
502    /// Build a test `EmbeddingStore` backed by an in-memory vector store and a
503    /// fresh `SQLite` database. Returns both so the caller can manipulate `SQLite` directly.
504    async fn setup_embedding_store() -> (EmbeddingStore, crate::store::SqliteStore) {
505        let sqlite = crate::store::SqliteStore::new(":memory:").await.unwrap();
506        let pool = sqlite.pool().clone();
507        let mem_store = Box::new(crate::in_memory_store::InMemoryVectorStore::new());
508        let emb = EmbeddingStore::with_store(mem_store, pool);
509        emb.ensure_collection(4).await.unwrap();
510        (emb, sqlite)
511    }
512
513    /// Seed a message, store a vector, and soft-delete the message. Returns `(MessageId, point_id)`.
514    async fn seed_soft_deleted(
515        store: &crate::store::SqliteStore,
516        emb: &EmbeddingStore,
517    ) -> (MessageId, String) {
518        let cid = store.create_conversation().await.unwrap();
519        let msg_id = store.save_message(cid, "user", "hello").await.unwrap();
520
521        let point_id = emb
522            .store(
523                msg_id,
524                cid,
525                "user",
526                vec![1.0, 0.0, 0.0, 0.0],
527                crate::embedding_store::MessageKind::Regular,
528                "test",
529                0,
530            )
531            .await
532            .unwrap();
533
534        // Soft-delete the message so it appears in `get_soft_deleted_message_ids`.
535        store.soft_delete_messages(&[msg_id]).await.unwrap();
536
537        (msg_id, point_id)
538    }
539
540    /// Phase 2 with an embedding store: must delete vectors before marking clean.
541    #[tokio::test]
542    async fn eviction_phase2_calls_delete_before_mark_clean() {
543        let (emb, store) = setup_embedding_store().await;
544        let (msg_id, _point_id) = seed_soft_deleted(&store, &emb).await;
545
546        // Phase 2 should succeed, delete the vector, and mark the row clean.
547        let cleaned = run_eviction_phase2(&store, Some(&emb)).await.unwrap();
548        assert_eq!(cleaned, 1, "one message should be cleaned");
549
550        // After phase 2 the message must no longer appear in the pending cleanup list.
551        let remaining = store.get_soft_deleted_message_ids().await.unwrap();
552        assert!(
553            !remaining.contains(&msg_id),
554            "message must not remain in soft-deleted list after phase 2"
555        );
556    }
557
558    /// Phase 2 without an embedding store: `SQLite` bookkeeping still runs; no Qdrant call.
559    #[tokio::test]
560    async fn eviction_phase2_skips_delete_when_no_embedding_store() {
561        let (_, store) = setup_embedding_store().await;
562        let cid = store.create_conversation().await.unwrap();
563        let msg_id = store.save_message(cid, "user", "hello").await.unwrap();
564        store.soft_delete_messages(&[msg_id]).await.unwrap();
565
566        // No embedding store — must still mark rows clean.
567        let cleaned = run_eviction_phase2(&store, None).await.unwrap();
568        assert_eq!(
569            cleaned, 1,
570            "row must be cleaned even without embedding store"
571        );
572
573        let remaining = store.get_soft_deleted_message_ids().await.unwrap();
574        assert!(
575            !remaining.contains(&msg_id),
576            "message must not remain in soft-deleted list"
577        );
578    }
579
580    /// Phase 2 returns `Ok(0)` when there are no soft-deleted messages.
581    #[tokio::test]
582    async fn eviction_phase2_empty_returns_zero() {
583        let (emb, store) = setup_embedding_store().await;
584        let cleaned = run_eviction_phase2(&store, Some(&emb)).await.unwrap();
585        assert_eq!(cleaned, 0, "no soft-deleted messages → 0 cleaned");
586    }
587}