1use std::sync::Arc;
16
17use tokio::time::{Duration, interval};
18use tokio_util::sync::CancellationToken;
19
20use crate::embedding_store::EmbeddingStore;
21use crate::error::MemoryError;
22use crate::store::SqliteStore;
23use crate::types::MessageId;
24
/// Metadata about one stored message that an [`EvictionPolicy`] uses to score it.
#[derive(Debug, Clone)]
pub struct EvictionEntry {
    /// Identifier of the message this entry describes.
    pub id: MessageId,
    /// Creation timestamp as text. [`EbbinghausPolicy`] reads it with
    /// `parse_sqlite_timestamp_secs`, i.e. as `YYYY-MM-DD HH:MM:SS`.
    pub created_at: String,
    /// Last-access timestamp in the same textual form, or `None` when no access
    /// is recorded; [`EbbinghausPolicy`] then falls back to `created_at`.
    pub last_accessed: Option<String>,
    /// Access counter; larger values slow the decay computed by [`EbbinghausPolicy`].
    pub access_count: u32,
}
39
/// Strategy that assigns a retention score to an [`EvictionEntry`].
///
/// The `Send + Sync` supertraits allow a policy to be shared as
/// `Arc<dyn EvictionPolicy>`, which is how `start_eviction_loop` receives it.
pub trait EvictionPolicy: Send + Sync {
    /// Returns the retention score for `entry`.
    ///
    /// `run_eviction_phase1` sorts candidates by ascending score and removes the
    /// lowest-scoring ones first, so a higher score means "keep longer".
    fn score(&self, entry: &EvictionEntry) -> f64;
}
50
51use zeph_config::memory::EvictionConfig;
52
/// Eviction policy that scores entries with an exponential forgetting curve.
///
/// The score is `exp(-t / max(S * ln(1 + n), 1))`, where `t` is the number of
/// seconds since the entry's reference time, `n` is its access count and `S` is
/// `retention_strength`.
pub struct EbbinghausPolicy {
    /// Scale factor `S` of the decay; elapsed seconds are divided by
    /// `S * ln(1 + access_count)` (floored at `1.0`).
    retention_strength: f64,
}
70
71impl EbbinghausPolicy {
72 #[must_use]
76 pub fn new(retention_strength: f64) -> Self {
77 Self { retention_strength }
78 }
79}
80
81impl Default for EbbinghausPolicy {
82 fn default() -> Self {
83 Self::new(86_400.0) }
85}
86
87impl EvictionPolicy for EbbinghausPolicy {
88 fn score(&self, entry: &EvictionEntry) -> f64 {
89 let now_secs = unix_now_secs();
90
91 let reference_secs = entry
92 .last_accessed
93 .as_deref()
94 .and_then(parse_sqlite_timestamp_secs)
95 .unwrap_or_else(|| parse_sqlite_timestamp_secs(&entry.created_at).unwrap_or(now_secs));
96
97 #[allow(clippy::cast_precision_loss)]
99 let t = now_secs.saturating_sub(reference_secs) as f64;
100 let n = f64::from(entry.access_count);
101
102 let denominator = (self.retention_strength * (1.0_f64 + n).ln()).max(1.0);
104 (-t / denominator).exp()
105 }
106}
107
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn unix_now_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
113
/// Parses a `YYYY-MM-DD HH:MM:SS` timestamp into seconds since the Unix epoch.
///
/// Leading and trailing whitespace is trimmed, and anything after the first 19
/// bytes (for example fractional seconds) is ignored. The separator characters
/// between fields are not inspected. No time-zone adjustment is applied.
///
/// Returns `None` when the input is shorter than 19 bytes, when a field is not
/// made solely of ASCII digits, or when a field is out of range (year before
/// 1970, month outside 1..=12, day outside the month, hour > 23, minute or
/// second > 59). This function never panics, even on non-ASCII input.
fn parse_sqlite_timestamp_secs(s: &str) -> Option<u64> {
    let s = s.trim();
    if s.len() < 19 {
        return None;
    }

    // `str::get` yields `None` rather than panicking when a range does not fall
    // on a UTF-8 character boundary. The digit check rejects forms such as "+1"
    // that the integer parser would otherwise accept.
    let field = |range: std::ops::Range<usize>| -> Option<u64> {
        let text = s.get(range)?;
        if text.bytes().all(|b| b.is_ascii_digit()) {
            text.parse().ok()
        } else {
            None
        }
    };

    let year = field(0..4)?;
    let month = field(5..7)?;
    let day = field(8..10)?;
    let hour = field(11..13)?;
    let min = field(14..16)?;
    let sec = field(17..19)?;

    let is_leap =
        |y: u64| (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400);
    let days_in_month = |y: u64, m: u64| -> u64 {
        match m {
            1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
            4 | 6 | 9 | 11 => 30,
            2 if is_leap(y) => 29,
            2 => 28,
            _ => 0,
        }
    };

    // Reject values that cannot be represented as a real instant at or after
    // the epoch instead of silently producing a wrong timestamp.
    let in_range = year >= 1970
        && (1..=12).contains(&month)
        && (1..=days_in_month(year, month)).contains(&day)
        && hour < 24
        && min < 60
        && sec < 60;
    if !in_range {
        return None;
    }

    let days_before_year: u64 = (1970..year)
        .map(|y| if is_leap(y) { 366 } else { 365 })
        .sum();
    let days_before_month: u64 = (1..month).map(|m| days_in_month(year, m)).sum();
    let days = days_before_year + days_before_month + (day - 1);

    Some(days * 86_400 + hour * 3_600 + min * 60 + sec)
}
159
160pub async fn start_eviction_loop(
180 store: Arc<SqliteStore>,
181 embedding: Option<Arc<EmbeddingStore>>,
182 config: EvictionConfig,
183 policy: Arc<dyn EvictionPolicy + 'static>,
184 cancel: CancellationToken,
185) {
186 if config.max_entries == 0 {
187 tracing::debug!("eviction disabled (max_entries = 0)");
188 return;
189 }
190
191 let mut ticker = interval(Duration::from_secs(config.sweep_interval_secs));
192 ticker.tick().await;
194
195 loop {
196 tokio::select! {
197 () = cancel.cancelled() => {
198 tracing::debug!("eviction loop shutting down");
199 return;
200 }
201 _ = ticker.tick() => {}
202 }
203
204 tracing::debug!(max_entries = config.max_entries, "running eviction sweep");
205
206 match run_eviction_phase1(&store, &*policy, config.max_entries).await {
208 Ok(deleted) => {
209 if deleted > 0 {
210 tracing::info!(deleted, "eviction phase 1: soft-deleted entries");
211 }
212 }
213 Err(e) => {
214 tracing::warn!(error = %e, "eviction phase 1 failed, will retry next sweep");
215 }
216 }
217
218 match run_eviction_phase2(&store, embedding.as_deref()).await {
221 Ok(cleaned) => {
222 if cleaned > 0 {
223 tracing::info!(cleaned, "eviction phase 2: removed Qdrant vectors");
224 }
225 }
226 Err(e) => {
227 tracing::warn!(error = %e, "eviction phase 2 failed, will retry next sweep");
228 }
229 }
230 }
231}
232
233#[cfg_attr(
234 feature = "profiling",
235 tracing::instrument(name = "memory.eviction_phase1", skip_all)
236)]
237async fn run_eviction_phase1(
238 store: &SqliteStore,
239 policy: &dyn EvictionPolicy,
240 max_entries: usize,
241) -> Result<usize, MemoryError> {
242 let candidates = store.get_eviction_candidates().await?;
243 let total = candidates.len();
244
245 if total <= max_entries {
246 return Ok(0);
247 }
248
249 let excess = total - max_entries;
250 let mut scored: Vec<(f64, MessageId)> = candidates
251 .into_iter()
252 .map(|e| (policy.score(&e), e.id))
253 .collect();
254
255 scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
257
258 let candidates_to_delete: Vec<MessageId> =
259 scored.into_iter().take(excess).map(|(_, id)| id).collect();
260 let candidate_count = candidates_to_delete.len();
261 let ids_to_delete = store
266 .filter_out_preserved_episode_ids(&candidates_to_delete)
267 .await?;
268 let preserved = candidate_count - ids_to_delete.len();
269 if preserved > 0 {
270 tracing::debug!(
271 preserved,
272 deleted = ids_to_delete.len(),
273 "eviction phase 1: {preserved} candidate(s) preserved by summary anchor"
274 );
275 }
276 store.soft_delete_messages(&ids_to_delete).await?;
277
278 Ok(ids_to_delete.len())
280}
281
282#[cfg_attr(
283 feature = "profiling",
284 tracing::instrument(name = "memory.eviction_phase2", skip_all)
285)]
286async fn run_eviction_phase2(
287 store: &SqliteStore,
288 embedding: Option<&EmbeddingStore>,
289) -> Result<usize, MemoryError> {
290 let ids = store.get_soft_deleted_message_ids().await?;
292 if ids.is_empty() {
293 return Ok(0);
294 }
295
296 if let Some(emb) = embedding {
297 emb.delete_by_message_ids(&ids).await?;
300 } else {
301 tracing::debug!(
302 count = ids.len(),
303 "eviction phase 2: Qdrant disabled, cleaning SQLite bookkeeping only"
304 );
305 }
306
307 store.mark_qdrant_cleaned(&ids).await?;
308 Ok(ids.len())
309}
310
// Unit tests for the Ebbinghaus scoring policy, the timestamp parser and the
// two eviction phases.
#[cfg(test)]
mod tests {
    use super::*;

    /// Formats the instant `seconds_ago` seconds before now as
    /// `YYYY-MM-DD HH:MM:SS`, the form `parse_sqlite_timestamp_secs` reads.
    fn ts_ago(seconds_ago: u64) -> String {
        let ts = unix_now_secs().saturating_sub(seconds_ago);
        // Split the epoch seconds into time-of-day fields and whole days.
        let sec = ts % 60;
        let min = (ts / 60) % 60;
        let hour = (ts / 3600) % 24;
        let mut total_days = ts / 86400;
        let is_leap =
            |y: u64| (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400);
        // Peel off whole years starting from 1970.
        let mut year = 1970u64;
        loop {
            let days_in_year = if is_leap(year) { 366 } else { 365 };
            if total_days < days_in_year {
                break;
            }
            total_days -= days_in_year;
            year += 1;
        }
        // Index 0 is a placeholder so that months can be indexed 1..=12.
        let month_days = [
            0u64,
            31,
            28 + u64::from(is_leap(year)),
            31,
            30,
            31,
            30,
            31,
            31,
            30,
            31,
            30,
            31,
        ];
        // Peel off whole months within the year.
        let mut month = 1u64;
        while month <= 12 {
            let month_idx = usize::try_from(month).unwrap_or_else(|_| unreachable!());
            if total_days < month_days[month_idx] {
                break;
            }
            total_days -= month_days[month_idx];
            month += 1;
        }
        // Remaining days are zero-based; calendar days start at 1.
        let day = total_days + 1;
        format!("{year:04}-{month:02}-{day:02} {hour:02}:{min:02}:{sec:02}")
    }

    /// Builds an entry created and last accessed `seconds_ago` seconds ago.
    fn make_entry(access_count: u32, seconds_ago: u64) -> EvictionEntry {
        let ts = ts_ago(seconds_ago);
        EvictionEntry {
            id: MessageId(1),
            created_at: ts.clone(),
            last_accessed: Some(ts),
            access_count,
        }
    }

    /// An entry accessed one second ago with ten accesses has barely decayed.
    #[test]
    fn ebbinghaus_recent_high_access_scores_near_one() {
        let policy = EbbinghausPolicy::default();
        let entry = make_entry(10, 1);
        let score = policy.score(&entry);
        assert!(
            score > 0.99,
            "score should be near 1.0 for recently accessed entry, got {score}"
        );
    }

    /// With equal access counts, a week-old entry scores below a minute-old one.
    #[test]
    fn ebbinghaus_old_zero_access_scores_lower() {
        let policy = EbbinghausPolicy::default();
        let old = make_entry(0, 7 * 24 * 3600);
        let recent = make_entry(0, 60);
        assert!(
            policy.score(&old) < policy.score(&recent),
            "old entry must score lower than recent"
        );
    }

    /// With equal age, more accesses yield a higher score.
    #[test]
    fn ebbinghaus_high_access_decays_slower() {
        let policy = EbbinghausPolicy::default();
        let low = make_entry(1, 3600);
        let high = make_entry(20, 3600);
        assert!(
            policy.score(&high) > policy.score(&low),
            "high access count should yield higher score"
        );
    }

    /// When `last_accessed` is `None`, scoring falls back to `created_at`.
    #[test]
    fn ebbinghaus_never_accessed_uses_created_at_as_reference() {
        let policy = EbbinghausPolicy::default();
        let old_with_no_last_accessed = EvictionEntry {
            id: MessageId(2),
            created_at: ts_ago(7 * 24 * 3600),
            last_accessed: None,
            access_count: 0,
        };
        let old_with_same_last_accessed = make_entry(0, 7 * 24 * 3600);
        let score_no_access = policy.score(&old_with_no_last_accessed);
        let score_same = policy.score(&old_with_same_last_accessed);
        let diff = (score_no_access - score_same).abs();
        assert!(diff < 1e-6, "scores should match; diff = {diff}");
    }

    /// The default configuration has `max_entries == 0`, which
    /// `start_eviction_loop` treats as "eviction disabled".
    #[test]
    fn eviction_config_default_is_disabled() {
        let config = EvictionConfig::default();
        assert_eq!(
            config.max_entries, 0,
            "eviction must be disabled by default"
        );
    }

    /// Phase 1 must not soft-delete the first three messages after a summary
    /// is saved with the first and third message ids.
    #[tokio::test]
    async fn test_eviction_preserves_summary_anchored_messages() {
        use crate::store::SqliteStore;

        let store = SqliteStore::new(":memory:").await.unwrap();
        let cid = store.create_conversation().await.unwrap();

        // The futures are only created here; they run one at a time when
        // awaited in the loop below.
        let ids: Vec<_> = (0..6)
            .map(|_| async { store.save_message(cid, "user", "msg").await.unwrap() })
            .collect();
        let mut msg_ids = Vec::new();
        for f in ids {
            msg_ids.push(f.await);
        }

        // NOTE(review): presumably the two ids bound the summarized message
        // range and `30` is a size/token figure — confirm against `save_summary`.
        store
            .save_summary(cid, "summary", Some(msg_ids[0]), Some(msg_ids[2]), 30)
            .await
            .unwrap();

        let policy = EbbinghausPolicy::default();
        // Six messages with a cap of three leaves an excess of three candidates.
        let deleted = run_eviction_phase1(&store, &policy, 3).await.unwrap();

        assert!(
            deleted <= 3,
            "at most 3 messages can be deleted, got {deleted}"
        );

        for &anchored in &msg_ids[0..=2] {
            let is_deleted: Option<String> =
                sqlx::query_scalar("SELECT deleted_at FROM messages WHERE id = ?")
                    .bind(anchored)
                    .fetch_one(store.pool())
                    .await
                    .unwrap();
            assert!(
                is_deleted.is_none(),
                "summary-anchored message {anchored:?} must not be soft-deleted"
            );
        }
    }

    /// Pins the parser against a known epoch value.
    #[test]
    fn parse_sqlite_timestamp_known_value() {
        let ts = parse_sqlite_timestamp_secs("2024-01-01 00:00:00").unwrap();
        assert_eq!(
            ts, 1_704_067_200,
            "2024-01-01 must parse to known timestamp"
        );
    }

    /// Creates an in-memory SQLite store plus an embedding store backed by
    /// `InMemoryVectorStore`, sharing the same pool, with a 4-dimensional
    /// collection ensured.
    async fn setup_embedding_store() -> (EmbeddingStore, crate::store::SqliteStore) {
        let sqlite = crate::store::SqliteStore::new(":memory:").await.unwrap();
        let pool = sqlite.pool().clone();
        let mem_store = Box::new(crate::in_memory_store::InMemoryVectorStore::new());
        let emb = EmbeddingStore::with_store(mem_store, pool);
        emb.ensure_collection(4).await.unwrap();
        (emb, sqlite)
    }

    /// Saves one message, stores a vector for it, then soft-deletes the
    /// message. Returns the message id and the value returned by `emb.store`.
    async fn seed_soft_deleted(
        store: &crate::store::SqliteStore,
        emb: &EmbeddingStore,
    ) -> (MessageId, String) {
        let cid = store.create_conversation().await.unwrap();
        let msg_id = store.save_message(cid, "user", "hello").await.unwrap();

        let point_id = emb
            .store(
                msg_id,
                cid,
                "user",
                vec![1.0, 0.0, 0.0, 0.0],
                crate::embedding_store::MessageKind::Regular,
                "test",
                0,
            )
            .await
            .unwrap();

        store.soft_delete_messages(&[msg_id]).await.unwrap();

        (msg_id, point_id)
    }

    /// With an embedding store, phase 2 reports one cleaned message and the
    /// message no longer appears in the soft-deleted list.
    #[tokio::test]
    async fn eviction_phase2_calls_delete_before_mark_clean() {
        let (emb, store) = setup_embedding_store().await;
        let (msg_id, _point_id) = seed_soft_deleted(&store, &emb).await;

        let cleaned = run_eviction_phase2(&store, Some(&emb)).await.unwrap();
        assert_eq!(cleaned, 1, "one message should be cleaned");

        let remaining = store.get_soft_deleted_message_ids().await.unwrap();
        assert!(
            !remaining.contains(&msg_id),
            "message must not remain in soft-deleted list after phase 2"
        );
    }

    /// Without an embedding store, phase 2 still marks the row as cleaned.
    #[tokio::test]
    async fn eviction_phase2_skips_delete_when_no_embedding_store() {
        let (_, store) = setup_embedding_store().await;
        let cid = store.create_conversation().await.unwrap();
        let msg_id = store.save_message(cid, "user", "hello").await.unwrap();
        store.soft_delete_messages(&[msg_id]).await.unwrap();

        let cleaned = run_eviction_phase2(&store, None).await.unwrap();
        assert_eq!(
            cleaned, 1,
            "row must be cleaned even without embedding store"
        );

        let remaining = store.get_soft_deleted_message_ids().await.unwrap();
        assert!(
            !remaining.contains(&msg_id),
            "message must not remain in soft-deleted list"
        );
    }

    /// With nothing soft-deleted, phase 2 returns zero.
    #[tokio::test]
    async fn eviction_phase2_empty_returns_zero() {
        let (emb, store) = setup_embedding_store().await;
        let cleaned = run_eviction_phase2(&store, Some(&emb)).await.unwrap();
        assert_eq!(cleaned, 0, "no soft-deleted messages → 0 cleaned");
    }
}