// storage/ttl.rs

1//! TTL (Time-to-Live) Management for Automatic Vector Expiration
2//!
3//! This module provides:
4//! - Background cleanup service for expired vectors
5//! - Namespace-specific TTL policies
6//! - Configurable cleanup intervals and batch sizes
7//! - Statistics tracking and monitoring
8//! - Graceful shutdown support
9
10use common::{DakeraError, NamespaceId, Result};
11use parking_lot::RwLock;
12use std::collections::HashMap;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::mpsc;
17
18use crate::traits::VectorStorage;
19
20// ============================================================================
21// TTL Configuration
22// ============================================================================
23
24/// Configuration for TTL cleanup service
25#[derive(Debug, Clone)]
26pub struct TtlConfig {
27    /// Interval between cleanup runs (default: 60 seconds)
28    pub cleanup_interval: Duration,
29    /// Maximum vectors to process per cleanup batch (default: 10000)
30    pub batch_size: usize,
31    /// Whether to run cleanup on startup (default: true)
32    pub cleanup_on_startup: bool,
33    /// Maximum time to spend on a single cleanup run (default: 30 seconds)
34    pub max_cleanup_duration: Duration,
35    /// Whether cleanup is enabled (default: true)
36    pub enabled: bool,
37    /// Namespaces to exclude from automatic cleanup
38    pub excluded_namespaces: Vec<NamespaceId>,
39    /// Minimum age before deletion (grace period) in seconds (default: 0)
40    pub grace_period_seconds: u64,
41}
42
43impl Default for TtlConfig {
44    fn default() -> Self {
45        Self {
46            cleanup_interval: Duration::from_secs(60),
47            batch_size: 10000,
48            cleanup_on_startup: true,
49            max_cleanup_duration: Duration::from_secs(30),
50            enabled: true,
51            excluded_namespaces: Vec::new(),
52            grace_period_seconds: 0,
53        }
54    }
55}
56
57impl TtlConfig {
58    /// Create a new TTL config with default values
59    pub fn new() -> Self {
60        Self::default()
61    }
62
63    /// Set cleanup interval
64    pub fn with_cleanup_interval(mut self, interval: Duration) -> Self {
65        self.cleanup_interval = interval;
66        self
67    }
68
69    /// Set batch size
70    pub fn with_batch_size(mut self, size: usize) -> Self {
71        self.batch_size = size;
72        self
73    }
74
75    /// Set whether to cleanup on startup
76    pub fn with_cleanup_on_startup(mut self, cleanup: bool) -> Self {
77        self.cleanup_on_startup = cleanup;
78        self
79    }
80
81    /// Set maximum cleanup duration
82    pub fn with_max_cleanup_duration(mut self, duration: Duration) -> Self {
83        self.max_cleanup_duration = duration;
84        self
85    }
86
87    /// Disable TTL cleanup
88    pub fn disabled() -> Self {
89        Self {
90            enabled: false,
91            ..Default::default()
92        }
93    }
94
95    /// Add excluded namespaces
96    pub fn with_excluded_namespaces(mut self, namespaces: Vec<NamespaceId>) -> Self {
97        self.excluded_namespaces = namespaces;
98        self
99    }
100
101    /// Set grace period
102    pub fn with_grace_period(mut self, seconds: u64) -> Self {
103        self.grace_period_seconds = seconds;
104        self
105    }
106}
107
108// ============================================================================
109// Namespace TTL Policy
110// ============================================================================
111
/// TTL rules attached to a single namespace.
///
/// The derived [`Default`] is the permissive policy: no default, no bounds,
/// TTL optional and the namespace not exempt from cleanup.
#[derive(Clone, Debug, Default)]
pub struct NamespaceTtlPolicy {
    /// TTL, in seconds, given to vectors that arrive without one
    /// (`None` = leave them without a TTL).
    pub default_ttl_seconds: Option<u64>,
    /// Largest TTL, in seconds, a vector may carry (`None` = unlimited).
    pub max_ttl_seconds: Option<u64>,
    /// Smallest TTL, in seconds, a vector may carry (`None` = no minimum).
    pub min_ttl_seconds: Option<u64>,
    /// When set, every vector must end up with a TTL.
    pub ttl_required: bool,
    /// Cleanup interval intended to override the global one for this
    /// namespace (`None` = use the global interval).
    pub custom_cleanup_interval: Option<Duration>,
    /// When set, automatic cleanup skips this namespace entirely.
    pub exempt_from_cleanup: bool,
}
128
129impl NamespaceTtlPolicy {
130    /// Create a new policy with default TTL
131    pub fn with_default_ttl(seconds: u64) -> Self {
132        Self {
133            default_ttl_seconds: Some(seconds),
134            ..Default::default()
135        }
136    }
137
138    /// Create a policy with required TTL
139    pub fn required() -> Self {
140        Self {
141            ttl_required: true,
142            ..Default::default()
143        }
144    }
145
146    /// Create an exempt policy (no automatic cleanup)
147    pub fn exempt() -> Self {
148        Self {
149            exempt_from_cleanup: true,
150            ..Default::default()
151        }
152    }
153
154    /// Set maximum TTL
155    pub fn with_max_ttl(mut self, seconds: u64) -> Self {
156        self.max_ttl_seconds = Some(seconds);
157        self
158    }
159
160    /// Set minimum TTL
161    pub fn with_min_ttl(mut self, seconds: u64) -> Self {
162        self.min_ttl_seconds = Some(seconds);
163        self
164    }
165
166    /// Validate and apply policy to a TTL value
167    /// Returns the effective TTL (possibly clamped or defaulted)
168    pub fn apply(&self, ttl_seconds: Option<u64>) -> Result<Option<u64>> {
169        let ttl = match ttl_seconds {
170            Some(t) => Some(t),
171            None => {
172                if self.ttl_required && self.default_ttl_seconds.is_none() {
173                    return Err(DakeraError::InvalidRequest(
174                        "TTL is required for this namespace".to_string(),
175                    ));
176                }
177                self.default_ttl_seconds
178            }
179        };
180
181        if let Some(t) = ttl {
182            // Check minimum
183            if let Some(min) = self.min_ttl_seconds {
184                if t < min {
185                    return Err(DakeraError::InvalidRequest(format!(
186                        "TTL {} is below minimum {} seconds",
187                        t, min
188                    )));
189                }
190            }
191
192            // Check maximum and clamp if needed
193            if let Some(max) = self.max_ttl_seconds {
194                if t > max {
195                    return Ok(Some(max)); // Clamp to maximum
196                }
197            }
198        }
199
200        Ok(ttl)
201    }
202}
203
204// ============================================================================
205// TTL Statistics
206// ============================================================================
207
208/// Statistics for TTL cleanup operations
209#[derive(Debug, Clone, Default)]
210pub struct TtlStats {
211    /// Total vectors cleaned up since service start
212    pub total_cleaned: u64,
213    /// Vectors cleaned in the last cleanup run
214    pub last_cleanup_count: u64,
215    /// Duration of the last cleanup run
216    pub last_cleanup_duration_ms: u64,
217    /// Timestamp of last cleanup (Unix epoch seconds)
218    pub last_cleanup_at: u64,
219    /// Number of cleanup runs performed
220    pub cleanup_runs: u64,
221    /// Number of failed cleanup attempts
222    pub failed_cleanups: u64,
223    /// Average vectors cleaned per run
224    pub avg_cleaned_per_run: f64,
225    /// Per-namespace cleanup counts
226    pub namespace_stats: HashMap<NamespaceId, NamespaceCleanupStats>,
227}
228
/// Cleanup counters kept for one namespace.
#[derive(Clone, Debug, Default)]
pub struct NamespaceCleanupStats {
    /// Vectors removed from this namespace across all recorded runs.
    pub total_cleaned: u64,
    /// When this namespace was last cleaned (Unix epoch seconds).
    pub last_cleanup_at: u64,
    /// Vectors removed from this namespace by its most recent cleanup.
    pub last_cleanup_count: u64,
}
239
240/// Internal atomic stats for thread-safe updates
241struct AtomicTtlStats {
242    total_cleaned: AtomicU64,
243    last_cleanup_count: AtomicU64,
244    last_cleanup_duration_ms: AtomicU64,
245    last_cleanup_at: AtomicU64,
246    cleanup_runs: AtomicU64,
247    failed_cleanups: AtomicU64,
248    namespace_stats: RwLock<HashMap<NamespaceId, NamespaceCleanupStats>>,
249}
250
251impl AtomicTtlStats {
252    fn new() -> Self {
253        Self {
254            total_cleaned: AtomicU64::new(0),
255            last_cleanup_count: AtomicU64::new(0),
256            last_cleanup_duration_ms: AtomicU64::new(0),
257            last_cleanup_at: AtomicU64::new(0),
258            cleanup_runs: AtomicU64::new(0),
259            failed_cleanups: AtomicU64::new(0),
260            namespace_stats: RwLock::new(HashMap::new()),
261        }
262    }
263
264    fn record_cleanup(
265        &self,
266        count: u64,
267        duration_ms: u64,
268        namespace_counts: &HashMap<NamespaceId, u64>,
269    ) {
270        self.total_cleaned.fetch_add(count, Ordering::SeqCst);
271        self.last_cleanup_count.store(count, Ordering::SeqCst);
272        self.last_cleanup_duration_ms
273            .store(duration_ms, Ordering::SeqCst);
274        self.cleanup_runs.fetch_add(1, Ordering::SeqCst);
275
276        let now = std::time::SystemTime::now()
277            .duration_since(std::time::UNIX_EPOCH)
278            .unwrap_or_default()
279            .as_secs();
280        self.last_cleanup_at.store(now, Ordering::SeqCst);
281
282        // Update namespace stats
283        let mut ns_stats = self.namespace_stats.write();
284        for (namespace, count) in namespace_counts {
285            let stats = ns_stats.entry(namespace.clone()).or_default();
286            stats.total_cleaned += count;
287            stats.last_cleanup_at = now;
288            stats.last_cleanup_count = *count;
289        }
290    }
291
292    fn record_failure(&self) {
293        self.failed_cleanups.fetch_add(1, Ordering::SeqCst);
294    }
295
296    fn snapshot(&self) -> TtlStats {
297        let cleanup_runs = self.cleanup_runs.load(Ordering::SeqCst);
298        let total_cleaned = self.total_cleaned.load(Ordering::SeqCst);
299
300        TtlStats {
301            total_cleaned,
302            last_cleanup_count: self.last_cleanup_count.load(Ordering::SeqCst),
303            last_cleanup_duration_ms: self.last_cleanup_duration_ms.load(Ordering::SeqCst),
304            last_cleanup_at: self.last_cleanup_at.load(Ordering::SeqCst),
305            cleanup_runs,
306            failed_cleanups: self.failed_cleanups.load(Ordering::SeqCst),
307            avg_cleaned_per_run: if cleanup_runs > 0 {
308                total_cleaned as f64 / cleanup_runs as f64
309            } else {
310                0.0
311            },
312            namespace_stats: self.namespace_stats.read().clone(),
313        }
314    }
315}
316
317// ============================================================================
318// TTL Manager
319// ============================================================================
320
321/// Commands for the TTL service
322enum TtlCommand {
323    /// Run cleanup immediately
324    RunCleanup,
325    /// Update configuration
326    UpdateConfig(TtlConfig),
327    /// Shutdown the service
328    Shutdown,
329}
330
331/// TTL Manager for coordinating automatic vector expiration
332pub struct TtlManager<S: VectorStorage> {
333    storage: Arc<S>,
334    config: RwLock<TtlConfig>,
335    policies: RwLock<HashMap<NamespaceId, NamespaceTtlPolicy>>,
336    stats: Arc<AtomicTtlStats>,
337    running: AtomicBool,
338    command_tx: Option<mpsc::Sender<TtlCommand>>,
339}
340
341impl<S: VectorStorage + 'static> TtlManager<S> {
342    /// Create a new TTL manager
343    pub fn new(storage: Arc<S>, config: TtlConfig) -> Self {
344        Self {
345            storage,
346            config: RwLock::new(config),
347            policies: RwLock::new(HashMap::new()),
348            stats: Arc::new(AtomicTtlStats::new()),
349            running: AtomicBool::new(false),
350            command_tx: None,
351        }
352    }
353
354    /// Create a new TTL manager with default config
355    pub fn with_defaults(storage: Arc<S>) -> Self {
356        Self::new(storage, TtlConfig::default())
357    }
358
359    /// Get current configuration
360    pub fn config(&self) -> TtlConfig {
361        self.config.read().clone()
362    }
363
364    /// Update configuration
365    pub fn update_config(&self, config: TtlConfig) {
366        *self.config.write() = config.clone();
367        if let Some(tx) = &self.command_tx {
368            let _ = tx.try_send(TtlCommand::UpdateConfig(config));
369        }
370    }
371
372    /// Set TTL policy for a namespace
373    pub fn set_policy(&self, namespace: &NamespaceId, policy: NamespaceTtlPolicy) {
374        self.policies.write().insert(namespace.clone(), policy);
375        tracing::info!(namespace = %namespace, "TTL policy updated");
376    }
377
378    /// Get TTL policy for a namespace
379    pub fn get_policy(&self, namespace: &NamespaceId) -> Option<NamespaceTtlPolicy> {
380        self.policies.read().get(namespace).cloned()
381    }
382
383    /// Remove TTL policy for a namespace
384    pub fn remove_policy(&self, namespace: &NamespaceId) -> Option<NamespaceTtlPolicy> {
385        self.policies.write().remove(namespace)
386    }
387
388    /// Get all namespace policies
389    pub fn list_policies(&self) -> HashMap<NamespaceId, NamespaceTtlPolicy> {
390        self.policies.read().clone()
391    }
392
393    /// Get current statistics
394    pub fn stats(&self) -> TtlStats {
395        self.stats.snapshot()
396    }
397
398    /// Check if the cleanup service is running
399    pub fn is_running(&self) -> bool {
400        self.running.load(Ordering::SeqCst)
401    }
402
403    /// Validate and apply TTL for a vector in a namespace
404    pub fn validate_ttl(
405        &self,
406        namespace: &NamespaceId,
407        ttl_seconds: Option<u64>,
408    ) -> Result<Option<u64>> {
409        if let Some(policy) = self.policies.read().get(namespace) {
410            policy.apply(ttl_seconds)
411        } else {
412            Ok(ttl_seconds)
413        }
414    }
415
416    /// Run cleanup once (manual trigger)
417    pub async fn run_cleanup(&self) -> Result<TtlCleanupResult> {
418        let config = self.config.read().clone();
419        let policies = self.policies.read().clone();
420
421        let start = Instant::now();
422        let mut total_cleaned = 0u64;
423        let mut namespace_counts: HashMap<NamespaceId, u64> = HashMap::new();
424        let mut errors = Vec::new();
425
426        // Get all namespaces
427        let namespaces = match self.storage.list_namespaces().await {
428            Ok(ns) => ns,
429            Err(e) => {
430                self.stats.record_failure();
431                return Err(e);
432            }
433        };
434
435        for namespace in namespaces {
436            // Check if excluded
437            if config.excluded_namespaces.contains(&namespace) {
438                continue;
439            }
440
441            // Check if exempt via policy
442            if let Some(policy) = policies.get(&namespace) {
443                if policy.exempt_from_cleanup {
444                    continue;
445                }
446            }
447
448            // Check timeout
449            if start.elapsed() > config.max_cleanup_duration {
450                tracing::warn!(
451                    "TTL cleanup timeout reached after {:?}, stopping early",
452                    start.elapsed()
453                );
454                break;
455            }
456
457            // Run cleanup for this namespace
458            match self.storage.cleanup_expired(&namespace).await {
459                Ok(cleaned) => {
460                    if cleaned > 0 {
461                        total_cleaned += cleaned as u64;
462                        namespace_counts.insert(namespace.clone(), cleaned as u64);
463                        tracing::debug!(
464                            namespace = %namespace,
465                            cleaned = cleaned,
466                            "Cleaned expired vectors"
467                        );
468                    }
469                }
470                Err(e) => {
471                    errors.push((namespace.clone(), e.to_string()));
472                    tracing::error!(
473                        namespace = %namespace,
474                        error = %e,
475                        "Failed to cleanup namespace"
476                    );
477                }
478            }
479        }
480
481        let duration = start.elapsed();
482        self.stats.record_cleanup(
483            total_cleaned,
484            duration.as_millis() as u64,
485            &namespace_counts,
486        );
487
488        tracing::info!(
489            total_cleaned = total_cleaned,
490            duration_ms = duration.as_millis(),
491            namespaces_cleaned = namespace_counts.len(),
492            errors = errors.len(),
493            "TTL cleanup completed"
494        );
495
496        Ok(TtlCleanupResult {
497            total_cleaned,
498            duration,
499            namespace_counts,
500            errors,
501        })
502    }
503
504    /// Trigger an immediate cleanup (async, doesn't wait)
505    pub fn trigger_cleanup(&self) {
506        if let Some(tx) = &self.command_tx {
507            let _ = tx.try_send(TtlCommand::RunCleanup);
508        }
509    }
510
511    /// Shutdown the TTL service
512    pub fn shutdown(&self) {
513        if let Some(tx) = &self.command_tx {
514            let _ = tx.try_send(TtlCommand::Shutdown);
515        }
516    }
517}
518
519/// Result of a TTL cleanup operation
520#[derive(Debug, Clone)]
521pub struct TtlCleanupResult {
522    /// Total vectors cleaned
523    pub total_cleaned: u64,
524    /// Duration of cleanup
525    pub duration: Duration,
526    /// Vectors cleaned per namespace
527    pub namespace_counts: HashMap<NamespaceId, u64>,
528    /// Errors encountered (namespace, error message)
529    pub errors: Vec<(NamespaceId, String)>,
530}
531
532// ============================================================================
533// TTL Service (Background Task)
534// ============================================================================
535
536/// Background TTL cleanup service
537pub struct TtlService<S: VectorStorage + 'static> {
538    manager: Arc<TtlManager<S>>,
539    command_rx: mpsc::Receiver<TtlCommand>,
540}
541
542impl<S: VectorStorage + 'static> TtlService<S> {
543    /// Create a new TTL service with its manager
544    pub fn new(storage: Arc<S>, config: TtlConfig) -> (Arc<TtlManager<S>>, Self) {
545        let (tx, rx) = mpsc::channel(16);
546
547        let manager = Arc::new(TtlManager {
548            storage,
549            config: RwLock::new(config),
550            policies: RwLock::new(HashMap::new()),
551            stats: Arc::new(AtomicTtlStats::new()),
552            running: AtomicBool::new(false),
553            command_tx: Some(tx),
554        });
555
556        let service = Self {
557            manager: Arc::clone(&manager),
558            command_rx: rx,
559        };
560
561        (manager, service)
562    }
563
564    /// Run the TTL service (blocking)
565    pub async fn run(mut self) {
566        let config = self.manager.config();
567
568        self.manager.running.store(true, Ordering::SeqCst);
569        tracing::info!(
570            interval_secs = config.cleanup_interval.as_secs(),
571            enabled = config.enabled,
572            "TTL service started"
573        );
574
575        // Run initial cleanup if configured
576        if config.cleanup_on_startup && config.enabled {
577            tracing::debug!("Running startup cleanup");
578            if let Err(e) = self.manager.run_cleanup().await {
579                tracing::error!(error = %e, "Startup cleanup failed");
580            }
581        }
582
583        let mut interval = tokio::time::interval(config.cleanup_interval);
584        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
585
586        loop {
587            tokio::select! {
588                _ = interval.tick() => {
589                    let current_config = self.manager.config();
590                    if current_config.enabled {
591                        if let Err(e) = self.manager.run_cleanup().await {
592                            tracing::error!(error = %e, "Scheduled cleanup failed");
593                        }
594                    }
595                }
596                Some(cmd) = self.command_rx.recv() => {
597                    match cmd {
598                        TtlCommand::RunCleanup => {
599                            if let Err(e) = self.manager.run_cleanup().await {
600                                tracing::error!(error = %e, "Manual cleanup failed");
601                            }
602                        }
603                        TtlCommand::UpdateConfig(new_config) => {
604                            interval = tokio::time::interval(new_config.cleanup_interval);
605                            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
606                            tracing::info!("TTL config updated");
607                        }
608                        TtlCommand::Shutdown => {
609                            tracing::info!("TTL service shutting down");
610                            break;
611                        }
612                    }
613                }
614            }
615        }
616
617        self.manager.running.store(false, Ordering::SeqCst);
618        tracing::info!("TTL service stopped");
619    }
620
621    /// Spawn the service as a background task
622    pub fn spawn(self) -> tokio::task::JoinHandle<()> {
623        tokio::spawn(self.run())
624    }
625}
626
627// ============================================================================
628// TTL-Aware Storage Wrapper
629// ============================================================================
630
631/// A wrapper that applies TTL policies and tracks expiration on upsert
632pub struct TtlAwareStorage<S: VectorStorage> {
633    inner: Arc<S>,
634    manager: Arc<TtlManager<S>>,
635}
636
637impl<S: VectorStorage + 'static> TtlAwareStorage<S> {
638    /// Create a new TTL-aware storage wrapper
639    pub fn new(storage: Arc<S>, manager: Arc<TtlManager<S>>) -> Self {
640        Self {
641            inner: storage,
642            manager,
643        }
644    }
645
646    /// Get the inner storage
647    pub fn inner(&self) -> &Arc<S> {
648        &self.inner
649    }
650
651    /// Get the TTL manager
652    pub fn manager(&self) -> &Arc<TtlManager<S>> {
653        &self.manager
654    }
655
656    /// Upsert vectors with TTL policy application
657    pub async fn upsert_with_ttl(
658        &self,
659        namespace: &NamespaceId,
660        mut vectors: Vec<common::Vector>,
661    ) -> Result<usize> {
662        // Apply TTL policies
663        for vector in &mut vectors {
664            let validated_ttl = self.manager.validate_ttl(namespace, vector.ttl_seconds)?;
665            vector.ttl_seconds = validated_ttl;
666
667            // Apply TTL to set expires_at
668            if vector.ttl_seconds.is_some() {
669                vector.apply_ttl();
670            }
671        }
672
673        self.inner.upsert(namespace, vectors).await
674    }
675}
676
677// ============================================================================
678// Helper Functions
679// ============================================================================
680
/// Returns the Unix timestamp (seconds) at which a TTL that starts now expires.
///
/// The sum saturates at `u64::MAX`, so a very large `ttl_seconds` gives a
/// timestamp that is never reached instead of overflowing. A system clock
/// set before the Unix epoch is treated as time `0`.
pub fn calculate_expiration(ttl_seconds: u64) -> u64 {
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    // Plain `+` panics in debug builds and wraps in release builds when
    // `ttl_seconds` is close to `u64::MAX`.
    now.saturating_add(ttl_seconds)
}
689
/// Reports whether the Unix timestamp `expires_at` (seconds) has been reached.
///
/// A timestamp equal to the current second counts as expired. A system
/// clock set before the Unix epoch is treated as time `0`.
pub fn is_expired(expires_at: u64) -> bool {
    let current_secs = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    };
    expires_at <= current_secs
}
698
/// Returns the seconds left until the Unix timestamp `expires_at`.
///
/// Yields `None` once the timestamp has been reached (including the exact
/// second it names). A system clock set before the Unix epoch is treated
/// as time `0`.
pub fn remaining_ttl(expires_at: u64) -> Option<u64> {
    let current_secs = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    };
    // `checked_sub` is `None` for past timestamps; the filter also drops
    // the "expires right now" case, which would otherwise be `Some(0)`.
    expires_at
        .checked_sub(current_secs)
        .filter(|&left| left > 0)
}
711
712// ============================================================================
713// Tests
714// ============================================================================
715
716#[cfg(test)]
717mod tests {
718    use super::*;
719    use common::Vector;
720
721    // Mock storage for testing
722    struct MockStorage {
723        namespaces: RwLock<HashMap<NamespaceId, Vec<Vector>>>,
724    }
725
726    impl MockStorage {
727        fn new() -> Self {
728            Self {
729                namespaces: RwLock::new(HashMap::new()),
730            }
731        }
732
733        fn with_vectors(namespace: &str, vectors: Vec<Vector>) -> Self {
734            let mut map = HashMap::new();
735            map.insert(namespace.to_string(), vectors);
736            Self {
737                namespaces: RwLock::new(map),
738            }
739        }
740    }
741
742    #[async_trait::async_trait]
743    impl VectorStorage for MockStorage {
744        async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
745            let count = vectors.len();
746            self.namespaces
747                .write()
748                .entry(namespace.clone())
749                .or_default()
750                .extend(vectors);
751            Ok(count)
752        }
753
754        async fn get(
755            &self,
756            namespace: &NamespaceId,
757            ids: &[common::VectorId],
758        ) -> Result<Vec<Vector>> {
759            let ns = self.namespaces.read();
760            if let Some(vectors) = ns.get(namespace) {
761                Ok(vectors
762                    .iter()
763                    .filter(|v| ids.contains(&v.id))
764                    .cloned()
765                    .collect())
766            } else {
767                Ok(vec![])
768            }
769        }
770
771        async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
772            let ns = self.namespaces.read();
773            Ok(ns.get(namespace).cloned().unwrap_or_default())
774        }
775
776        async fn delete(&self, namespace: &NamespaceId, ids: &[common::VectorId]) -> Result<usize> {
777            let mut ns = self.namespaces.write();
778            if let Some(vectors) = ns.get_mut(namespace) {
779                let before = vectors.len();
780                vectors.retain(|v| !ids.contains(&v.id));
781                Ok(before - vectors.len())
782            } else {
783                Ok(0)
784            }
785        }
786
787        async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
788            Ok(self.namespaces.read().contains_key(namespace))
789        }
790
791        async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
792            self.namespaces
793                .write()
794                .entry(namespace.clone())
795                .or_default();
796            Ok(())
797        }
798
799        async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
800            let ns = self.namespaces.read();
801            Ok(ns.get(namespace).map(|v| v.len()).unwrap_or(0))
802        }
803
804        async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
805            let ns = self.namespaces.read();
806            Ok(ns
807                .get(namespace)
808                .and_then(|v| v.first().map(|vec| vec.values.len())))
809        }
810
811        async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
812            Ok(self.namespaces.read().keys().cloned().collect())
813        }
814
815        async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
816            Ok(self.namespaces.write().remove(namespace).is_some())
817        }
818
819        async fn cleanup_expired(&self, namespace: &NamespaceId) -> Result<usize> {
820            let mut ns = self.namespaces.write();
821            if let Some(vectors) = ns.get_mut(namespace) {
822                let before = vectors.len();
823                let now = std::time::SystemTime::now()
824                    .duration_since(std::time::UNIX_EPOCH)
825                    .unwrap_or_default()
826                    .as_secs();
827                vectors.retain(|v| !v.is_expired_at(now));
828                Ok(before - vectors.len())
829            } else {
830                Ok(0)
831            }
832        }
833
834        async fn cleanup_all_expired(&self) -> Result<usize> {
835            let mut total = 0;
836            let namespaces: Vec<_> = self.namespaces.read().keys().cloned().collect();
837            for ns in namespaces {
838                total += self.cleanup_expired(&ns).await?;
839            }
840            Ok(total)
841        }
842    }
843
844    #[test]
845    fn test_ttl_config_builder() {
846        let config = TtlConfig::new()
847            .with_cleanup_interval(Duration::from_secs(120))
848            .with_batch_size(5000)
849            .with_cleanup_on_startup(false)
850            .with_grace_period(10);
851
852        assert_eq!(config.cleanup_interval, Duration::from_secs(120));
853        assert_eq!(config.batch_size, 5000);
854        assert!(!config.cleanup_on_startup);
855        assert_eq!(config.grace_period_seconds, 10);
856        assert!(config.enabled);
857    }
858
859    #[test]
860    fn test_ttl_config_disabled() {
861        let config = TtlConfig::disabled();
862        assert!(!config.enabled);
863    }
864
865    #[test]
866    fn test_namespace_policy_default_ttl() {
867        let policy = NamespaceTtlPolicy::with_default_ttl(3600);
868
869        // Should apply default when None
870        let result = policy.apply(None).unwrap();
871        assert_eq!(result, Some(3600));
872
873        // Should keep explicit TTL
874        let result = policy.apply(Some(7200)).unwrap();
875        assert_eq!(result, Some(7200));
876    }
877
878    #[test]
879    fn test_namespace_policy_required_ttl() {
880        let policy = NamespaceTtlPolicy::required();
881
882        // Should fail when no TTL and no default
883        let result = policy.apply(None);
884        assert!(result.is_err());
885
886        // Should accept explicit TTL
887        let result = policy.apply(Some(3600)).unwrap();
888        assert_eq!(result, Some(3600));
889    }
890
891    #[test]
892    fn test_namespace_policy_max_ttl() {
893        let policy = NamespaceTtlPolicy::default().with_max_ttl(3600);
894
895        // Should clamp to max
896        let result = policy.apply(Some(7200)).unwrap();
897        assert_eq!(result, Some(3600));
898
899        // Should keep within range
900        let result = policy.apply(Some(1800)).unwrap();
901        assert_eq!(result, Some(1800));
902    }
903
904    #[test]
905    fn test_namespace_policy_min_ttl() {
906        let policy = NamespaceTtlPolicy::default().with_min_ttl(600);
907
908        // Should fail below min
909        let result = policy.apply(Some(300));
910        assert!(result.is_err());
911
912        // Should accept above min
913        let result = policy.apply(Some(900)).unwrap();
914        assert_eq!(result, Some(900));
915    }
916
917    #[test]
918    fn test_namespace_policy_exempt() {
919        let policy = NamespaceTtlPolicy::exempt();
920        assert!(policy.exempt_from_cleanup);
921    }
922
923    #[tokio::test]
924    async fn test_ttl_manager_basic() {
925        let storage = Arc::new(MockStorage::new());
926        let manager = TtlManager::new(storage, TtlConfig::default());
927
928        assert!(!manager.is_running());
929        assert_eq!(manager.stats().total_cleaned, 0);
930    }
931
932    #[tokio::test]
933    async fn test_ttl_manager_policy() {
934        let storage = Arc::new(MockStorage::new());
935        let manager = TtlManager::new(storage, TtlConfig::default());
936
937        let policy = NamespaceTtlPolicy::with_default_ttl(3600);
938        manager.set_policy(&"test".to_string(), policy.clone());
939
940        let retrieved = manager.get_policy(&"test".to_string()).unwrap();
941        assert_eq!(retrieved.default_ttl_seconds, Some(3600));
942
943        manager.remove_policy(&"test".to_string());
944        assert!(manager.get_policy(&"test".to_string()).is_none());
945    }
946
947    #[tokio::test]
948    async fn test_ttl_manager_validate_ttl() {
949        let storage = Arc::new(MockStorage::new());
950        let manager = TtlManager::new(storage, TtlConfig::default());
951
952        // No policy - pass through
953        let result = manager
954            .validate_ttl(&"ns1".to_string(), Some(3600))
955            .unwrap();
956        assert_eq!(result, Some(3600));
957
958        // With policy
959        let policy = NamespaceTtlPolicy::with_default_ttl(1800);
960        manager.set_policy(&"ns2".to_string(), policy);
961
962        let result = manager.validate_ttl(&"ns2".to_string(), None).unwrap();
963        assert_eq!(result, Some(1800));
964    }
965
966    #[tokio::test]
967    async fn test_ttl_manager_cleanup() {
968        // Create vectors with some expired
969        let now = std::time::SystemTime::now()
970            .duration_since(std::time::UNIX_EPOCH)
971            .unwrap()
972            .as_secs();
973
974        let vectors = vec![
975            Vector {
976                id: "v1".to_string(),
977                values: vec![1.0],
978                metadata: None,
979                ttl_seconds: None,
980                expires_at: Some(now - 100), // Expired
981            },
982            Vector {
983                id: "v2".to_string(),
984                values: vec![2.0],
985                metadata: None,
986                ttl_seconds: None,
987                expires_at: Some(now + 3600), // Not expired
988            },
989            Vector {
990                id: "v3".to_string(),
991                values: vec![3.0],
992                metadata: None,
993                ttl_seconds: None,
994                expires_at: None, // No expiration
995            },
996        ];
997
998        let storage = Arc::new(MockStorage::with_vectors("test", vectors));
999        let manager = TtlManager::new(storage.clone(), TtlConfig::default());
1000
1001        let result = manager.run_cleanup().await.unwrap();
1002
1003        assert_eq!(result.total_cleaned, 1);
1004        assert_eq!(result.namespace_counts.get("test"), Some(&1));
1005
1006        // Verify remaining vectors
1007        let remaining = storage.get_all(&"test".to_string()).await.unwrap();
1008        assert_eq!(remaining.len(), 2);
1009        assert!(remaining.iter().any(|v| v.id == "v2"));
1010        assert!(remaining.iter().any(|v| v.id == "v3"));
1011    }
1012
1013    #[tokio::test]
1014    async fn test_ttl_manager_excluded_namespace() {
1015        let now = std::time::SystemTime::now()
1016            .duration_since(std::time::UNIX_EPOCH)
1017            .unwrap()
1018            .as_secs();
1019
1020        let expired_vector = Vector {
1021            id: "v1".to_string(),
1022            values: vec![1.0],
1023            metadata: None,
1024            ttl_seconds: None,
1025            expires_at: Some(now - 100),
1026        };
1027
1028        let storage = Arc::new(MockStorage::with_vectors("excluded", vec![expired_vector]));
1029        let config = TtlConfig::default().with_excluded_namespaces(vec!["excluded".to_string()]);
1030        let manager = TtlManager::new(storage.clone(), config);
1031
1032        let result = manager.run_cleanup().await.unwrap();
1033
1034        // Should not clean excluded namespace
1035        assert_eq!(result.total_cleaned, 0);
1036
1037        // Vector should still exist
1038        let remaining = storage.get_all(&"excluded".to_string()).await.unwrap();
1039        assert_eq!(remaining.len(), 1);
1040    }
1041
1042    #[tokio::test]
1043    async fn test_ttl_manager_exempt_policy() {
1044        let now = std::time::SystemTime::now()
1045            .duration_since(std::time::UNIX_EPOCH)
1046            .unwrap()
1047            .as_secs();
1048
1049        let expired_vector = Vector {
1050            id: "v1".to_string(),
1051            values: vec![1.0],
1052            metadata: None,
1053            ttl_seconds: None,
1054            expires_at: Some(now - 100),
1055        };
1056
1057        let storage = Arc::new(MockStorage::with_vectors("exempt_ns", vec![expired_vector]));
1058        let manager = TtlManager::new(storage.clone(), TtlConfig::default());
1059
1060        // Set exempt policy
1061        manager.set_policy(&"exempt_ns".to_string(), NamespaceTtlPolicy::exempt());
1062
1063        let result = manager.run_cleanup().await.unwrap();
1064
1065        // Should not clean exempt namespace
1066        assert_eq!(result.total_cleaned, 0);
1067    }
1068
1069    #[tokio::test]
1070    async fn test_ttl_stats() {
1071        let storage = Arc::new(MockStorage::new());
1072        let manager = TtlManager::new(storage, TtlConfig::default());
1073
1074        let stats = manager.stats();
1075        assert_eq!(stats.total_cleaned, 0);
1076        assert_eq!(stats.cleanup_runs, 0);
1077        assert_eq!(stats.failed_cleanups, 0);
1078    }
1079
1080    #[test]
1081    fn test_calculate_expiration() {
1082        let now = std::time::SystemTime::now()
1083            .duration_since(std::time::UNIX_EPOCH)
1084            .unwrap()
1085            .as_secs();
1086
1087        let expires = calculate_expiration(3600);
1088        assert!(expires > now);
1089        assert!(expires <= now + 3600 + 1); // Allow 1 second tolerance
1090    }
1091
1092    #[test]
1093    fn test_is_expired() {
1094        let now = std::time::SystemTime::now()
1095            .duration_since(std::time::UNIX_EPOCH)
1096            .unwrap()
1097            .as_secs();
1098
1099        assert!(is_expired(now - 100));
1100        assert!(!is_expired(now + 100));
1101    }
1102
1103    #[test]
1104    fn test_remaining_ttl() {
1105        let now = std::time::SystemTime::now()
1106            .duration_since(std::time::UNIX_EPOCH)
1107            .unwrap()
1108            .as_secs();
1109
1110        // Expired
1111        assert_eq!(remaining_ttl(now - 100), None);
1112
1113        // Not expired
1114        let remaining = remaining_ttl(now + 100);
1115        assert!(remaining.is_some());
1116        assert!(remaining.unwrap() <= 100);
1117    }
1118
1119    #[tokio::test]
1120    async fn test_ttl_aware_storage() {
1121        let storage = Arc::new(MockStorage::new());
1122        let manager = Arc::new(TtlManager::new(Arc::clone(&storage), TtlConfig::default()));
1123
1124        // Set a policy with default TTL
1125        manager.set_policy(
1126            &"test".to_string(),
1127            NamespaceTtlPolicy::with_default_ttl(3600),
1128        );
1129
1130        let ttl_storage = TtlAwareStorage::new(Arc::clone(&storage), Arc::clone(&manager));
1131
1132        // Upsert without TTL - should get default
1133        let vectors = vec![Vector {
1134            id: "v1".to_string(),
1135            values: vec![1.0],
1136            metadata: None,
1137            ttl_seconds: None,
1138            expires_at: None,
1139        }];
1140
1141        ttl_storage
1142            .upsert_with_ttl(&"test".to_string(), vectors)
1143            .await
1144            .unwrap();
1145
1146        let stored = storage.get_all(&"test".to_string()).await.unwrap();
1147        assert_eq!(stored.len(), 1);
1148        assert!(stored[0].expires_at.is_some()); // Should have expiration set
1149    }
1150
1151    #[tokio::test]
1152    async fn test_ttl_service_creation() {
1153        let storage = Arc::new(MockStorage::new());
1154        let config = TtlConfig::default().with_cleanup_on_startup(false);
1155
1156        let (manager, _service) = TtlService::new(storage, config);
1157
1158        assert!(!manager.is_running());
1159        assert!(manager.config().enabled);
1160    }
1161
1162    #[test]
1163    fn test_list_policies() {
1164        let storage = Arc::new(MockStorage::new());
1165        let manager = TtlManager::new(storage, TtlConfig::default());
1166
1167        manager.set_policy(
1168            &"ns1".to_string(),
1169            NamespaceTtlPolicy::with_default_ttl(3600),
1170        );
1171        manager.set_policy(&"ns2".to_string(), NamespaceTtlPolicy::required());
1172
1173        let policies = manager.list_policies();
1174        assert_eq!(policies.len(), 2);
1175        assert!(policies.contains_key(&"ns1".to_string()));
1176        assert!(policies.contains_key(&"ns2".to_string()));
1177    }
1178}