Skip to main content

zeph_memory/
eviction.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
//! Memory eviction subsystem.
//!
//! Provides a trait-based eviction policy framework with an Ebbinghaus
//! forgetting curve implementation. The background sweep loop runs
//! periodically, scoring entries and soft-deleting the lowest-scoring ones
//! from `SQLite`, then cleaning up their `Qdrant` vectors in a second phase.
//!
//! The two-phase design is meant to be crash-safe: soft-deleted `SQLite` rows
//! are invisible to the application immediately, and phase 2 is retried on the
//! next sweep if the agent crashes between phases. Note that phase 2 does not
//! yet call the `Qdrant` delete API; it currently only marks soft-deleted rows
//! as cleaned (see `run_eviction_phase2`).
14
15use std::sync::Arc;
16
17use tokio::time::{Duration, interval};
18use tokio_util::sync::CancellationToken;
19
20use crate::error::MemoryError;
21use crate::store::SqliteStore;
22use crate::types::MessageId;
23
24// ── Public types ──────────────────────────────────────────────────────────────
25
26/// Metadata for a single memory entry evaluated by [`EvictionPolicy::score`].
27#[derive(Debug, Clone)]
28pub struct EvictionEntry {
29    /// `SQLite` row ID of the message.
30    pub id: MessageId,
31    /// ISO 8601 creation timestamp (TEXT column from `SQLite`, UTC).
32    pub created_at: String,
33    /// ISO 8601 last-accessed timestamp, or `None` if never accessed after creation.
34    pub last_accessed: Option<String>,
35    /// Number of times this message has been retrieved via recall.
36    pub access_count: u32,
37}
38
39/// Trait for eviction scoring strategies.
40///
41/// Implementations must be `Send + Sync` so they can be shared across threads.
42pub trait EvictionPolicy: Send + Sync {
43    /// Compute a retention score for the given entry.
44    ///
45    /// Higher scores mean the entry is more likely to be retained.
46    /// Lower scores mean the entry is a candidate for eviction.
47    fn score(&self, entry: &EvictionEntry) -> f64;
48}
49
50/// Configuration for the eviction subsystem.
51#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
52#[serde(default)]
53pub struct EvictionConfig {
54    /// Policy name. Currently only `"ebbinghaus"` is supported.
55    pub policy: String,
56    /// Maximum number of entries to retain. `0` means unlimited (eviction disabled).
57    pub max_entries: usize,
58    /// How often to run the eviction sweep, in seconds.
59    pub sweep_interval_secs: u64,
60}
61
62impl Default for EvictionConfig {
63    fn default() -> Self {
64        Self {
65            policy: "ebbinghaus".to_owned(),
66            max_entries: 0,
67            sweep_interval_secs: 3600,
68        }
69    }
70}
71
72// ── Ebbinghaus policy ─────────────────────────────────────────────────────────
73
74/// Ebbinghaus forgetting curve eviction policy.
75///
76/// Score formula:
77///   `score = exp(-t / (S * ln(1 + n)))`
78///
79/// Where:
80/// - `t` = seconds since `last_accessed` (or `created_at` if never accessed)
81/// - `S` = `retention_strength` (higher = slower decay)
82/// - `n` = `access_count`
83///
84/// Entries with a high access count or recent access get higher scores
85/// and are less likely to be evicted.
86pub struct EbbinghausPolicy {
87    retention_strength: f64,
88}
89
90impl EbbinghausPolicy {
91    /// Create a new policy with the given retention strength.
92    ///
93    /// A good default is `86400.0` (one day in seconds).
94    #[must_use]
95    pub fn new(retention_strength: f64) -> Self {
96        Self { retention_strength }
97    }
98}
99
100impl Default for EbbinghausPolicy {
101    fn default() -> Self {
102        Self::new(86_400.0) // 1 day
103    }
104}
105
106impl EvictionPolicy for EbbinghausPolicy {
107    fn score(&self, entry: &EvictionEntry) -> f64 {
108        let now_secs = unix_now_secs();
109
110        let reference_secs = entry
111            .last_accessed
112            .as_deref()
113            .and_then(parse_sqlite_timestamp_secs)
114            .unwrap_or_else(|| parse_sqlite_timestamp_secs(&entry.created_at).unwrap_or(now_secs));
115
116        // Clamp t >= 0 to handle clock skew or future timestamps.
117        #[allow(clippy::cast_precision_loss)]
118        let t = now_secs.saturating_sub(reference_secs) as f64;
119        let n = f64::from(entry.access_count);
120
121        // ln(1 + 0) = 0 which would divide by zero — use 1.0 as minimum denominator.
122        let denominator = (self.retention_strength * (1.0_f64 + n).ln()).max(1.0);
123        (-t / denominator).exp()
124    }
125}
126
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` if the system clock reports a time before the epoch.
fn unix_now_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
132
/// Parse a `SQLite` TEXT timestamp (`"YYYY-MM-DD HH:MM:SS"`, UTC) into Unix seconds.
///
/// Leading and trailing whitespace is trimmed, and only the first 19 bytes are
/// examined. Trailing fractional seconds or a zone suffix are therefore ignored
/// (not applied). A `T` is accepted in place of the space between date and time.
///
/// Returns `None` in any of these cases:
/// - the input is shorter than 19 bytes;
/// - a numeric field contains anything but ASCII digits;
/// - a separator is wrong;
/// - a field is out of range (month 1–12, day valid for that month, hour < 24,
///   minute < 60, second < 60);
/// - the year is before 1970.
///
/// Never panics, including on non-ASCII input.
///
/// Does not use `chrono` to avoid adding a dependency to `zeph-memory`.
fn parse_sqlite_timestamp_secs(s: &str) -> Option<u64> {
    // Work on bytes: slicing a `&str` at a fixed byte offset panics when the offset
    // falls inside a multi-byte character, which would kill the sweep task.
    let b = s.trim().as_bytes().get(..19)?;

    // Exact layout: YYYY-MM-DD?HH:MM:SS where `?` is ' ' or 'T'.
    if b[4] != b'-'
        || b[7] != b'-'
        || !matches!(b[10], b' ' | b'T')
        || b[13] != b':'
        || b[16] != b':'
    {
        return None;
    }

    // Decimal value of an all-digit field; `None` on any non-digit byte (signs included).
    let field = |range: std::ops::Range<usize>| -> Option<u64> {
        b[range].iter().try_fold(0_u64, |acc, &c| {
            c.is_ascii_digit().then(|| acc * 10 + u64::from(c - b'0'))
        })
    };

    let year = field(0..4)?;
    let month = field(5..7)?;
    let day = field(8..10)?;
    let hour = field(11..13)?;
    let min = field(14..16)?;
    let sec = field(17..19)?;

    // Gregorian leap rule (valid for every year >= 1970, not just 1970-2099):
    // divisible by 4 and not by 100, or divisible by 400.
    let is_leap = |y: u64| (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400);
    let days_in_month = |y: u64, m: u64| -> u64 {
        match m {
            1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
            4 | 6 | 9 | 11 => 30,
            2 if is_leap(y) => 29,
            2 => 28,
            _ => 0,
        }
    };

    // `&&` short-circuits, so `days_in_month` only sees a validated month.
    let valid = year >= 1970
        && (1..=12).contains(&month)
        && (1..=days_in_month(year, month)).contains(&day)
        && hour < 24
        && min < 60
        && sec < 60;
    if !valid {
        return None;
    }

    // Days since 1970-01-01: whole years, then whole months of `year`, then the day offset.
    let days = (1970..year)
        .map(|y| if is_leap(y) { 366 } else { 365 })
        .sum::<u64>()
        + (1..month).map(|m| days_in_month(year, m)).sum::<u64>()
        + (day - 1);

    Some(days * 86_400 + hour * 3_600 + min * 60 + sec)
}
178
179// ── Sweep loop ────────────────────────────────────────────────────────────────
180
181/// Start the background eviction loop.
182///
183/// The loop runs every `config.sweep_interval_secs` seconds. Each iteration:
184/// 1. Queries `SQLite` for all non-deleted entries and their eviction metadata.
185/// 2. Scores each entry using `policy`.
186/// 3. If the count exceeds `config.max_entries`, soft-deletes the excess lowest-scoring rows.
187/// 4. Queries for all soft-deleted rows and attempts to remove their Qdrant vectors.
188///    If Qdrant removal fails, it is retried on the next sweep cycle.
189///
190/// If `config.max_entries == 0`, the loop exits immediately without doing anything.
191///
192/// # Errors (non-fatal)
193///
194/// Database and Qdrant errors are logged but do not stop the loop.
195pub async fn start_eviction_loop(
196    store: Arc<SqliteStore>,
197    config: EvictionConfig,
198    policy: Arc<dyn EvictionPolicy + 'static>,
199    cancel: CancellationToken,
200) {
201    if config.max_entries == 0 {
202        tracing::debug!("eviction disabled (max_entries = 0)");
203        return;
204    }
205
206    let mut ticker = interval(Duration::from_secs(config.sweep_interval_secs));
207    // Skip the first immediate tick so the loop doesn't run at startup.
208    ticker.tick().await;
209
210    loop {
211        tokio::select! {
212            () = cancel.cancelled() => {
213                tracing::debug!("eviction loop shutting down");
214                return;
215            }
216            _ = ticker.tick() => {}
217        }
218
219        tracing::debug!(max_entries = config.max_entries, "running eviction sweep");
220
221        // Phase 1: score and soft-delete excess entries.
222        match run_eviction_phase1(&store, &*policy, config.max_entries).await {
223            Ok(deleted) => {
224                if deleted > 0 {
225                    tracing::info!(deleted, "eviction phase 1: soft-deleted entries");
226                }
227            }
228            Err(e) => {
229                tracing::warn!(error = %e, "eviction phase 1 failed, will retry next sweep");
230            }
231        }
232
233        // Phase 2: clean up soft-deleted entries from Qdrant.
234        // On startup or after a crash, this also cleans up any orphaned vectors.
235        match run_eviction_phase2(&store).await {
236            Ok(cleaned) => {
237                if cleaned > 0 {
238                    tracing::info!(cleaned, "eviction phase 2: removed Qdrant vectors");
239                }
240            }
241            Err(e) => {
242                tracing::warn!(error = %e, "eviction phase 2 failed, will retry next sweep");
243            }
244        }
245    }
246}
247
248#[cfg_attr(
249    feature = "profiling",
250    tracing::instrument(name = "memory.eviction_phase1", skip_all)
251)]
252async fn run_eviction_phase1(
253    store: &SqliteStore,
254    policy: &dyn EvictionPolicy,
255    max_entries: usize,
256) -> Result<usize, MemoryError> {
257    let candidates = store.get_eviction_candidates().await?;
258    let total = candidates.len();
259
260    if total <= max_entries {
261        return Ok(0);
262    }
263
264    let excess = total - max_entries;
265    let mut scored: Vec<(f64, MessageId)> = candidates
266        .into_iter()
267        .map(|e| (policy.score(&e), e.id))
268        .collect();
269
270    // Sort ascending by score — lowest scores (most forgettable) first.
271    scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
272
273    let candidates_to_delete: Vec<MessageId> =
274        scored.into_iter().take(excess).map(|(_, id)| id).collect();
275    let candidate_count = candidates_to_delete.len();
276    // MM-F4: always protect messages referenced by summaries (data-integrity invariant, #3341).
277    // `ids_to_delete` is the post-filter set; it may be smaller than `candidate_count` when
278    // summary-anchored messages are present. The return value reflects **actually soft-deleted**
279    // messages, not the original candidate count.
280    let ids_to_delete = store
281        .filter_out_preserved_episode_ids(&candidates_to_delete)
282        .await?;
283    let preserved = candidate_count - ids_to_delete.len();
284    if preserved > 0 {
285        tracing::debug!(
286            preserved,
287            deleted = ids_to_delete.len(),
288            "eviction phase 1: {preserved} candidate(s) preserved by summary anchor"
289        );
290    }
291    store.soft_delete_messages(&ids_to_delete).await?;
292
293    // Returns the number of messages actually soft-deleted (post-filter), not the candidate count.
294    Ok(ids_to_delete.len())
295}
296
297#[cfg_attr(
298    feature = "profiling",
299    tracing::instrument(name = "memory.eviction_phase2", skip_all)
300)]
301async fn run_eviction_phase2(store: &SqliteStore) -> Result<usize, MemoryError> {
302    // Find all soft-deleted entries that haven't been cleaned from Qdrant yet.
303    let ids = store.get_soft_deleted_message_ids().await?;
304    if ids.is_empty() {
305        return Ok(0);
306    }
307
308    // TODO: call Qdrant delete-vectors API here before marking as cleaned.
309    // The embedding_store handles vector lifecycle separately; when that API
310    // is wired in, the call should happen here and mark_qdrant_cleaned should
311    // only be called on success. Tracked in issue: phase-2 Qdrant cleanup.
312    tracing::warn!(
313        count = ids.len(),
314        "eviction phase 2: Qdrant vector removal not yet wired — marking cleaned without actual deletion (MVP)"
315    );
316
317    // Mark as cleaned after the (future) Qdrant call succeeds. For now this
318    // prevents infinite retries on every sweep cycle.
319    store.mark_qdrant_cleaned(&ids).await?;
320    Ok(ids.len())
321}
322
323// ── Tests ─────────────────────────────────────────────────────────────────────
324
#[cfg(test)]
mod tests {
    use super::*;

    /// Format the instant `seconds_ago` seconds before now as `"YYYY-MM-DD HH:MM:SS"` (UTC).
    ///
    /// This is the layout `parse_sqlite_timestamp_secs` accepts; the arithmetic is the
    /// inverse of that parser's.
    fn ts_ago(seconds_ago: u64) -> String {
        let ts = unix_now_secs().saturating_sub(seconds_ago);
        let sec = ts % 60;
        let min = (ts / 60) % 60;
        let hour = (ts / 3600) % 24;
        let mut remaining_days = ts / 86_400;

        let is_leap =
            |y: u64| (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400);

        let mut year = 1970u64;
        loop {
            let year_len = if is_leap(year) { 366 } else { 365 };
            if remaining_days < year_len {
                break;
            }
            remaining_days -= year_len;
            year += 1;
        }

        // Lengths of January..December in `year`.
        let month_lengths = [
            31,
            28 + u64::from(is_leap(year)),
            31,
            30,
            31,
            30,
            31,
            31,
            30,
            31,
            30,
            31,
        ];
        let mut month = 1u64;
        for len in month_lengths {
            if remaining_days < len {
                break;
            }
            remaining_days -= len;
            month += 1;
        }
        let day = remaining_days + 1;
        format!("{year:04}-{month:02}-{day:02} {hour:02}:{min:02}:{sec:02}")
    }

    /// Entry whose `created_at` and `last_accessed` both lie `seconds_ago` seconds in the past.
    fn make_entry(access_count: u32, seconds_ago: u64) -> EvictionEntry {
        let ts = ts_ago(seconds_ago);
        EvictionEntry {
            id: MessageId(1),
            created_at: ts.clone(),
            last_accessed: Some(ts),
            access_count,
        }
    }

    #[test]
    fn ebbinghaus_recent_high_access_scores_near_one() {
        let policy = EbbinghausPolicy::default();
        // Use 1 second ago to ensure t is close to 0
        let entry = make_entry(10, 1);
        let score = policy.score(&entry);
        // t = 1, n = 10, denominator = 86400 * ln(11) ≈ 207_178; exp(-1/207_178) ≈ 1.0
        assert!(
            score > 0.99,
            "score should be near 1.0 for recently accessed entry, got {score}"
        );
    }

    #[test]
    fn ebbinghaus_old_zero_access_scores_lower() {
        let policy = EbbinghausPolicy::default();
        let old = make_entry(0, 7 * 24 * 3600); // 7 days ago, never accessed
        let recent = make_entry(0, 60); // 1 minute ago
        assert!(
            policy.score(&old) < policy.score(&recent),
            "old entry must score lower than recent"
        );
    }

    #[test]
    fn ebbinghaus_high_access_decays_slower() {
        let policy = EbbinghausPolicy::default();
        let low = make_entry(1, 3600); // accessed 1 hour ago, 1 time
        let high = make_entry(20, 3600); // accessed 1 hour ago, 20 times
        assert!(
            policy.score(&high) > policy.score(&low),
            "high access count should yield higher score"
        );
    }

    #[test]
    fn ebbinghaus_never_accessed_uses_created_at_as_reference() {
        let policy = EbbinghausPolicy::default();
        // An old entry (7 days ago) with last_accessed = None.
        // Score should be the same as make_entry(0, 7 days) because both use created_at.
        let old_with_no_last_accessed = EvictionEntry {
            id: MessageId(2),
            created_at: ts_ago(7 * 24 * 3600),
            last_accessed: None,
            access_count: 0,
        };
        let old_with_same_last_accessed = make_entry(0, 7 * 24 * 3600);
        let score_no_access = policy.score(&old_with_no_last_accessed);
        let score_same = policy.score(&old_with_same_last_accessed);
        // Both reference the same time; scores should be approximately equal
        let diff = (score_no_access - score_same).abs();
        assert!(diff < 1e-6, "scores should match; diff = {diff}");
    }

    #[test]
    fn ebbinghaus_unparseable_timestamps_score_one() {
        // With neither timestamp parseable the reference falls back to "now" (t = 0),
        // so the entry gets the maximum score and is retained.
        let policy = EbbinghausPolicy::default();
        let entry = EvictionEntry {
            id: MessageId(3),
            created_at: "not a timestamp".to_owned(),
            last_accessed: None,
            access_count: 0,
        };
        let score = policy.score(&entry);
        assert!(
            (score - 1.0).abs() < f64::EPSILON,
            "unparseable entry must score 1.0, got {score}"
        );
    }

    #[test]
    fn eviction_config_default_is_disabled() {
        let config = EvictionConfig::default();
        assert_eq!(
            config.max_entries, 0,
            "eviction must be disabled by default"
        );
    }

    // ── MM-F4: eviction preserves summary-anchored messages ───────────────────

    #[tokio::test]
    async fn test_eviction_preserves_summary_anchored_messages() {
        use crate::store::SqliteStore;

        let store = SqliteStore::new(":memory:").await.unwrap();
        let cid = store.create_conversation().await.unwrap();

        // Insert 6 messages (max_entries=3 → 3 excess candidates).
        let mut msg_ids = Vec::with_capacity(6);
        for _ in 0..6 {
            msg_ids.push(store.save_message(cid, "user", "msg").await.unwrap());
        }

        // Anchor messages 1–3 inside a summary range.
        store
            .save_summary(cid, "summary", Some(msg_ids[0]), Some(msg_ids[2]), 30)
            .await
            .unwrap();

        let policy = EbbinghausPolicy::default();
        // Trigger eviction: 6 messages, max_entries=3.
        let deleted = run_eviction_phase1(&store, &policy, 3).await.unwrap();

        // At most 3 were eligible for deletion; summary-anchored (0–2) must survive.
        // Only messages 3–5 (outside summary range) may be deleted.
        assert!(
            deleted <= 3,
            "at most 3 messages can be deleted, got {deleted}"
        );

        for &anchored in &msg_ids[0..=2] {
            let is_deleted: Option<String> =
                sqlx::query_scalar("SELECT deleted_at FROM messages WHERE id = ?")
                    .bind(anchored)
                    .fetch_one(store.pool())
                    .await
                    .unwrap();
            assert!(
                is_deleted.is_none(),
                "summary-anchored message {anchored:?} must not be soft-deleted"
            );
        }
    }

    #[test]
    fn parse_sqlite_timestamp_known_value() {
        // Reference: 2024-01-01 00:00:00 UTC = 1_704_067_200 (19_723 days after the epoch).
        let ts = parse_sqlite_timestamp_secs("2024-01-01 00:00:00").unwrap();
        assert_eq!(
            ts, 1_704_067_200,
            "2024-01-01 must parse to known timestamp"
        );
    }

    #[test]
    fn parse_sqlite_timestamp_leap_day() {
        // 2024-02-29 00:00:00 UTC = 1_704_067_200 + (31 + 28) * 86_400.
        assert_eq!(
            parse_sqlite_timestamp_secs("2024-02-29 00:00:00"),
            Some(1_709_164_800)
        );
    }

    #[test]
    fn parse_sqlite_timestamp_too_short_is_none() {
        assert_eq!(parse_sqlite_timestamp_secs("2024-01-01"), None);
    }

    #[test]
    fn ts_ago_round_trips_through_parser() {
        let before = unix_now_secs();
        let parsed = parse_sqlite_timestamp_secs(&ts_ago(0)).expect("ts_ago output must parse");
        let after = unix_now_secs();
        assert!(
            (before..=after).contains(&parsed),
            "round-trip mismatch: {parsed} not in {before}..={after}"
        );
    }
}