1use std::sync::Arc;
16
17use tokio::time::{Duration, interval};
18use tokio_util::sync::CancellationToken;
19
20use crate::error::MemoryError;
21use crate::store::SqliteStore;
22use crate::types::MessageId;
23
24#[derive(Debug, Clone)]
28pub struct EvictionEntry {
29 pub id: MessageId,
31 pub created_at: String,
33 pub last_accessed: Option<String>,
35 pub access_count: u32,
37}
38
39pub trait EvictionPolicy: Send + Sync {
43 fn score(&self, entry: &EvictionEntry) -> f64;
48}
49
50#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
52#[serde(default)]
53pub struct EvictionConfig {
54 pub policy: String,
56 pub max_entries: usize,
58 pub sweep_interval_secs: u64,
60}
61
62impl Default for EvictionConfig {
63 fn default() -> Self {
64 Self {
65 policy: "ebbinghaus".to_owned(),
66 max_entries: 0,
67 sweep_interval_secs: 3600,
68 }
69 }
70}
71
72pub struct EbbinghausPolicy {
87 retention_strength: f64,
88}
89
90impl EbbinghausPolicy {
91 #[must_use]
95 pub fn new(retention_strength: f64) -> Self {
96 Self { retention_strength }
97 }
98}
99
100impl Default for EbbinghausPolicy {
101 fn default() -> Self {
102 Self::new(86_400.0) }
104}
105
106impl EvictionPolicy for EbbinghausPolicy {
107 fn score(&self, entry: &EvictionEntry) -> f64 {
108 let now_secs = unix_now_secs();
109
110 let reference_secs = entry
111 .last_accessed
112 .as_deref()
113 .and_then(parse_sqlite_timestamp_secs)
114 .unwrap_or_else(|| parse_sqlite_timestamp_secs(&entry.created_at).unwrap_or(now_secs));
115
116 #[allow(clippy::cast_precision_loss)]
118 let t = now_secs.saturating_sub(reference_secs) as f64;
119 let n = f64::from(entry.access_count);
120
121 let denominator = (self.retention_strength * (1.0_f64 + n).ln()).max(1.0);
123 (-t / denominator).exp()
124 }
125}
126
127fn unix_now_secs() -> u64 {
128 std::time::SystemTime::now()
129 .duration_since(std::time::UNIX_EPOCH)
130 .map_or(0, |d| d.as_secs())
131}
132
133fn parse_sqlite_timestamp_secs(s: &str) -> Option<u64> {
137 let s = s.trim();
139 if s.len() < 19 {
140 return None;
141 }
142 let year: u64 = s[0..4].parse().ok()?;
143 let month: u64 = s[5..7].parse().ok()?;
144 let day: u64 = s[8..10].parse().ok()?;
145 let hour: u64 = s[11..13].parse().ok()?;
146 let min: u64 = s[14..16].parse().ok()?;
147 let sec: u64 = s[17..19].parse().ok()?;
148
149 let is_leap = |y: u64| (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400);
152 let days_in_month = |y: u64, m: u64| -> u64 {
153 match m {
154 1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
155 4 | 6 | 9 | 11 => 30,
156 2 => {
157 if is_leap(y) {
158 29
159 } else {
160 28
161 }
162 }
163 _ => 0,
164 }
165 };
166
167 let mut days: u64 = 0;
168 for y in 1970..year {
169 days += if is_leap(y) { 366 } else { 365 };
170 }
171 for m in 1..month {
172 days += days_in_month(year, m);
173 }
174 days += day.saturating_sub(1);
175
176 Some(days * 86400 + hour * 3600 + min * 60 + sec)
177}
178
179pub async fn start_eviction_loop(
196 store: Arc<SqliteStore>,
197 config: EvictionConfig,
198 policy: Arc<dyn EvictionPolicy + 'static>,
199 cancel: CancellationToken,
200) {
201 if config.max_entries == 0 {
202 tracing::debug!("eviction disabled (max_entries = 0)");
203 return;
204 }
205
206 let mut ticker = interval(Duration::from_secs(config.sweep_interval_secs));
207 ticker.tick().await;
209
210 loop {
211 tokio::select! {
212 () = cancel.cancelled() => {
213 tracing::debug!("eviction loop shutting down");
214 return;
215 }
216 _ = ticker.tick() => {}
217 }
218
219 tracing::debug!(max_entries = config.max_entries, "running eviction sweep");
220
221 match run_eviction_phase1(&store, &*policy, config.max_entries).await {
223 Ok(deleted) => {
224 if deleted > 0 {
225 tracing::info!(deleted, "eviction phase 1: soft-deleted entries");
226 }
227 }
228 Err(e) => {
229 tracing::warn!(error = %e, "eviction phase 1 failed, will retry next sweep");
230 }
231 }
232
233 match run_eviction_phase2(&store).await {
236 Ok(cleaned) => {
237 if cleaned > 0 {
238 tracing::info!(cleaned, "eviction phase 2: removed Qdrant vectors");
239 }
240 }
241 Err(e) => {
242 tracing::warn!(error = %e, "eviction phase 2 failed, will retry next sweep");
243 }
244 }
245 }
246}
247
248#[cfg_attr(
249 feature = "profiling",
250 tracing::instrument(name = "memory.eviction_phase1", skip_all)
251)]
252async fn run_eviction_phase1(
253 store: &SqliteStore,
254 policy: &dyn EvictionPolicy,
255 max_entries: usize,
256) -> Result<usize, MemoryError> {
257 let candidates = store.get_eviction_candidates().await?;
258 let total = candidates.len();
259
260 if total <= max_entries {
261 return Ok(0);
262 }
263
264 let excess = total - max_entries;
265 let mut scored: Vec<(f64, MessageId)> = candidates
266 .into_iter()
267 .map(|e| (policy.score(&e), e.id))
268 .collect();
269
270 scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
272
273 let candidates_to_delete: Vec<MessageId> =
274 scored.into_iter().take(excess).map(|(_, id)| id).collect();
275 let candidate_count = candidates_to_delete.len();
276 let ids_to_delete = store
281 .filter_out_preserved_episode_ids(&candidates_to_delete)
282 .await?;
283 let preserved = candidate_count - ids_to_delete.len();
284 if preserved > 0 {
285 tracing::debug!(
286 preserved,
287 deleted = ids_to_delete.len(),
288 "eviction phase 1: {preserved} candidate(s) preserved by summary anchor"
289 );
290 }
291 store.soft_delete_messages(&ids_to_delete).await?;
292
293 Ok(ids_to_delete.len())
295}
296
297#[cfg_attr(
298 feature = "profiling",
299 tracing::instrument(name = "memory.eviction_phase2", skip_all)
300)]
301async fn run_eviction_phase2(store: &SqliteStore) -> Result<usize, MemoryError> {
302 let ids = store.get_soft_deleted_message_ids().await?;
304 if ids.is_empty() {
305 return Ok(0);
306 }
307
308 tracing::warn!(
313 count = ids.len(),
314 "eviction phase 2: Qdrant vector removal not yet wired — marking cleaned without actual deletion (MVP)"
315 );
316
317 store.mark_qdrant_cleaned(&ids).await?;
320 Ok(ids.len())
321}
322
323#[cfg(test)]
326mod tests {
327 use super::*;
328
329 fn ts_ago(seconds_ago: u64) -> String {
333 let ts = unix_now_secs().saturating_sub(seconds_ago);
334 let sec = ts % 60;
336 let min = (ts / 60) % 60;
337 let hour = (ts / 3600) % 24;
338 let mut total_days = ts / 86400;
339 let is_leap =
340 |y: u64| (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400);
341 let mut year = 1970u64;
342 loop {
343 let days_in_year = if is_leap(year) { 366 } else { 365 };
344 if total_days < days_in_year {
345 break;
346 }
347 total_days -= days_in_year;
348 year += 1;
349 }
350 let month_days = [
351 0u64,
352 31,
353 28 + u64::from(is_leap(year)),
354 31,
355 30,
356 31,
357 30,
358 31,
359 31,
360 30,
361 31,
362 30,
363 31,
364 ];
365 let mut month = 1u64;
366 while month <= 12 {
367 let month_idx = usize::try_from(month).unwrap_or_else(|_| unreachable!());
368 if total_days < month_days[month_idx] {
369 break;
370 }
371 total_days -= month_days[month_idx];
372 month += 1;
373 }
374 let day = total_days + 1;
375 format!("{year:04}-{month:02}-{day:02} {hour:02}:{min:02}:{sec:02}")
376 }
377
378 fn make_entry(access_count: u32, seconds_ago: u64) -> EvictionEntry {
379 let ts = ts_ago(seconds_ago);
380 EvictionEntry {
381 id: MessageId(1),
382 created_at: ts.clone(),
383 last_accessed: Some(ts),
384 access_count,
385 }
386 }
387
388 #[test]
389 fn ebbinghaus_recent_high_access_scores_near_one() {
390 let policy = EbbinghausPolicy::default();
391 let entry = make_entry(10, 1);
393 let score = policy.score(&entry);
394 assert!(
396 score > 0.99,
397 "score should be near 1.0 for recently accessed entry, got {score}"
398 );
399 }
400
401 #[test]
402 fn ebbinghaus_old_zero_access_scores_lower() {
403 let policy = EbbinghausPolicy::default();
404 let old = make_entry(0, 7 * 24 * 3600); let recent = make_entry(0, 60); assert!(
407 policy.score(&old) < policy.score(&recent),
408 "old entry must score lower than recent"
409 );
410 }
411
412 #[test]
413 fn ebbinghaus_high_access_decays_slower() {
414 let policy = EbbinghausPolicy::default();
415 let low = make_entry(1, 3600); let high = make_entry(20, 3600); assert!(
418 policy.score(&high) > policy.score(&low),
419 "high access count should yield higher score"
420 );
421 }
422
423 #[test]
424 fn ebbinghaus_never_accessed_uses_created_at_as_reference() {
425 let policy = EbbinghausPolicy::default();
426 let old_with_no_last_accessed = EvictionEntry {
429 id: MessageId(2),
430 created_at: ts_ago(7 * 24 * 3600),
431 last_accessed: None,
432 access_count: 0,
433 };
434 let old_with_same_last_accessed = make_entry(0, 7 * 24 * 3600);
435 let score_no_access = policy.score(&old_with_no_last_accessed);
436 let score_same = policy.score(&old_with_same_last_accessed);
437 let diff = (score_no_access - score_same).abs();
439 assert!(diff < 1e-6, "scores should match; diff = {diff}");
440 }
441
442 #[test]
443 fn eviction_config_default_is_disabled() {
444 let config = EvictionConfig::default();
445 assert_eq!(
446 config.max_entries, 0,
447 "eviction must be disabled by default"
448 );
449 }
450
451 #[tokio::test]
454 async fn test_eviction_preserves_summary_anchored_messages() {
455 use crate::store::SqliteStore;
456
457 let store = SqliteStore::new(":memory:").await.unwrap();
458 let cid = store.create_conversation().await.unwrap();
459
460 let ids: Vec<_> = (0..6)
462 .map(|_| async { store.save_message(cid, "user", "msg").await.unwrap() })
463 .collect();
464 let mut msg_ids = Vec::new();
465 for f in ids {
466 msg_ids.push(f.await);
467 }
468
469 store
471 .save_summary(cid, "summary", Some(msg_ids[0]), Some(msg_ids[2]), 30)
472 .await
473 .unwrap();
474
475 let policy = EbbinghausPolicy::default();
476 let deleted = run_eviction_phase1(&store, &policy, 3).await.unwrap();
478
479 assert!(
482 deleted <= 3,
483 "at most 3 messages can be deleted, got {deleted}"
484 );
485
486 for &anchored in &msg_ids[0..=2] {
487 let is_deleted: Option<String> =
488 sqlx::query_scalar("SELECT deleted_at FROM messages WHERE id = ?")
489 .bind(anchored)
490 .fetch_one(store.pool())
491 .await
492 .unwrap();
493 assert!(
494 is_deleted.is_none(),
495 "summary-anchored message {anchored:?} must not be soft-deleted"
496 );
497 }
498 }
499
500 #[test]
501 fn parse_sqlite_timestamp_known_value() {
502 let ts = parse_sqlite_timestamp_secs("2024-01-01 00:00:00").unwrap();
504 assert_eq!(
507 ts, 1_704_067_200,
508 "2024-01-01 must parse to known timestamp"
509 );
510 }
511}