use std::sync::Arc;
use tokio::time::{Duration, interval};
use tokio_util::sync::CancellationToken;
use crate::error::MemoryError;
use crate::store::SqliteStore;
use crate::types::MessageId;
/// A stored message considered for eviction, together with the metadata an
/// [`EvictionPolicy`] uses to score it.
#[derive(Debug, Clone)]
pub struct EvictionEntry {
    /// Identifier of the message; handed back to the store for soft deletion when
    /// the entry is evicted.
    pub id: MessageId,
    /// Creation timestamp as text. The Ebbinghaus policy parses it as
    /// `YYYY-MM-DD HH:MM:SS` (presumably SQLite's UTC `datetime` output — confirm).
    pub created_at: String,
    /// Last-access timestamp in the same text format, or `None` when absent; the
    /// Ebbinghaus policy then falls back to `created_at`.
    pub last_accessed: Option<String>,
    /// Number of recorded accesses; higher counts slow the Ebbinghaus decay.
    pub access_count: u32,
}
/// Strategy that ranks eviction candidates.
///
/// The `Send + Sync` bound lets a policy be shared with the background sweep as an
/// `Arc<dyn EvictionPolicy>`.
pub trait EvictionPolicy: Send + Sync {
    /// Returns a retention score for `entry`.
    ///
    /// During a sweep, candidates are sorted by ascending score and the
    /// lowest-scoring ones are soft-deleted first, so a higher score means
    /// "keep longer".
    fn score(&self, entry: &EvictionEntry) -> f64;
}
/// Settings for the background eviction sweep.
///
/// Because of `#[serde(default)]`, any field missing from the serialized config
/// takes its value from [`EvictionConfig::default`].
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(default)]
pub struct EvictionConfig {
    /// Name of the eviction policy (default `"ebbinghaus"`). Not read by
    /// `start_eviction_loop`; NOTE(review): presumably the caller uses it to pick
    /// the [`EvictionPolicy`] implementation — confirm.
    pub policy: String,
    /// Maximum number of eviction candidates to retain. A sweep soft-deletes the
    /// lowest-scoring excess. `0` (the default) disables eviction entirely.
    pub max_entries: usize,
    /// Seconds between sweeps (default 3600). Fed to `tokio::time::interval`, so
    /// keep it non-zero.
    pub sweep_interval_secs: u64,
}
impl Default for EvictionConfig {
    /// Ebbinghaus policy, eviction disabled (`max_entries == 0`), hourly sweeps.
    fn default() -> Self {
        let policy = String::from("ebbinghaus");
        Self {
            policy,
            // Zero means "no limit": the sweep loop exits immediately.
            max_entries: 0,
            // One hour.
            sweep_interval_secs: 60 * 60,
        }
    }
}
/// [`EvictionPolicy`] based on the Ebbinghaus forgetting curve: an entry's score
/// decays exponentially with the time since it was last accessed (or created),
/// and decays more slowly the more often it has been accessed.
#[derive(Debug, Clone)]
pub struct EbbinghausPolicy {
    /// Scale, in seconds, of the decay time constant used by `score` (which also
    /// factors in the entry's access count).
    retention_strength: f64,
}

impl EbbinghausPolicy {
    /// Creates a policy with the given `retention_strength`, in seconds.
    #[must_use]
    pub fn new(retention_strength: f64) -> Self {
        Self { retention_strength }
    }
}

impl Default for EbbinghausPolicy {
    /// Uses a retention strength of one day (86 400 seconds).
    fn default() -> Self {
        Self::new(86_400.0)
    }
}
impl EvictionPolicy for EbbinghausPolicy {
    /// Scores `entry` with the forgetting curve `R = exp(-t / S)`.
    ///
    /// * `t` is the number of seconds since `last_accessed`. It falls back to
    ///   `created_at` when `last_accessed` is missing or unparseable. When neither
    ///   timestamp parses, it falls back to "now", i.e. `t = 0` and score 1.0.
    ///   Timestamps in the future also give `t = 0`.
    /// * `S = retention_strength * (1 + ln(1 + access_count))`, clamped to at least
    ///   one second. Stability therefore starts at `retention_strength` for a
    ///   never-accessed entry and grows logarithmically with each access.
    ///
    /// The result lies in `[0, 1]`; lower scores are evicted first.
    fn score(&self, entry: &EvictionEntry) -> f64 {
        let now_secs = unix_now_secs();
        let reference_secs = entry
            .last_accessed
            .as_deref()
            .and_then(parse_sqlite_timestamp_secs)
            .or_else(|| parse_sqlite_timestamp_secs(&entry.created_at))
            .unwrap_or(now_secs);
        // Precision is only lost for ages above 2^53 seconds, far beyond any real age.
        #[allow(clippy::cast_precision_loss)]
        let t = now_secs.saturating_sub(reference_secs) as f64;
        let n = f64::from(entry.access_count);
        // The `1 +` term matters: `ln_1p(0) == 0`, so without it a never-accessed
        // entry would get a stability of 0 (clamped to 1 s). Its score would then
        // underflow to exactly 0.0 within minutes, and all such entries would tie
        // regardless of age.
        let stability = (self.retention_strength * (1.0 + n.ln_1p())).max(1.0);
        (-t / stability).exp()
    }
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn unix_now_secs() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
/// Parses a SQLite-style timestamp (`YYYY-MM-DD HH:MM:SS`) into seconds since the
/// Unix epoch, treating it as UTC.
///
/// Only the first 19 bytes (after trimming) are inspected. Trailing fractional
/// seconds or a zone suffix are therefore ignored, and the date/time separator is
/// not checked (so the ISO-8601 `T` form is accepted too).
///
/// Returns `None` in any of these cases:
/// * the input is shorter than 19 bytes;
/// * a numeric field contains anything but ASCII digits;
/// * a component is out of range (month 1-12, day valid for that month,
///   hour < 24, minute < 60, second <= 60 to tolerate leap seconds);
/// * the date precedes 1970-01-01.
fn parse_sqlite_timestamp_secs(s: &str) -> Option<u64> {
    let bytes = s.trim().as_bytes();
    if bytes.len() < 19 {
        return None;
    }
    // Reads `bytes[start..end]` as an unsigned decimal; any non-digit (including a
    // sign) yields `None`. Working on bytes avoids the panic that `&str` range
    // slicing raises when a multi-byte character straddles a fixed offset.
    let field = |start: usize, end: usize| -> Option<u64> {
        bytes[start..end].iter().try_fold(0_u64, |acc, &c| {
            c.is_ascii_digit().then(|| acc * 10 + u64::from(c - b'0'))
        })
    };
    let year = field(0, 4)?;
    let month = field(5, 7)?;
    let day = field(8, 10)?;
    let hour = field(11, 13)?;
    let min = field(14, 16)?;
    let sec = field(17, 19)?;

    let is_leap =
        |y: u64| (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400);
    let days_in_month = |m: u64| -> u64 {
        match m {
            1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
            4 | 6 | 9 | 11 => 30,
            2 if is_leap(year) => 29,
            2 => 28,
            _ => 0,
        }
    };
    // Pre-epoch dates cannot be represented as u64 seconds.
    let valid = year >= 1970
        && (1..=12).contains(&month)
        && (1..=days_in_month(month)).contains(&day)
        && hour < 24
        && min < 60
        && sec <= 60;
    if !valid {
        return None;
    }

    // Number of leap years in 1..=y (proleptic Gregorian calendar).
    let leaps_through = |y: u64| y / 4 - y / 100 + y / 400;
    let days_before_year = 365 * (year - 1970) + leaps_through(year - 1) - leaps_through(1969);
    let days_before_month: u64 = (1..month).map(days_in_month).sum();
    let days = days_before_year + days_before_month + (day - 1);
    Some(days * 86_400 + hour * 3_600 + min * 60 + sec)
}
/// Runs periodic eviction sweeps until `cancel` is triggered.
///
/// Returns immediately, without sweeping, when `config.max_entries` is 0.
/// Otherwise the first sweep happens one `sweep_interval_secs` after start. Each
/// sweep runs two phases:
/// * phase 1 soft-deletes the lowest-scoring entries beyond `max_entries`;
/// * phase 2 handles Qdrant cleanup for soft-deleted entries.
///
/// A failure in either phase is logged and retried on the next sweep; it never
/// stops the loop.
///
/// A `sweep_interval_secs` of 0 is treated as 1 second, because
/// `tokio::time::interval` panics on a zero period.
pub async fn start_eviction_loop(
    store: Arc<SqliteStore>,
    config: EvictionConfig,
    policy: Arc<dyn EvictionPolicy + 'static>,
    cancel: CancellationToken,
) {
    if config.max_entries == 0 {
        tracing::debug!("eviction disabled (max_entries = 0)");
        return;
    }
    if config.sweep_interval_secs == 0 {
        tracing::warn!("eviction sweep_interval_secs = 0 is invalid; using 1 second");
    }
    let mut ticker = interval(Duration::from_secs(config.sweep_interval_secs.max(1)));
    // After a stall (host suspend, a very slow sweep), wait a full period instead of
    // firing a burst of back-to-back catch-up sweeps.
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    // The first tick of a tokio interval completes immediately; consume it so the
    // first sweep waits one full period.
    ticker.tick().await;
    loop {
        tokio::select! {
            () = cancel.cancelled() => {
                tracing::debug!("eviction loop shutting down");
                return;
            }
            _ = ticker.tick() => {}
        }
        tracing::debug!(max_entries = config.max_entries, "running eviction sweep");
        match run_eviction_phase1(&store, &*policy, config.max_entries).await {
            Ok(deleted) => {
                if deleted > 0 {
                    tracing::info!(deleted, "eviction phase 1: soft-deleted entries");
                }
            }
            Err(e) => {
                tracing::warn!(error = %e, "eviction phase 1 failed, will retry next sweep");
            }
        }
        match run_eviction_phase2(&store).await {
            Ok(cleaned) => {
                if cleaned > 0 {
                    tracing::info!(cleaned, "eviction phase 2: removed Qdrant vectors");
                }
            }
            Err(e) => {
                tracing::warn!(error = %e, "eviction phase 2 failed, will retry next sweep");
            }
        }
    }
}
/// Phase 1 of a sweep: soft-deletes the lowest-scoring entries so that at most
/// `max_entries` eviction candidates remain.
///
/// Every candidate from `store.get_eviction_candidates()` is scored once with
/// `policy`. Candidates are ordered by ascending score, with ties keeping the
/// store's order, and the first `total - max_entries` ids are passed to
/// `store.soft_delete_messages`.
///
/// # Returns
/// The number of entries soft-deleted (0 when the store is already within budget).
///
/// # Errors
/// Propagates any `MemoryError` from fetching candidates or soft-deleting them.
#[cfg_attr(
    feature = "profiling",
    tracing::instrument(name = "memory.eviction_phase1", skip_all)
)]
async fn run_eviction_phase1(
    store: &SqliteStore,
    policy: &dyn EvictionPolicy,
    max_entries: usize,
) -> Result<usize, MemoryError> {
    let candidates = store.get_eviction_candidates().await?;
    let total = candidates.len();
    if total <= max_entries {
        return Ok(0);
    }
    let excess = total - max_entries;
    let mut scored: Vec<(f64, MessageId)> = candidates
        .into_iter()
        .map(|entry| (policy.score(&entry), entry.id))
        .collect();
    // `total_cmp` is a total order even for NaN, which a custom policy could return.
    // `partial_cmp(..).unwrap_or(Equal)` is not a total order, and since Rust 1.81
    // `sort_by` may panic when handed one. Positive NaN sorts last, so it is evicted last.
    scored.sort_by(|a, b| a.0.total_cmp(&b.0));
    let ids_to_delete: Vec<MessageId> = scored.into_iter().take(excess).map(|(_, id)| id).collect();
    store.soft_delete_messages(&ids_to_delete).await?;
    Ok(ids_to_delete.len())
}
/// Phase 2 of a sweep: processes entries that phase 1 soft-deleted.
///
/// MVP behaviour: no Qdrant vectors are actually removed yet. If the store reports
/// any soft-deleted ids, a warning is logged and every one of them is passed to
/// `store.mark_qdrant_cleaned`.
///
/// # Returns
/// The number of ids marked (0 when nothing was pending).
///
/// # Errors
/// Propagates any `MemoryError` from the store calls.
#[cfg_attr(
    feature = "profiling",
    tracing::instrument(name = "memory.eviction_phase2", skip_all)
)]
async fn run_eviction_phase2(store: &SqliteStore) -> Result<usize, MemoryError> {
    let pending = store.get_soft_deleted_message_ids().await?;
    let count = pending.len();
    if count > 0 {
        tracing::warn!(
            count,
            "eviction phase 2: Qdrant vector removal not yet wired — marking cleaned without actual deletion (MVP)"
        );
        store.mark_qdrant_cleaned(&pending).await?;
    }
    Ok(count)
}
// Unit tests for the Ebbinghaus policy, config defaults, and timestamp parsing.
#[cfg(test)]
mod tests {
    use super::*;

    /// Formats the instant `seconds_ago` seconds before now (taken from
    /// `unix_now_secs`, so UTC) as a `YYYY-MM-DD HH:MM:SS` string, the format that
    /// `parse_sqlite_timestamp_secs` reads.
    fn ts_ago(seconds_ago: u64) -> String {
        let ts = unix_now_secs().saturating_sub(seconds_ago);
        // Time-of-day components.
        let sec = ts % 60;
        let min = (ts / 60) % 60;
        let hour = (ts / 3600) % 24;
        // Whole days since 1970-01-01; reduced below to a day within the month.
        let mut total_days = ts / 86400;
        let is_leap =
            |y: u64| (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400);
        // Consume whole years until the remaining days fall inside `year`.
        let mut year = 1970u64;
        loop {
            let days_in_year = if is_leap(year) { 366 } else { 365 };
            if total_days < days_in_year {
                break;
            }
            total_days -= days_in_year;
            year += 1;
        }
        // Month lengths for `year`, indexed 1..=12 (index 0 is an unused placeholder).
        let month_days = [
            0u64,
            31,
            28 + u64::from(is_leap(year)),
            31,
            30,
            31,
            30,
            31,
            31,
            30,
            31,
            30,
            31,
        ];
        // Consume whole months; `total_days` ends as the zero-based day of the month.
        let mut month = 1u64;
        while month <= 12 {
            let month_idx = usize::try_from(month).unwrap_or_else(|_| unreachable!());
            if total_days < month_days[month_idx] {
                break;
            }
            total_days -= month_days[month_idx];
            month += 1;
        }
        let day = total_days + 1;
        format!("{year:04}-{month:02}-{day:02} {hour:02}:{min:02}:{sec:02}")
    }

    /// Builds an entry whose `created_at` and `last_accessed` are both
    /// `seconds_ago` seconds in the past.
    fn make_entry(access_count: u32, seconds_ago: u64) -> EvictionEntry {
        let ts = ts_ago(seconds_ago);
        EvictionEntry {
            id: MessageId(1),
            created_at: ts.clone(),
            last_accessed: Some(ts),
            access_count,
        }
    }

    #[test]
    fn ebbinghaus_recent_high_access_scores_near_one() {
        let policy = EbbinghausPolicy::default();
        // Accessed 10 times, last one second ago.
        let entry = make_entry(10, 1);
        let score = policy.score(&entry);
        assert!(
            score > 0.99,
            "score should be near 1.0 for recently accessed entry, got {score}"
        );
    }

    #[test]
    fn ebbinghaus_old_zero_access_scores_lower() {
        let policy = EbbinghausPolicy::default();
        let old = make_entry(0, 7 * 24 * 3600); // one week ago
        let recent = make_entry(0, 60); // one minute ago
        assert!(
            policy.score(&old) < policy.score(&recent),
            "old entry must score lower than recent"
        );
    }

    #[test]
    fn ebbinghaus_high_access_decays_slower() {
        let policy = EbbinghausPolicy::default();
        let low = make_entry(1, 3600); // one hour ago, 1 access
        let high = make_entry(20, 3600); // one hour ago, 20 accesses
        assert!(
            policy.score(&high) > policy.score(&low),
            "high access count should yield higher score"
        );
    }

    #[test]
    fn ebbinghaus_never_accessed_uses_created_at_as_reference() {
        let policy = EbbinghausPolicy::default();
        // Same age either way: one entry has no `last_accessed`, the other has
        // `last_accessed == created_at`; the scores should coincide.
        let old_with_no_last_accessed = EvictionEntry {
            id: MessageId(2),
            created_at: ts_ago(7 * 24 * 3600),
            last_accessed: None,
            access_count: 0,
        };
        let old_with_same_last_accessed = make_entry(0, 7 * 24 * 3600);
        let score_no_access = policy.score(&old_with_no_last_accessed);
        let score_same = policy.score(&old_with_same_last_accessed);
        // Tolerance absorbs the clock advancing between the separate `now` reads.
        let diff = (score_no_access - score_same).abs();
        assert!(diff < 1e-6, "scores should match; diff = {diff}");
    }

    #[test]
    fn eviction_config_default_is_disabled() {
        let config = EvictionConfig::default();
        assert_eq!(
            config.max_entries, 0,
            "eviction must be disabled by default"
        );
    }

    #[test]
    fn parse_sqlite_timestamp_known_value() {
        // 2024-01-01T00:00:00Z is Unix time 1 704 067 200.
        let ts = parse_sqlite_timestamp_secs("2024-01-01 00:00:00").unwrap();
        assert_eq!(
            ts, 1_704_067_200,
            "2024-01-01 must parse to known timestamp"
        );
    }
}