use std::sync::Arc;
use std::time::{Duration, SystemTime};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use super::TenantId;
/// Tombstone retention window used by `RetentionPolicy::default()`: 24 hours.
pub const DEFAULT_TOMBSTONE_RETENTION: Duration = Duration::from_secs(24 * 60 * 60);
/// Smallest tombstone retention window that `RetentionPolicy::validate` accepts: 1 hour.
pub const MIN_TOMBSTONE_RETENTION: Duration = Duration::from_secs(60 * 60);
/// Tunable retention settings; serializable via serde.
///
/// Call `validate` before use to make sure the configured values are not
/// below the enforced minimums.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RetentionPolicy {
/// Length of the look-back window, measured back from "now", within which
/// committed log entries are still retained by `TombstoneRetentionContributor`.
pub tombstone_retention: Duration,
}
impl Default for RetentionPolicy {
    /// Builds a policy whose tombstone retention is [`DEFAULT_TOMBSTONE_RETENTION`].
    fn default() -> Self {
        let tombstone_retention = DEFAULT_TOMBSTONE_RETENTION;
        Self {
            tombstone_retention,
        }
    }
}
impl RetentionPolicy {
    /// Checks that the policy respects the enforced minimums.
    ///
    /// # Errors
    ///
    /// Returns [`RetentionError::PolicyTooAggressive`] when
    /// `tombstone_retention` is shorter than [`MIN_TOMBSTONE_RETENTION`];
    /// a value exactly equal to the minimum is accepted.
    pub fn validate(&self) -> Result<(), RetentionError> {
        let configured = self.tombstone_retention;
        if configured >= MIN_TOMBSTONE_RETENTION {
            Ok(())
        } else {
            Err(RetentionError::PolicyTooAggressive {
                field: "tombstone_retention",
                min: MIN_TOMBSTONE_RETENTION,
                got: configured,
            })
        }
    }
}
/// Errors raised while validating a retention policy or while asking
/// contributors for their retention floors.
#[derive(Debug, thiserror::Error)]
pub enum RetentionError {
/// A policy duration was configured below its allowed minimum.
#[error(
"retention policy field `{field}` set below safety floor: \
got {got:?}, minimum is {min:?}"
)]
PolicyTooAggressive {
/// Name of the offending policy field.
field: &'static str,
/// Smallest value that would have been accepted.
min: Duration,
/// Value that was actually configured.
got: Duration,
},
/// A contributor could not compute its floor; `name` is the contributor's
/// label and `message` describes the underlying failure.
#[error("contributor `{name}` failed: {message}")]
ContributorFailed { name: &'static str, message: String },
}
/// A source of log-retention constraints that `RetentionRegistry` consults
/// when computing how much of a tenant's log may be purged.
#[async_trait]
pub trait RetentionContributor: Send + Sync {
/// Static label for this contributor; it is recorded next to the
/// contributor's floor in `SafePurgeWatermark::per_contributor`.
fn name(&self) -> &'static str;
/// Reports the lowest log index this contributor still requires for
/// `tenant_id`.
///
/// `Ok(None)` means the contributor imposes no constraint: the registry
/// skips such values when taking the minimum across contributors.
/// An `Err` makes the registry abort the whole watermark computation.
async fn min_required_log_index(
&self,
tenant_id: TenantId,
) -> Result<Option<u64>, RetentionError>;
}
/// Ordered collection of retention contributors.
///
/// Cloning the registry clones the `Arc` handles, so clones share the same
/// contributor instances.
#[derive(Default, Clone)]
pub struct RetentionRegistry {
// Contributors in registration order; queried one after another.
contributors: Vec<Arc<dyn RetentionContributor>>,
}
impl RetentionRegistry {
    /// Creates a registry with no contributors.
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder-style registration: appends `c` and hands the registry back.
    pub fn with(mut self, c: Arc<dyn RetentionContributor>) -> Self {
        self.add(c);
        self
    }

    /// Appends `c` after the contributors that are already registered.
    pub fn add(&mut self, c: Arc<dyn RetentionContributor>) {
        self.contributors.push(c);
    }

    /// Names of all registered contributors, in registration order.
    pub fn contributor_names(&self) -> Vec<&'static str> {
        let mut names = Vec::with_capacity(self.contributors.len());
        for contributor in &self.contributors {
            names.push(contributor.name());
        }
        names
    }

    /// Asks every contributor for its floor and combines the answers.
    ///
    /// Contributors are awaited one at a time in registration order. The
    /// resulting watermark carries:
    /// * `min_required` — the smallest floor reported by any contributor,
    ///   or `None` when nobody reported one;
    /// * `safe_purge_through` — `min_required - 1` (saturating at zero);
    /// * `per_contributor` — every contributor's name with its raw answer.
    ///
    /// # Errors
    ///
    /// The first contributor error is returned as-is and no watermark is
    /// produced.
    pub async fn safe_purge_watermark(
        &self,
        tenant_id: TenantId,
    ) -> Result<SafePurgeWatermark, RetentionError> {
        let mut per_contributor: Vec<(&'static str, Option<u64>)> =
            Vec::with_capacity(self.contributors.len());
        let mut min_required: Option<u64> = None;

        for contributor in &self.contributors {
            // `?` stops at the first failing contributor.
            let floor = contributor.min_required_log_index(tenant_id).await?;
            if let Some(index) = floor {
                // Track the running minimum over contributors that reported a floor.
                min_required = Some(match min_required {
                    Some(lowest) => lowest.min(index),
                    None => index,
                });
            }
            per_contributor.push((contributor.name(), floor));
        }

        Ok(SafePurgeWatermark {
            tenant_id,
            // Everything strictly below the lowest required index may go.
            safe_purge_through: min_required.map(|floor| floor.saturating_sub(1)),
            min_required,
            per_contributor,
        })
    }
}
/// Result of combining all contributors' floors for one tenant, as produced
/// by `RetentionRegistry::safe_purge_watermark`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SafePurgeWatermark {
/// Tenant the watermark was computed for.
pub tenant_id: TenantId,
/// `min_required` minus one (saturating at zero); `None` when no
/// contributor reported a floor.
pub safe_purge_through: Option<u64>,
/// Smallest floor reported by any contributor; `None` when none did.
pub min_required: Option<u64>,
/// Each contributor's name paired with the floor it reported, in the
/// order the contributors were queried.
pub per_contributor: Vec<(&'static str, Option<u64>)>,
}
impl SafePurgeWatermark {
    /// Returns the contributor that reported the lowest floor, together with
    /// that floor.
    ///
    /// Contributors that reported `None` are ignored. When several
    /// contributors share the lowest floor, the earliest one in
    /// `per_contributor` wins. Returns `None` when no contributor reported a
    /// floor.
    pub fn binding_contributor(&self) -> Option<(&'static str, u64)> {
        let mut binding: Option<(&'static str, u64)> = None;
        for &(name, floor) in &self.per_contributor {
            if let Some(value) = floor {
                match binding {
                    // Keep the earlier entry unless this one is strictly lower.
                    Some((_, lowest)) if lowest <= value => {}
                    _ => binding = Some((name, value)),
                }
            }
        }
        binding
    }
}
/// Retention contributor that keeps log entries committed within the
/// policy's `tombstone_retention` window.
pub struct TombstoneRetentionContributor<C: ?Sized> {
// Committer used to read the tenant's high watermark and log entries.
committer: Arc<C>,
// Policy supplying the `tombstone_retention` window length.
policy: RetentionPolicy,
// Clock used to obtain "now"; replaceable via the test-only `with_clock`.
now_fn: Arc<dyn Fn() -> SystemTime + Send + Sync>,
}
impl<C: ?Sized> TombstoneRetentionContributor<C>
where
    C: super::MutationCommitter,
{
    /// Creates a contributor that reads the wall clock via `SystemTime::now`.
    ///
    /// `policy` is stored as given; this constructor does not validate it.
    pub fn new(committer: Arc<C>, policy: RetentionPolicy) -> Self {
        let now_fn: Arc<dyn Fn() -> SystemTime + Send + Sync> = Arc::new(SystemTime::now);
        Self {
            committer,
            policy,
            now_fn,
        }
    }

    /// Test-only constructor that takes the clock as a closure, so tests can
    /// place "now" at an arbitrary instant.
    #[cfg(test)]
    pub fn with_clock(
        committer: Arc<C>,
        policy: RetentionPolicy,
        now_fn: impl Fn() -> SystemTime + Send + Sync + 'static,
    ) -> Self {
        let now_fn: Arc<dyn Fn() -> SystemTime + Send + Sync> = Arc::new(now_fn);
        Self {
            committer,
            policy,
            now_fn,
        }
    }
}
#[async_trait]
impl<C: ?Sized> RetentionContributor for TombstoneRetentionContributor<C>
where
    C: super::MutationCommitter + 'static,
{
    /// Label under which this contributor is reported.
    fn name(&self) -> &'static str {
        "tombstone_retention"
    }

    /// Finds the first log entry that is still inside the retention window.
    ///
    /// The window starts at `now - tombstone_retention` (clamped to the Unix
    /// epoch if the subtraction is not representable). The tenant's log is
    /// read from index 1 in pages of `PAGE` entries; the index of the first
    /// entry whose `committed_at` is at or after the window start is
    /// returned. Timestamps are compared as whole microseconds since the
    /// epoch, and timestamps before the epoch count as 0.
    ///
    /// Returns `Ok(None)` when the high watermark is 0, when a page comes
    /// back empty, or when no scanned entry falls inside the window.
    ///
    /// # Errors
    ///
    /// Failures of `high_watermark` or `read_range` are wrapped in
    /// [`RetentionError::ContributorFailed`].
    async fn min_required_log_index(
        &self,
        tenant_id: TenantId,
    ) -> Result<Option<u64>, RetentionError> {
        // Number of entries requested per `read_range` call.
        const PAGE: usize = 1024;

        let window_start = (self.now_fn)()
            .checked_sub(self.policy.tombstone_retention)
            .unwrap_or(SystemTime::UNIX_EPOCH);
        let cutoff_micros = match window_start.duration_since(SystemTime::UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_micros() as i64,
            Err(_) => 0,
        };

        let high = match self.committer.high_watermark(tenant_id).await {
            Ok(index) => index,
            Err(e) => {
                return Err(RetentionError::ContributorFailed {
                    name: "tombstone_retention",
                    message: format!("high_watermark: {e}"),
                });
            }
        };
        // A zero high watermark is treated as "nothing to retain".
        if high == 0 {
            return Ok(None);
        }

        let mut from = 1;
        while from <= high {
            let page = match self.committer.read_range(tenant_id, from, PAGE).await {
                Ok(entries) => entries,
                Err(e) => {
                    return Err(RetentionError::ContributorFailed {
                        name: "tombstone_retention",
                        message: format!("read_range from={from}: {e}"),
                    });
                }
            };
            if page.is_empty() {
                break;
            }

            // First entry of this page committed at or after the cutoff, if any.
            let first_inside_window = page.iter().find(|entry| {
                let committed_micros = entry
                    .committed_at
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .map(|d| d.as_micros() as i64)
                    .unwrap_or(0);
                committed_micros >= cutoff_micros
            });
            if let Some(entry) = first_inside_window {
                return Ok(Some(entry.log_index));
            }

            // Continue right after the last entry that was just examined.
            from = page.last().map(|e| e.log_index + 1).unwrap_or(high + 1);
        }
        Ok(None)
    }
}
/// Retention contributor driven by the `source_log_watermark` values of a
/// tenant's HNSW manifests.
pub struct HnswWatermarkContributor {
// Store queried via `list_for_tenant` for the tenant's manifests.
manifests: Arc<dyn crate::index::hnsw::HnswManifestStore>,
}
impl HnswWatermarkContributor {
pub fn new(manifests: Arc<dyn crate::index::hnsw::HnswManifestStore>) -> Self {
Self { manifests }
}
}
#[async_trait]
impl RetentionContributor for HnswWatermarkContributor {
    /// Label under which this contributor is reported.
    fn name(&self) -> &'static str {
        "hnsw_watermark"
    }

    /// Returns one past the smallest `source_log_watermark` among the
    /// tenant's manifests (saturating at `u64::MAX`).
    ///
    /// Returns `Ok(None)` when the tenant has no manifests.
    ///
    /// # Errors
    ///
    /// A `list_for_tenant` failure is wrapped in
    /// [`RetentionError::ContributorFailed`].
    async fn min_required_log_index(
        &self,
        tenant_id: TenantId,
    ) -> Result<Option<u64>, RetentionError> {
        let listed = match self.manifests.list_for_tenant(tenant_id) {
            Ok(found) => found,
            Err(e) => {
                return Err(RetentionError::ContributorFailed {
                    name: "hnsw_watermark",
                    message: format!("list_for_tenant: {e}"),
                });
            }
        };

        // `min()` is `None` exactly when there are no manifests, which is the
        // "no constraint" answer.
        let lowest_watermark = listed
            .iter()
            .map(|manifest| manifest.source_log_watermark)
            .min();
        Ok(lowest_watermark.map(|watermark| watermark.saturating_add(1)))
    }
}
/// Retention contributor whose floor is derived from a caller-supplied
/// per-tenant measurement.
pub struct FollowerLagContributor {
// Per-tenant measurement callback; `None` means no constraint.
// NOTE(review): the value is presumably a follower's last applied log
// index — confirm against the caller that supplies the closure.
measure: Arc<dyn Fn(TenantId) -> Option<u64> + Send + Sync>,
}
impl FollowerLagContributor {
    /// Creates a contributor that obtains its per-tenant value from `measure`.
    pub fn new(measure: impl Fn(TenantId) -> Option<u64> + Send + Sync + 'static) -> Self {
        let measure: Arc<dyn Fn(TenantId) -> Option<u64> + Send + Sync> = Arc::new(measure);
        Self { measure }
    }

    /// Creates a contributor that never constrains retention: its measurement
    /// is `None` for every tenant.
    pub fn disabled() -> Self {
        fn no_measurement(_tenant: TenantId) -> Option<u64> {
            None
        }
        Self::new(no_measurement)
    }
}
#[async_trait]
impl RetentionContributor for FollowerLagContributor {
    /// Label under which this contributor is reported.
    fn name(&self) -> &'static str {
        "follower_lag"
    }

    /// Returns the measured value plus one (saturating at `u64::MAX`), or
    /// `Ok(None)` when the measurement yields nothing for `tenant_id`.
    ///
    /// This implementation never returns an error.
    async fn min_required_log_index(
        &self,
        tenant_id: TenantId,
    ) -> Result<Option<u64>, RetentionError> {
        let applied = (self.measure)(tenant_id);
        // The floor is the index right after the measured one.
        let floor = applied.map(|index| index.saturating_add(1));
        Ok(floor)
    }
}
/// Retention contributor whose floor is exactly the value returned by a
/// caller-supplied per-tenant measurement.
pub struct BackupWatermarkContributor {
// Per-tenant measurement callback; its result is reported unchanged and
// `None` means no constraint.
measure: Arc<dyn Fn(TenantId) -> Option<u64> + Send + Sync>,
}
impl BackupWatermarkContributor {
    /// Creates a contributor that obtains its per-tenant floor from `measure`.
    pub fn new(measure: impl Fn(TenantId) -> Option<u64> + Send + Sync + 'static) -> Self {
        let measure: Arc<dyn Fn(TenantId) -> Option<u64> + Send + Sync> = Arc::new(measure);
        Self { measure }
    }

    /// Creates a contributor that never constrains retention: its measurement
    /// is `None` for every tenant.
    pub fn disabled() -> Self {
        fn no_measurement(_tenant: TenantId) -> Option<u64> {
            None
        }
        Self::new(no_measurement)
    }
}
#[async_trait]
impl RetentionContributor for BackupWatermarkContributor {
    /// Label under which this contributor is reported.
    fn name(&self) -> &'static str {
        "backup_watermark"
    }

    /// Returns the measurement for `tenant_id` unchanged: the measured value
    /// is itself the floor, and `None` means no constraint.
    ///
    /// This implementation never returns an error.
    async fn min_required_log_index(
        &self,
        tenant_id: TenantId,
    ) -> Result<Option<u64>, RetentionError> {
        let floor = (self.measure)(tenant_id);
        Ok(floor)
    }
}
// Unit tests for the retention policy, the registry, and each contributor.
#[cfg(test)]
mod tests {
use super::*;
use crate::commit::{CommitOptions, LocalSqliteCommitter, MemoryMutation};
use crate::index::hnsw::{
DistanceMetric, HnswManifest, HnswManifestStore, SqliteHnswManifestStore,
};
use parking_lot::Mutex;
use rusqlite::Connection;
use std::sync::Arc;
// Builds an `UpsertMemory` mutation for `rid` with fixed filler values.
fn upsert(rid: &str) -> MemoryMutation {
MemoryMutation::UpsertMemory {
rid: rid.into(),
text: "x".into(),
memory_type: "semantic".into(),
importance: 0.5,
valence: 0.0,
half_life: 168.0,
namespace: "default".into(),
certainty: 1.0,
domain: "general".into(),
source: "user".into(),
emotional_state: None,
embedding: None,
metadata: serde_json::json!({}),
}
}
// Opens an in-memory SQLite manifest store with pending migrations applied.
fn open_manifest_store() -> SqliteHnswManifestStore {
let mut conn = Connection::open_in_memory().unwrap();
crate::migrations::MigrationRunner::run_pending(&mut conn).unwrap();
SqliteHnswManifestStore::new(Arc::new(Mutex::new(conn)))
}
// The default policy validates and uses the default retention constant.
#[test]
fn policy_default_meets_minimum() {
let p = RetentionPolicy::default();
p.validate().unwrap();
assert_eq!(p.tombstone_retention, DEFAULT_TOMBSTONE_RETENTION);
}
// A 60-second retention is below the minimum and must be rejected.
#[test]
fn policy_below_minimum_rejects() {
let p = RetentionPolicy {
tombstone_retention: Duration::from_secs(60), };
match p.validate() {
Err(RetentionError::PolicyTooAggressive { field, .. }) => {
assert_eq!(field, "tombstone_retention");
}
other => panic!("expected PolicyTooAggressive, got {other:?}"),
}
}
// A registry without contributors yields an unconstrained watermark.
#[tokio::test]
async fn empty_registry_reports_no_constraint() {
let r = RetentionRegistry::new();
let w = r.safe_purge_watermark(TenantId::new(1)).await.unwrap();
assert_eq!(w.safe_purge_through, None);
assert_eq!(w.min_required, None);
assert!(w.per_contributor.is_empty());
}
// Contributor that always reports a fixed floor under a fixed name.
struct StubContributor {
name: &'static str,
floor: Option<u64>,
}
#[async_trait]
impl RetentionContributor for StubContributor {
fn name(&self) -> &'static str {
self.name
}
async fn min_required_log_index(
&self,
_tenant_id: TenantId,
) -> Result<Option<u64>, RetentionError> {
Ok(self.floor)
}
}
// The lowest floor wins and identifies the binding contributor.
#[tokio::test]
async fn min_across_contributors_is_taken() {
let r = RetentionRegistry::new()
.with(Arc::new(StubContributor {
name: "a",
floor: Some(100),
}))
.with(Arc::new(StubContributor {
name: "b",
floor: Some(50),
}))
.with(Arc::new(StubContributor {
name: "c",
floor: Some(200),
}));
let w = r.safe_purge_watermark(TenantId::new(1)).await.unwrap();
assert_eq!(w.min_required, Some(50));
assert_eq!(w.safe_purge_through, Some(49));
assert_eq!(w.binding_contributor(), Some(("b", 50)));
}
// Contributors reporting `None` do not take part in the minimum.
#[tokio::test]
async fn none_floors_skip_contributor() {
let r = RetentionRegistry::new()
.with(Arc::new(StubContributor {
name: "a",
floor: None,
}))
.with(Arc::new(StubContributor {
name: "b",
floor: Some(30),
}));
let w = r.safe_purge_watermark(TenantId::new(1)).await.unwrap();
assert_eq!(w.min_required, Some(30));
assert_eq!(w.safe_purge_through, Some(29));
assert_eq!(w.binding_contributor(), Some(("b", 30)));
}
// When every contributor reports `None`, the watermark is unconstrained.
#[tokio::test]
async fn all_none_means_no_constraint() {
let r = RetentionRegistry::new()
.with(Arc::new(StubContributor {
name: "a",
floor: None,
}))
.with(Arc::new(StubContributor {
name: "b",
floor: None,
}));
let w = r.safe_purge_watermark(TenantId::new(1)).await.unwrap();
assert_eq!(w.min_required, None);
assert_eq!(w.safe_purge_through, None);
assert!(w.binding_contributor().is_none());
}
// Contributor that always fails with `ContributorFailed`.
struct ErrContributor;
#[async_trait]
impl RetentionContributor for ErrContributor {
fn name(&self) -> &'static str {
"err"
}
async fn min_required_log_index(
&self,
_tenant_id: TenantId,
) -> Result<Option<u64>, RetentionError> {
Err(RetentionError::ContributorFailed {
name: "err",
message: "synthetic".into(),
})
}
}
// One failing contributor makes the whole watermark computation fail.
#[tokio::test]
async fn any_error_aborts_safe_watermark() {
let r = RetentionRegistry::new()
.with(Arc::new(StubContributor {
name: "ok",
floor: Some(100),
}))
.with(Arc::new(ErrContributor));
let result = r.safe_purge_watermark(TenantId::new(1)).await;
assert!(matches!(
result,
Err(RetentionError::ContributorFailed { .. })
));
}
// NOTE: despite the test name, a floor of 1 is asserted to give
// `safe_purge_through == Some(0)`.
#[tokio::test]
async fn safe_purge_through_one_when_floor_is_one() {
let r = RetentionRegistry::new().with(Arc::new(StubContributor {
name: "a",
floor: Some(1),
}));
let w = r.safe_purge_watermark(TenantId::new(1)).await.unwrap();
assert_eq!(w.safe_purge_through, Some(0));
}
// A single manifest with watermark 42 produces a floor of 43.
#[tokio::test]
async fn hnsw_contributor_floor_is_watermark_plus_one() {
let store = Arc::new(open_manifest_store());
let mut m = HnswManifest::new(TenantId::new(1), "minilm", 384, DistanceMetric::Cosine);
m.source_log_watermark = 42;
store.upsert(&m).unwrap();
let c = HnswWatermarkContributor::new(store);
let f = c.min_required_log_index(TenantId::new(1)).await.unwrap();
assert_eq!(f, Some(43));
}
// With several manifests, the smallest watermark determines the floor.
#[tokio::test]
async fn hnsw_contributor_uses_min_across_manifests() {
let store = Arc::new(open_manifest_store());
let tenant = TenantId::new(1);
let mut primary = HnswManifest::new(tenant, "minilm", 384, DistanceMetric::Cosine);
primary.source_log_watermark = 100;
store.upsert(&primary).unwrap();
let mut shadow = HnswManifest::new(tenant, "bge-base", 768, DistanceMetric::L2);
shadow.source_log_watermark = 30;
store.upsert(&shadow).unwrap();
let c = HnswWatermarkContributor::new(store);
let f = c.min_required_log_index(tenant).await.unwrap();
assert_eq!(f, Some(31));
}
// A tenant without manifests imposes no constraint.
#[tokio::test]
async fn hnsw_contributor_no_manifests_returns_none() {
let store = Arc::new(open_manifest_store());
let c = HnswWatermarkContributor::new(store);
let f = c.min_required_log_index(TenantId::new(99)).await.unwrap();
assert_eq!(f, None, "tenant with no manifests should not constrain");
}
// The disabled follower-lag contributor reports no floor.
#[tokio::test]
async fn follower_lag_disabled_reports_none() {
let c = FollowerLagContributor::disabled();
let f = c.min_required_log_index(TenantId::new(1)).await.unwrap();
assert_eq!(f, None);
}
// The follower-lag floor is the measured value plus one, per tenant.
#[tokio::test]
async fn follower_lag_floor_is_applied_plus_one() {
let c = FollowerLagContributor::new(|t| {
if t == TenantId::new(1) {
Some(50)
} else {
None
}
});
assert_eq!(
c.min_required_log_index(TenantId::new(1)).await.unwrap(),
Some(51)
);
assert_eq!(
c.min_required_log_index(TenantId::new(2)).await.unwrap(),
None
);
}
// The disabled backup contributor reports no floor.
#[tokio::test]
async fn backup_watermark_disabled_reports_none() {
let c = BackupWatermarkContributor::disabled();
assert_eq!(
c.min_required_log_index(TenantId::new(1)).await.unwrap(),
None
);
}
// The backup contributor passes the measured value through unchanged.
#[tokio::test]
async fn backup_watermark_floor_pins_source_log_start() {
let c = BackupWatermarkContributor::new(|_| Some(7));
assert_eq!(
c.min_required_log_index(TenantId::new(1)).await.unwrap(),
Some(7)
);
}
// A tenant with no committed entries yields no tombstone floor.
#[tokio::test]
async fn tombstone_retention_no_entries_returns_none() {
let committer: Arc<dyn super::super::MutationCommitter> =
Arc::new(LocalSqliteCommitter::open_in_memory().unwrap());
let c = TombstoneRetentionContributor::new(committer, RetentionPolicy::default());
let f = c.min_required_log_index(TenantId::new(99)).await.unwrap();
assert_eq!(f, None);
}
// Freshly committed entries are inside the window, so the floor is entry 1.
#[tokio::test]
async fn tombstone_retention_keeps_entries_inside_window() {
let committer: Arc<dyn super::super::MutationCommitter> =
Arc::new(LocalSqliteCommitter::open_in_memory().unwrap());
for tag in ["a", "b", "c"] {
committer
.commit(TenantId::new(1), upsert(tag), CommitOptions::default())
.await
.unwrap();
}
let c = TombstoneRetentionContributor::new(committer.clone(), RetentionPolicy::default());
let f = c.min_required_log_index(TenantId::new(1)).await.unwrap();
assert_eq!(
f,
Some(1),
"all entries inside window → floor at first entry"
);
}
// With the clock moved 48 hours ahead, every entry is older than the
// default 24-hour window and no floor is reported.
#[tokio::test]
async fn tombstone_retention_drops_entries_outside_window() {
let committer: Arc<dyn super::super::MutationCommitter> =
Arc::new(LocalSqliteCommitter::open_in_memory().unwrap());
for tag in ["a", "b", "c"] {
committer
.commit(TenantId::new(1), upsert(tag), CommitOptions::default())
.await
.unwrap();
}
let two_days_later = SystemTime::now() + Duration::from_secs(48 * 60 * 60);
let c = TombstoneRetentionContributor::with_clock(
committer.clone(),
RetentionPolicy::default(),
move || two_days_later,
);
let f = c.min_required_log_index(TenantId::new(1)).await.unwrap();
assert_eq!(f, None, "all entries past window → no constraint");
}
// Real contributors combined: floors are 51 (hnsw), 31 (follower lag) and
// 100 (backup), so follower lag is binding.
#[tokio::test]
async fn registry_composes_real_contributors() {
let store = Arc::new(open_manifest_store());
let tenant = TenantId::new(1);
let mut m = HnswManifest::new(tenant, "minilm", 384, DistanceMetric::Cosine);
m.source_log_watermark = 50;
store.upsert(&m).unwrap();
let r = RetentionRegistry::new()
.with(Arc::new(HnswWatermarkContributor::new(store)))
.with(Arc::new(FollowerLagContributor::new(|_| Some(30))))
.with(Arc::new(BackupWatermarkContributor::new(|_| Some(100))));
let w = r.safe_purge_watermark(tenant).await.unwrap();
assert_eq!(w.min_required, Some(31));
assert_eq!(w.safe_purge_through, Some(30));
assert_eq!(w.binding_contributor(), Some(("follower_lag", 31)));
}
// No manifests plus disabled contributors gives an unconstrained watermark.
#[tokio::test]
async fn registry_returns_no_constraint_when_all_disabled() {
let store = Arc::new(open_manifest_store());
let r = RetentionRegistry::new()
.with(Arc::new(HnswWatermarkContributor::new(store)))
.with(Arc::new(FollowerLagContributor::disabled()))
.with(Arc::new(BackupWatermarkContributor::disabled()));
let w = r.safe_purge_watermark(TenantId::new(99)).await.unwrap();
assert_eq!(w.safe_purge_through, None);
assert_eq!(w.binding_contributor(), None);
}
}