use common::{DakeraError, NamespaceId, Result};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use crate::traits::VectorStorage;
#[derive(Debug, Clone)]
pub struct TtlConfig {
pub cleanup_interval: Duration,
pub batch_size: usize,
pub cleanup_on_startup: bool,
pub max_cleanup_duration: Duration,
pub enabled: bool,
pub excluded_namespaces: Vec<NamespaceId>,
pub grace_period_seconds: u64,
}
impl Default for TtlConfig {
fn default() -> Self {
Self {
cleanup_interval: Duration::from_secs(60),
batch_size: 10000,
cleanup_on_startup: true,
max_cleanup_duration: Duration::from_secs(30),
enabled: true,
excluded_namespaces: Vec::new(),
grace_period_seconds: 0,
}
}
}
impl TtlConfig {
pub fn new() -> Self {
Self::default()
}
pub fn with_cleanup_interval(mut self, interval: Duration) -> Self {
self.cleanup_interval = interval;
self
}
pub fn with_batch_size(mut self, size: usize) -> Self {
self.batch_size = size;
self
}
pub fn with_cleanup_on_startup(mut self, cleanup: bool) -> Self {
self.cleanup_on_startup = cleanup;
self
}
pub fn with_max_cleanup_duration(mut self, duration: Duration) -> Self {
self.max_cleanup_duration = duration;
self
}
pub fn disabled() -> Self {
Self {
enabled: false,
..Default::default()
}
}
pub fn with_excluded_namespaces(mut self, namespaces: Vec<NamespaceId>) -> Self {
self.excluded_namespaces = namespaces;
self
}
pub fn with_grace_period(mut self, seconds: u64) -> Self {
self.grace_period_seconds = seconds;
self
}
}
#[derive(Debug, Clone, Default)]
pub struct NamespaceTtlPolicy {
pub default_ttl_seconds: Option<u64>,
pub max_ttl_seconds: Option<u64>,
pub min_ttl_seconds: Option<u64>,
pub ttl_required: bool,
pub custom_cleanup_interval: Option<Duration>,
pub exempt_from_cleanup: bool,
}
impl NamespaceTtlPolicy {
pub fn with_default_ttl(seconds: u64) -> Self {
Self {
default_ttl_seconds: Some(seconds),
..Default::default()
}
}
pub fn required() -> Self {
Self {
ttl_required: true,
..Default::default()
}
}
pub fn exempt() -> Self {
Self {
exempt_from_cleanup: true,
..Default::default()
}
}
pub fn with_max_ttl(mut self, seconds: u64) -> Self {
self.max_ttl_seconds = Some(seconds);
self
}
pub fn with_min_ttl(mut self, seconds: u64) -> Self {
self.min_ttl_seconds = Some(seconds);
self
}
pub fn apply(&self, ttl_seconds: Option<u64>) -> Result<Option<u64>> {
let ttl = match ttl_seconds {
Some(t) => Some(t),
None => {
if self.ttl_required && self.default_ttl_seconds.is_none() {
return Err(DakeraError::InvalidRequest(
"TTL is required for this namespace".to_string(),
));
}
self.default_ttl_seconds
}
};
if let Some(t) = ttl {
if let Some(min) = self.min_ttl_seconds {
if t < min {
return Err(DakeraError::InvalidRequest(format!(
"TTL {} is below minimum {} seconds",
t, min
)));
}
}
if let Some(max) = self.max_ttl_seconds {
if t > max {
return Ok(Some(max)); }
}
}
Ok(ttl)
}
}
#[derive(Debug, Clone, Default)]
pub struct TtlStats {
pub total_cleaned: u64,
pub last_cleanup_count: u64,
pub last_cleanup_duration_ms: u64,
pub last_cleanup_at: u64,
pub cleanup_runs: u64,
pub failed_cleanups: u64,
pub avg_cleaned_per_run: f64,
pub namespace_stats: HashMap<NamespaceId, NamespaceCleanupStats>,
}
#[derive(Debug, Clone, Default)]
pub struct NamespaceCleanupStats {
pub total_cleaned: u64,
pub last_cleanup_at: u64,
pub last_cleanup_count: u64,
}
struct AtomicTtlStats {
total_cleaned: AtomicU64,
last_cleanup_count: AtomicU64,
last_cleanup_duration_ms: AtomicU64,
last_cleanup_at: AtomicU64,
cleanup_runs: AtomicU64,
failed_cleanups: AtomicU64,
namespace_stats: RwLock<HashMap<NamespaceId, NamespaceCleanupStats>>,
}
impl AtomicTtlStats {
fn new() -> Self {
Self {
total_cleaned: AtomicU64::new(0),
last_cleanup_count: AtomicU64::new(0),
last_cleanup_duration_ms: AtomicU64::new(0),
last_cleanup_at: AtomicU64::new(0),
cleanup_runs: AtomicU64::new(0),
failed_cleanups: AtomicU64::new(0),
namespace_stats: RwLock::new(HashMap::new()),
}
}
fn record_cleanup(
&self,
count: u64,
duration_ms: u64,
namespace_counts: &HashMap<NamespaceId, u64>,
) {
self.total_cleaned.fetch_add(count, Ordering::SeqCst);
self.last_cleanup_count.store(count, Ordering::SeqCst);
self.last_cleanup_duration_ms
.store(duration_ms, Ordering::SeqCst);
self.cleanup_runs.fetch_add(1, Ordering::SeqCst);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.last_cleanup_at.store(now, Ordering::SeqCst);
let mut ns_stats = self.namespace_stats.write();
for (namespace, count) in namespace_counts {
let stats = ns_stats.entry(namespace.clone()).or_default();
stats.total_cleaned += count;
stats.last_cleanup_at = now;
stats.last_cleanup_count = *count;
}
}
fn record_failure(&self) {
self.failed_cleanups.fetch_add(1, Ordering::SeqCst);
}
fn snapshot(&self) -> TtlStats {
let cleanup_runs = self.cleanup_runs.load(Ordering::SeqCst);
let total_cleaned = self.total_cleaned.load(Ordering::SeqCst);
TtlStats {
total_cleaned,
last_cleanup_count: self.last_cleanup_count.load(Ordering::SeqCst),
last_cleanup_duration_ms: self.last_cleanup_duration_ms.load(Ordering::SeqCst),
last_cleanup_at: self.last_cleanup_at.load(Ordering::SeqCst),
cleanup_runs,
failed_cleanups: self.failed_cleanups.load(Ordering::SeqCst),
avg_cleaned_per_run: if cleanup_runs > 0 {
total_cleaned as f64 / cleanup_runs as f64
} else {
0.0
},
namespace_stats: self.namespace_stats.read().clone(),
}
}
}
enum TtlCommand {
RunCleanup,
UpdateConfig(TtlConfig),
Shutdown,
}
pub struct TtlManager<S: VectorStorage> {
storage: Arc<S>,
config: RwLock<TtlConfig>,
policies: RwLock<HashMap<NamespaceId, NamespaceTtlPolicy>>,
stats: Arc<AtomicTtlStats>,
running: AtomicBool,
command_tx: Option<mpsc::Sender<TtlCommand>>,
}
impl<S: VectorStorage + 'static> TtlManager<S> {
pub fn new(storage: Arc<S>, config: TtlConfig) -> Self {
Self {
storage,
config: RwLock::new(config),
policies: RwLock::new(HashMap::new()),
stats: Arc::new(AtomicTtlStats::new()),
running: AtomicBool::new(false),
command_tx: None,
}
}
pub fn with_defaults(storage: Arc<S>) -> Self {
Self::new(storage, TtlConfig::default())
}
pub fn config(&self) -> TtlConfig {
self.config.read().clone()
}
pub fn update_config(&self, config: TtlConfig) {
*self.config.write() = config.clone();
if let Some(tx) = &self.command_tx {
let _ = tx.try_send(TtlCommand::UpdateConfig(config));
}
}
pub fn set_policy(&self, namespace: &NamespaceId, policy: NamespaceTtlPolicy) {
self.policies.write().insert(namespace.clone(), policy);
tracing::info!(namespace = %namespace, "TTL policy updated");
}
pub fn get_policy(&self, namespace: &NamespaceId) -> Option<NamespaceTtlPolicy> {
self.policies.read().get(namespace).cloned()
}
pub fn remove_policy(&self, namespace: &NamespaceId) -> Option<NamespaceTtlPolicy> {
self.policies.write().remove(namespace)
}
pub fn list_policies(&self) -> HashMap<NamespaceId, NamespaceTtlPolicy> {
self.policies.read().clone()
}
pub fn stats(&self) -> TtlStats {
self.stats.snapshot()
}
pub fn is_running(&self) -> bool {
self.running.load(Ordering::SeqCst)
}
pub fn validate_ttl(
&self,
namespace: &NamespaceId,
ttl_seconds: Option<u64>,
) -> Result<Option<u64>> {
if let Some(policy) = self.policies.read().get(namespace) {
policy.apply(ttl_seconds)
} else {
Ok(ttl_seconds)
}
}
pub async fn run_cleanup(&self) -> Result<TtlCleanupResult> {
let config = self.config.read().clone();
let policies = self.policies.read().clone();
let start = Instant::now();
let mut total_cleaned = 0u64;
let mut namespace_counts: HashMap<NamespaceId, u64> = HashMap::new();
let mut errors = Vec::new();
let namespaces = match self.storage.list_namespaces().await {
Ok(ns) => ns,
Err(e) => {
self.stats.record_failure();
return Err(e);
}
};
for namespace in namespaces {
if config.excluded_namespaces.contains(&namespace) {
continue;
}
if let Some(policy) = policies.get(&namespace) {
if policy.exempt_from_cleanup {
continue;
}
}
if start.elapsed() > config.max_cleanup_duration {
tracing::warn!(
"TTL cleanup timeout reached after {:?}, stopping early",
start.elapsed()
);
break;
}
match self.storage.cleanup_expired(&namespace).await {
Ok(cleaned) => {
if cleaned > 0 {
total_cleaned += cleaned as u64;
namespace_counts.insert(namespace.clone(), cleaned as u64);
tracing::debug!(
namespace = %namespace,
cleaned = cleaned,
"Cleaned expired vectors"
);
}
}
Err(e) => {
errors.push((namespace.clone(), e.to_string()));
tracing::error!(
namespace = %namespace,
error = %e,
"Failed to cleanup namespace"
);
}
}
}
let duration = start.elapsed();
self.stats.record_cleanup(
total_cleaned,
duration.as_millis() as u64,
&namespace_counts,
);
tracing::info!(
total_cleaned = total_cleaned,
duration_ms = duration.as_millis(),
namespaces_cleaned = namespace_counts.len(),
errors = errors.len(),
"TTL cleanup completed"
);
Ok(TtlCleanupResult {
total_cleaned,
duration,
namespace_counts,
errors,
})
}
pub fn trigger_cleanup(&self) {
if let Some(tx) = &self.command_tx {
let _ = tx.try_send(TtlCommand::RunCleanup);
}
}
pub fn shutdown(&self) {
if let Some(tx) = &self.command_tx {
let _ = tx.try_send(TtlCommand::Shutdown);
}
}
}
#[derive(Debug, Clone)]
pub struct TtlCleanupResult {
pub total_cleaned: u64,
pub duration: Duration,
pub namespace_counts: HashMap<NamespaceId, u64>,
pub errors: Vec<(NamespaceId, String)>,
}
pub struct TtlService<S: VectorStorage + 'static> {
manager: Arc<TtlManager<S>>,
command_rx: mpsc::Receiver<TtlCommand>,
}
impl<S: VectorStorage + 'static> TtlService<S> {
pub fn new(storage: Arc<S>, config: TtlConfig) -> (Arc<TtlManager<S>>, Self) {
let (tx, rx) = mpsc::channel(16);
let manager = Arc::new(TtlManager {
storage,
config: RwLock::new(config),
policies: RwLock::new(HashMap::new()),
stats: Arc::new(AtomicTtlStats::new()),
running: AtomicBool::new(false),
command_tx: Some(tx),
});
let service = Self {
manager: Arc::clone(&manager),
command_rx: rx,
};
(manager, service)
}
pub async fn run(mut self) {
let config = self.manager.config();
self.manager.running.store(true, Ordering::SeqCst);
tracing::info!(
interval_secs = config.cleanup_interval.as_secs(),
enabled = config.enabled,
"TTL service started"
);
if config.cleanup_on_startup && config.enabled {
tracing::debug!("Running startup cleanup");
if let Err(e) = self.manager.run_cleanup().await {
tracing::error!(error = %e, "Startup cleanup failed");
}
}
let mut interval = tokio::time::interval(config.cleanup_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = interval.tick() => {
let current_config = self.manager.config();
if current_config.enabled {
if let Err(e) = self.manager.run_cleanup().await {
tracing::error!(error = %e, "Scheduled cleanup failed");
}
}
}
Some(cmd) = self.command_rx.recv() => {
match cmd {
TtlCommand::RunCleanup => {
if let Err(e) = self.manager.run_cleanup().await {
tracing::error!(error = %e, "Manual cleanup failed");
}
}
TtlCommand::UpdateConfig(new_config) => {
interval = tokio::time::interval(new_config.cleanup_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
tracing::info!("TTL config updated");
}
TtlCommand::Shutdown => {
tracing::info!("TTL service shutting down");
break;
}
}
}
}
}
self.manager.running.store(false, Ordering::SeqCst);
tracing::info!("TTL service stopped");
}
pub fn spawn(self) -> tokio::task::JoinHandle<()> {
tokio::spawn(self.run())
}
}
pub struct TtlAwareStorage<S: VectorStorage> {
inner: Arc<S>,
manager: Arc<TtlManager<S>>,
}
impl<S: VectorStorage + 'static> TtlAwareStorage<S> {
pub fn new(storage: Arc<S>, manager: Arc<TtlManager<S>>) -> Self {
Self {
inner: storage,
manager,
}
}
pub fn inner(&self) -> &Arc<S> {
&self.inner
}
pub fn manager(&self) -> &Arc<TtlManager<S>> {
&self.manager
}
pub async fn upsert_with_ttl(
&self,
namespace: &NamespaceId,
mut vectors: Vec<common::Vector>,
) -> Result<usize> {
for vector in &mut vectors {
let validated_ttl = self.manager.validate_ttl(namespace, vector.ttl_seconds)?;
vector.ttl_seconds = validated_ttl;
if vector.ttl_seconds.is_some() {
vector.apply_ttl();
}
}
self.inner.upsert(namespace, vectors).await
}
}
pub fn calculate_expiration(ttl_seconds: u64) -> u64 {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
now + ttl_seconds
}
pub fn is_expired(expires_at: u64) -> bool {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
now >= expires_at
}
pub fn remaining_ttl(expires_at: u64) -> Option<u64> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
if now < expires_at {
Some(expires_at - now)
} else {
None
}
}
#[cfg(test)]
mod tests {
use super::*;
use common::Vector;
struct MockStorage {
namespaces: RwLock<HashMap<NamespaceId, Vec<Vector>>>,
}
impl MockStorage {
fn new() -> Self {
Self {
namespaces: RwLock::new(HashMap::new()),
}
}
fn with_vectors(namespace: &str, vectors: Vec<Vector>) -> Self {
let mut map = HashMap::new();
map.insert(namespace.to_string(), vectors);
Self {
namespaces: RwLock::new(map),
}
}
}
#[async_trait::async_trait]
impl VectorStorage for MockStorage {
async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
let count = vectors.len();
self.namespaces
.write()
.entry(namespace.clone())
.or_default()
.extend(vectors);
Ok(count)
}
async fn get(
&self,
namespace: &NamespaceId,
ids: &[common::VectorId],
) -> Result<Vec<Vector>> {
let ns = self.namespaces.read();
if let Some(vectors) = ns.get(namespace) {
Ok(vectors
.iter()
.filter(|v| ids.contains(&v.id))
.cloned()
.collect())
} else {
Ok(vec![])
}
}
async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
let ns = self.namespaces.read();
Ok(ns.get(namespace).cloned().unwrap_or_default())
}
async fn delete(&self, namespace: &NamespaceId, ids: &[common::VectorId]) -> Result<usize> {
let mut ns = self.namespaces.write();
if let Some(vectors) = ns.get_mut(namespace) {
let before = vectors.len();
vectors.retain(|v| !ids.contains(&v.id));
Ok(before - vectors.len())
} else {
Ok(0)
}
}
async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
Ok(self.namespaces.read().contains_key(namespace))
}
async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
self.namespaces
.write()
.entry(namespace.clone())
.or_default();
Ok(())
}
async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
let ns = self.namespaces.read();
Ok(ns.get(namespace).map(|v| v.len()).unwrap_or(0))
}
async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
let ns = self.namespaces.read();
Ok(ns
.get(namespace)
.and_then(|v| v.first().map(|vec| vec.values.len())))
}
async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
Ok(self.namespaces.read().keys().cloned().collect())
}
async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
Ok(self.namespaces.write().remove(namespace).is_some())
}
async fn cleanup_expired(&self, namespace: &NamespaceId) -> Result<usize> {
let mut ns = self.namespaces.write();
if let Some(vectors) = ns.get_mut(namespace) {
let before = vectors.len();
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
vectors.retain(|v| !v.is_expired_at(now));
Ok(before - vectors.len())
} else {
Ok(0)
}
}
async fn cleanup_all_expired(&self) -> Result<usize> {
let mut total = 0;
let namespaces: Vec<_> = self.namespaces.read().keys().cloned().collect();
for ns in namespaces {
total += self.cleanup_expired(&ns).await?;
}
Ok(total)
}
}
#[test]
fn test_ttl_config_builder() {
let config = TtlConfig::new()
.with_cleanup_interval(Duration::from_secs(120))
.with_batch_size(5000)
.with_cleanup_on_startup(false)
.with_grace_period(10);
assert_eq!(config.cleanup_interval, Duration::from_secs(120));
assert_eq!(config.batch_size, 5000);
assert!(!config.cleanup_on_startup);
assert_eq!(config.grace_period_seconds, 10);
assert!(config.enabled);
}
#[test]
fn test_ttl_config_disabled() {
let config = TtlConfig::disabled();
assert!(!config.enabled);
}
#[test]
fn test_namespace_policy_default_ttl() {
let policy = NamespaceTtlPolicy::with_default_ttl(3600);
let result = policy.apply(None).unwrap();
assert_eq!(result, Some(3600));
let result = policy.apply(Some(7200)).unwrap();
assert_eq!(result, Some(7200));
}
#[test]
fn test_namespace_policy_required_ttl() {
let policy = NamespaceTtlPolicy::required();
let result = policy.apply(None);
assert!(result.is_err());
let result = policy.apply(Some(3600)).unwrap();
assert_eq!(result, Some(3600));
}
#[test]
fn test_namespace_policy_max_ttl() {
let policy = NamespaceTtlPolicy::default().with_max_ttl(3600);
let result = policy.apply(Some(7200)).unwrap();
assert_eq!(result, Some(3600));
let result = policy.apply(Some(1800)).unwrap();
assert_eq!(result, Some(1800));
}
#[test]
fn test_namespace_policy_min_ttl() {
let policy = NamespaceTtlPolicy::default().with_min_ttl(600);
let result = policy.apply(Some(300));
assert!(result.is_err());
let result = policy.apply(Some(900)).unwrap();
assert_eq!(result, Some(900));
}
#[test]
fn test_namespace_policy_exempt() {
let policy = NamespaceTtlPolicy::exempt();
assert!(policy.exempt_from_cleanup);
}
#[tokio::test]
async fn test_ttl_manager_basic() {
let storage = Arc::new(MockStorage::new());
let manager = TtlManager::new(storage, TtlConfig::default());
assert!(!manager.is_running());
assert_eq!(manager.stats().total_cleaned, 0);
}
#[tokio::test]
async fn test_ttl_manager_policy() {
let storage = Arc::new(MockStorage::new());
let manager = TtlManager::new(storage, TtlConfig::default());
let policy = NamespaceTtlPolicy::with_default_ttl(3600);
manager.set_policy(&"test".to_string(), policy.clone());
let retrieved = manager.get_policy(&"test".to_string()).unwrap();
assert_eq!(retrieved.default_ttl_seconds, Some(3600));
manager.remove_policy(&"test".to_string());
assert!(manager.get_policy(&"test".to_string()).is_none());
}
#[tokio::test]
async fn test_ttl_manager_validate_ttl() {
let storage = Arc::new(MockStorage::new());
let manager = TtlManager::new(storage, TtlConfig::default());
let result = manager
.validate_ttl(&"ns1".to_string(), Some(3600))
.unwrap();
assert_eq!(result, Some(3600));
let policy = NamespaceTtlPolicy::with_default_ttl(1800);
manager.set_policy(&"ns2".to_string(), policy);
let result = manager.validate_ttl(&"ns2".to_string(), None).unwrap();
assert_eq!(result, Some(1800));
}
#[tokio::test]
async fn test_ttl_manager_cleanup() {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let vectors = vec![
Vector {
id: "v1".to_string(),
values: vec![1.0],
metadata: None,
ttl_seconds: None,
expires_at: Some(now - 100), },
Vector {
id: "v2".to_string(),
values: vec![2.0],
metadata: None,
ttl_seconds: None,
expires_at: Some(now + 3600), },
Vector {
id: "v3".to_string(),
values: vec![3.0],
metadata: None,
ttl_seconds: None,
expires_at: None, },
];
let storage = Arc::new(MockStorage::with_vectors("test", vectors));
let manager = TtlManager::new(storage.clone(), TtlConfig::default());
let result = manager.run_cleanup().await.unwrap();
assert_eq!(result.total_cleaned, 1);
assert_eq!(result.namespace_counts.get("test"), Some(&1));
let remaining = storage.get_all(&"test".to_string()).await.unwrap();
assert_eq!(remaining.len(), 2);
assert!(remaining.iter().any(|v| v.id == "v2"));
assert!(remaining.iter().any(|v| v.id == "v3"));
}
#[tokio::test]
async fn test_ttl_manager_excluded_namespace() {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let expired_vector = Vector {
id: "v1".to_string(),
values: vec![1.0],
metadata: None,
ttl_seconds: None,
expires_at: Some(now - 100),
};
let storage = Arc::new(MockStorage::with_vectors("excluded", vec![expired_vector]));
let config = TtlConfig::default().with_excluded_namespaces(vec!["excluded".to_string()]);
let manager = TtlManager::new(storage.clone(), config);
let result = manager.run_cleanup().await.unwrap();
assert_eq!(result.total_cleaned, 0);
let remaining = storage.get_all(&"excluded".to_string()).await.unwrap();
assert_eq!(remaining.len(), 1);
}
#[tokio::test]
async fn test_ttl_manager_exempt_policy() {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let expired_vector = Vector {
id: "v1".to_string(),
values: vec![1.0],
metadata: None,
ttl_seconds: None,
expires_at: Some(now - 100),
};
let storage = Arc::new(MockStorage::with_vectors("exempt_ns", vec![expired_vector]));
let manager = TtlManager::new(storage.clone(), TtlConfig::default());
manager.set_policy(&"exempt_ns".to_string(), NamespaceTtlPolicy::exempt());
let result = manager.run_cleanup().await.unwrap();
assert_eq!(result.total_cleaned, 0);
}
#[tokio::test]
async fn test_ttl_stats() {
let storage = Arc::new(MockStorage::new());
let manager = TtlManager::new(storage, TtlConfig::default());
let stats = manager.stats();
assert_eq!(stats.total_cleaned, 0);
assert_eq!(stats.cleanup_runs, 0);
assert_eq!(stats.failed_cleanups, 0);
}
#[test]
fn test_calculate_expiration() {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let expires = calculate_expiration(3600);
assert!(expires > now);
assert!(expires <= now + 3600 + 1); }
#[test]
fn test_is_expired() {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
assert!(is_expired(now - 100));
assert!(!is_expired(now + 100));
}
#[test]
fn test_remaining_ttl() {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
assert_eq!(remaining_ttl(now - 100), None);
let remaining = remaining_ttl(now + 100);
assert!(remaining.is_some());
assert!(remaining.unwrap() <= 100);
}
#[tokio::test]
async fn test_ttl_aware_storage() {
let storage = Arc::new(MockStorage::new());
let manager = Arc::new(TtlManager::new(Arc::clone(&storage), TtlConfig::default()));
manager.set_policy(
&"test".to_string(),
NamespaceTtlPolicy::with_default_ttl(3600),
);
let ttl_storage = TtlAwareStorage::new(Arc::clone(&storage), Arc::clone(&manager));
let vectors = vec![Vector {
id: "v1".to_string(),
values: vec![1.0],
metadata: None,
ttl_seconds: None,
expires_at: None,
}];
ttl_storage
.upsert_with_ttl(&"test".to_string(), vectors)
.await
.unwrap();
let stored = storage.get_all(&"test".to_string()).await.unwrap();
assert_eq!(stored.len(), 1);
assert!(stored[0].expires_at.is_some()); }
#[tokio::test]
async fn test_ttl_service_creation() {
let storage = Arc::new(MockStorage::new());
let config = TtlConfig::default().with_cleanup_on_startup(false);
let (manager, _service) = TtlService::new(storage, config);
assert!(!manager.is_running());
assert!(manager.config().enabled);
}
#[test]
fn test_list_policies() {
let storage = Arc::new(MockStorage::new());
let manager = TtlManager::new(storage, TtlConfig::default());
manager.set_policy(
&"ns1".to_string(),
NamespaceTtlPolicy::with_default_ttl(3600),
);
manager.set_policy(&"ns2".to_string(), NamespaceTtlPolicy::required());
let policies = manager.list_policies();
assert_eq!(policies.len(), 2);
assert!(policies.contains_key(&"ns1".to_string()));
assert!(policies.contains_key(&"ns2".to_string()));
}
}