1use std::sync::Arc;
16
17use tokio::time::{Duration, interval};
18use tokio_util::sync::CancellationToken;
19
20use crate::error::MemoryError;
21use crate::store::SqliteStore;
22use crate::types::MessageId;
23
24#[derive(Debug, Clone)]
28pub struct EvictionEntry {
29 pub id: MessageId,
31 pub created_at: String,
33 pub last_accessed: Option<String>,
35 pub access_count: u32,
37}
38
39pub trait EvictionPolicy: Send + Sync {
43 fn score(&self, entry: &EvictionEntry) -> f64;
48}
49
50#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
52#[serde(default)]
53pub struct EvictionConfig {
54 pub policy: String,
56 pub max_entries: usize,
58 pub sweep_interval_secs: u64,
60}
61
62impl Default for EvictionConfig {
63 fn default() -> Self {
64 Self {
65 policy: "ebbinghaus".to_owned(),
66 max_entries: 0,
67 sweep_interval_secs: 3600,
68 }
69 }
70}
71
/// Eviction policy modelled on the Ebbinghaus forgetting curve.
///
/// An entry's score decays exponentially with the time since it was last touched, and
/// decays more slowly the more often the entry has been accessed.
#[derive(Debug, Clone)]
pub struct EbbinghausPolicy {
    /// Seconds of memory "stability" per unit of `ln(1 + access_count)`; larger values make
    /// every entry decay more slowly. The `Default` impl uses 86 400 (one day).
    retention_strength: f64,
}
89
90impl EbbinghausPolicy {
91 #[must_use]
95 pub fn new(retention_strength: f64) -> Self {
96 Self { retention_strength }
97 }
98}
99
100impl Default for EbbinghausPolicy {
101 fn default() -> Self {
102 Self::new(86_400.0) }
104}
105
106impl EvictionPolicy for EbbinghausPolicy {
107 fn score(&self, entry: &EvictionEntry) -> f64 {
108 let now_secs = unix_now_secs();
109
110 let reference_secs = entry
111 .last_accessed
112 .as_deref()
113 .and_then(parse_sqlite_timestamp_secs)
114 .unwrap_or_else(|| parse_sqlite_timestamp_secs(&entry.created_at).unwrap_or(now_secs));
115
116 #[allow(clippy::cast_precision_loss)]
118 let t = now_secs.saturating_sub(reference_secs) as f64;
119 let n = f64::from(entry.access_count);
120
121 let denominator = (self.retention_strength * (1.0_f64 + n).ln()).max(1.0);
123 (-t / denominator).exp()
124 }
125}
126
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reads earlier than the epoch.
fn unix_now_secs() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs()
}
132
/// Parses a SQLite-style UTC timestamp (`YYYY-MM-DD HH:MM:SS`) into seconds since the Unix
/// epoch.
///
/// Surrounding whitespace is trimmed and only the first 19 bytes are read. Trailing data
/// such as fractional seconds or a zone suffix is ignored. The separator characters are not
/// checked, so the ISO-8601 `T` form parses too. The value is interpreted as UTC.
///
/// Returns `None` when:
/// - the input is shorter than 19 bytes;
/// - a fixed-width field is not entirely ASCII digits (including a field that would split a
///   multi-byte UTF-8 character);
/// - a component is out of range (month, day-of-month, hour, minute or second);
/// - the date is before 1970-01-01, which cannot be represented as unsigned epoch seconds.
fn parse_sqlite_timestamp_secs(s: &str) -> Option<u64> {
    let s = s.trim();

    // `str::get` returns `None` (where `s[a..b]` would panic) when the range is out of
    // bounds or not on a char boundary, so non-ASCII input is rejected instead of crashing.
    // Requiring ASCII digits also rejects a sign, which `u64::from_str` would accept.
    let field = |range: std::ops::Range<usize>| -> Option<u64> {
        let digits = s.get(range)?;
        if digits.bytes().all(|b| b.is_ascii_digit()) {
            digits.parse().ok()
        } else {
            None
        }
    };

    let year = field(0..4)?;
    let month = field(5..7)?;
    let day = field(8..10)?;
    let hour = field(11..13)?;
    let min = field(14..16)?;
    let sec = field(17..19)?;

    let is_leap =
        |y: u64| (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400);
    let days_in_month = |y: u64, m: u64| -> u64 {
        match m {
            1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
            4 | 6 | 9 | 11 => 30,
            2 if is_leap(y) => 29,
            2 => 28,
            _ => 0,
        }
    };

    // Reject impossible components rather than silently folding them into a wrong instant.
    let date_in_range = year >= 1970
        && (1..=12).contains(&month)
        && (1..=days_in_month(year, month)).contains(&day);
    if !date_in_range || hour > 23 || min > 59 || sec > 59 {
        return None;
    }

    let days_before_year: u64 = (1970..year).map(|y| if is_leap(y) { 366 } else { 365 }).sum();
    let days_before_month: u64 = (1..month).map(|m| days_in_month(year, m)).sum();
    // `day >= 1` was validated above, so this cannot underflow.
    let days = days_before_year + days_before_month + (day - 1);

    Some(days * 86_400 + hour * 3_600 + min * 60 + sec)
}
178
179pub async fn start_eviction_loop(
196 store: Arc<SqliteStore>,
197 config: EvictionConfig,
198 policy: Arc<dyn EvictionPolicy + 'static>,
199 cancel: CancellationToken,
200) {
201 if config.max_entries == 0 {
202 tracing::debug!("eviction disabled (max_entries = 0)");
203 return;
204 }
205
206 let mut ticker = interval(Duration::from_secs(config.sweep_interval_secs));
207 ticker.tick().await;
209
210 loop {
211 tokio::select! {
212 () = cancel.cancelled() => {
213 tracing::debug!("eviction loop shutting down");
214 return;
215 }
216 _ = ticker.tick() => {}
217 }
218
219 tracing::debug!(max_entries = config.max_entries, "running eviction sweep");
220
221 match run_eviction_phase1(&store, &*policy, config.max_entries).await {
223 Ok(deleted) => {
224 if deleted > 0 {
225 tracing::info!(deleted, "eviction phase 1: soft-deleted entries");
226 }
227 }
228 Err(e) => {
229 tracing::warn!(error = %e, "eviction phase 1 failed, will retry next sweep");
230 }
231 }
232
233 match run_eviction_phase2(&store).await {
236 Ok(cleaned) => {
237 if cleaned > 0 {
238 tracing::info!(cleaned, "eviction phase 2: removed Qdrant vectors");
239 }
240 }
241 Err(e) => {
242 tracing::warn!(error = %e, "eviction phase 2 failed, will retry next sweep");
243 }
244 }
245 }
246}
247
248#[cfg_attr(
249 feature = "profiling",
250 tracing::instrument(name = "memory.eviction_phase1", skip_all)
251)]
252async fn run_eviction_phase1(
253 store: &SqliteStore,
254 policy: &dyn EvictionPolicy,
255 max_entries: usize,
256) -> Result<usize, MemoryError> {
257 let candidates = store.get_eviction_candidates().await?;
258 let total = candidates.len();
259
260 if total <= max_entries {
261 return Ok(0);
262 }
263
264 let excess = total - max_entries;
265 let mut scored: Vec<(f64, MessageId)> = candidates
266 .into_iter()
267 .map(|e| (policy.score(&e), e.id))
268 .collect();
269
270 scored.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
272
273 let ids_to_delete: Vec<MessageId> = scored.into_iter().take(excess).map(|(_, id)| id).collect();
274 store.soft_delete_messages(&ids_to_delete).await?;
275
276 Ok(ids_to_delete.len())
277}
278
279#[cfg_attr(
280 feature = "profiling",
281 tracing::instrument(name = "memory.eviction_phase2", skip_all)
282)]
283async fn run_eviction_phase2(store: &SqliteStore) -> Result<usize, MemoryError> {
284 let ids = store.get_soft_deleted_message_ids().await?;
286 if ids.is_empty() {
287 return Ok(0);
288 }
289
290 tracing::warn!(
295 count = ids.len(),
296 "eviction phase 2: Qdrant vector removal not yet wired — marking cleaned without actual deletion (MVP)"
297 );
298
299 store.mark_qdrant_cleaned(&ids).await?;
302 Ok(ids.len())
303}
304
#[cfg(test)]
mod tests {
    use super::*;

    /// Renders "now minus `seconds_ago`" as a UTC `YYYY-MM-DD HH:MM:SS` string, the layout
    /// `parse_sqlite_timestamp_secs` reads.
    fn ts_ago(seconds_ago: u64) -> String {
        let epoch_secs = unix_now_secs().saturating_sub(seconds_ago);
        let sec = epoch_secs % 60;
        let min = (epoch_secs / 60) % 60;
        let hour = (epoch_secs / 3600) % 24;

        let leap =
            |y: u64| (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400);
        let year_len = |y: u64| -> u64 { if leap(y) { 366 } else { 365 } };

        // Peel whole years, then whole months, off the day count.
        let mut days_left = epoch_secs / 86_400;
        let mut year = 1970_u64;
        while days_left >= year_len(year) {
            days_left -= year_len(year);
            year += 1;
        }

        let february: u64 = if leap(year) { 29 } else { 28 };
        let month_lengths: [u64; 12] = [31, february, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
        let mut month = 1_u64;
        for len in month_lengths {
            if days_left < len {
                break;
            }
            days_left -= len;
            month += 1;
        }

        let day = days_left + 1;
        format!("{year:04}-{month:02}-{day:02} {hour:02}:{min:02}:{sec:02}")
    }

    /// Builds an entry whose creation and last-access times are both `seconds_ago`.
    fn make_entry(access_count: u32, seconds_ago: u64) -> EvictionEntry {
        let stamp = ts_ago(seconds_ago);
        EvictionEntry {
            id: MessageId(1),
            created_at: stamp.clone(),
            last_accessed: Some(stamp),
            access_count,
        }
    }

    #[test]
    fn ebbinghaus_recent_high_access_scores_near_one() {
        let score = EbbinghausPolicy::default().score(&make_entry(10, 1));
        assert!(
            score > 0.99,
            "score should be near 1.0 for recently accessed entry, got {score}"
        );
    }

    #[test]
    fn ebbinghaus_old_zero_access_scores_lower() {
        let policy = EbbinghausPolicy::default();
        let week_old = make_entry(0, 7 * 24 * 3600);
        let minute_old = make_entry(0, 60);
        assert!(
            policy.score(&week_old) < policy.score(&minute_old),
            "old entry must score lower than recent"
        );
    }

    #[test]
    fn ebbinghaus_high_access_decays_slower() {
        let policy = EbbinghausPolicy::default();
        let rarely_used = make_entry(1, 3600);
        let often_used = make_entry(20, 3600);
        assert!(
            policy.score(&often_used) > policy.score(&rarely_used),
            "high access count should yield higher score"
        );
    }

    #[test]
    fn ebbinghaus_never_accessed_uses_created_at_as_reference() {
        let policy = EbbinghausPolicy::default();
        let week_secs = 7 * 24 * 3600;
        let never_accessed = EvictionEntry {
            id: MessageId(2),
            created_at: ts_ago(week_secs),
            last_accessed: None,
            access_count: 0,
        };
        let accessed_at_creation = make_entry(0, week_secs);
        let diff = (policy.score(&never_accessed) - policy.score(&accessed_at_creation)).abs();
        assert!(diff < 1e-6, "scores should match; diff = {diff}");
    }

    #[test]
    fn eviction_config_default_is_disabled() {
        assert_eq!(
            EvictionConfig::default().max_entries,
            0,
            "eviction must be disabled by default"
        );
    }

    #[test]
    fn parse_sqlite_timestamp_known_value() {
        // 1_704_067_200 == 2024-01-01T00:00:00Z.
        assert_eq!(
            parse_sqlite_timestamp_secs("2024-01-01 00:00:00"),
            Some(1_704_067_200),
            "2024-01-01 must parse to known timestamp"
        );
    }
}