1use common::{DakeraError, NamespaceId, Result};
11use parking_lot::RwLock;
12use std::collections::HashMap;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::mpsc;
17
18use crate::traits::VectorStorage;
19
20#[derive(Debug, Clone)]
26pub struct TtlConfig {
27 pub cleanup_interval: Duration,
29 pub batch_size: usize,
31 pub cleanup_on_startup: bool,
33 pub max_cleanup_duration: Duration,
35 pub enabled: bool,
37 pub excluded_namespaces: Vec<NamespaceId>,
39 pub grace_period_seconds: u64,
41}
42
43impl Default for TtlConfig {
44 fn default() -> Self {
45 Self {
46 cleanup_interval: Duration::from_secs(60),
47 batch_size: 10000,
48 cleanup_on_startup: true,
49 max_cleanup_duration: Duration::from_secs(30),
50 enabled: true,
51 excluded_namespaces: Vec::new(),
52 grace_period_seconds: 0,
53 }
54 }
55}
56
57impl TtlConfig {
58 pub fn new() -> Self {
60 Self::default()
61 }
62
63 pub fn with_cleanup_interval(mut self, interval: Duration) -> Self {
65 self.cleanup_interval = interval;
66 self
67 }
68
69 pub fn with_batch_size(mut self, size: usize) -> Self {
71 self.batch_size = size;
72 self
73 }
74
75 pub fn with_cleanup_on_startup(mut self, cleanup: bool) -> Self {
77 self.cleanup_on_startup = cleanup;
78 self
79 }
80
81 pub fn with_max_cleanup_duration(mut self, duration: Duration) -> Self {
83 self.max_cleanup_duration = duration;
84 self
85 }
86
87 pub fn disabled() -> Self {
89 Self {
90 enabled: false,
91 ..Default::default()
92 }
93 }
94
95 pub fn with_excluded_namespaces(mut self, namespaces: Vec<NamespaceId>) -> Self {
97 self.excluded_namespaces = namespaces;
98 self
99 }
100
101 pub fn with_grace_period(mut self, seconds: u64) -> Self {
103 self.grace_period_seconds = seconds;
104 self
105 }
106}
107
108#[derive(Debug, Clone, Default)]
114pub struct NamespaceTtlPolicy {
115 pub default_ttl_seconds: Option<u64>,
117 pub max_ttl_seconds: Option<u64>,
119 pub min_ttl_seconds: Option<u64>,
121 pub ttl_required: bool,
123 pub custom_cleanup_interval: Option<Duration>,
125 pub exempt_from_cleanup: bool,
127}
128
129impl NamespaceTtlPolicy {
130 pub fn with_default_ttl(seconds: u64) -> Self {
132 Self {
133 default_ttl_seconds: Some(seconds),
134 ..Default::default()
135 }
136 }
137
138 pub fn required() -> Self {
140 Self {
141 ttl_required: true,
142 ..Default::default()
143 }
144 }
145
146 pub fn exempt() -> Self {
148 Self {
149 exempt_from_cleanup: true,
150 ..Default::default()
151 }
152 }
153
154 pub fn with_max_ttl(mut self, seconds: u64) -> Self {
156 self.max_ttl_seconds = Some(seconds);
157 self
158 }
159
160 pub fn with_min_ttl(mut self, seconds: u64) -> Self {
162 self.min_ttl_seconds = Some(seconds);
163 self
164 }
165
166 pub fn apply(&self, ttl_seconds: Option<u64>) -> Result<Option<u64>> {
169 let ttl = match ttl_seconds {
170 Some(t) => Some(t),
171 None => {
172 if self.ttl_required && self.default_ttl_seconds.is_none() {
173 return Err(DakeraError::InvalidRequest(
174 "TTL is required for this namespace".to_string(),
175 ));
176 }
177 self.default_ttl_seconds
178 }
179 };
180
181 if let Some(t) = ttl {
182 if let Some(min) = self.min_ttl_seconds {
184 if t < min {
185 return Err(DakeraError::InvalidRequest(format!(
186 "TTL {} is below minimum {} seconds",
187 t, min
188 )));
189 }
190 }
191
192 if let Some(max) = self.max_ttl_seconds {
194 if t > max {
195 return Ok(Some(max)); }
197 }
198 }
199
200 Ok(ttl)
201 }
202}
203
204#[derive(Debug, Clone, Default)]
210pub struct TtlStats {
211 pub total_cleaned: u64,
213 pub last_cleanup_count: u64,
215 pub last_cleanup_duration_ms: u64,
217 pub last_cleanup_at: u64,
219 pub cleanup_runs: u64,
221 pub failed_cleanups: u64,
223 pub avg_cleaned_per_run: f64,
225 pub namespace_stats: HashMap<NamespaceId, NamespaceCleanupStats>,
227}
228
229#[derive(Debug, Clone, Default)]
231pub struct NamespaceCleanupStats {
232 pub total_cleaned: u64,
234 pub last_cleanup_at: u64,
236 pub last_cleanup_count: u64,
238}
239
240struct AtomicTtlStats {
242 total_cleaned: AtomicU64,
243 last_cleanup_count: AtomicU64,
244 last_cleanup_duration_ms: AtomicU64,
245 last_cleanup_at: AtomicU64,
246 cleanup_runs: AtomicU64,
247 failed_cleanups: AtomicU64,
248 namespace_stats: RwLock<HashMap<NamespaceId, NamespaceCleanupStats>>,
249}
250
251impl AtomicTtlStats {
252 fn new() -> Self {
253 Self {
254 total_cleaned: AtomicU64::new(0),
255 last_cleanup_count: AtomicU64::new(0),
256 last_cleanup_duration_ms: AtomicU64::new(0),
257 last_cleanup_at: AtomicU64::new(0),
258 cleanup_runs: AtomicU64::new(0),
259 failed_cleanups: AtomicU64::new(0),
260 namespace_stats: RwLock::new(HashMap::new()),
261 }
262 }
263
264 fn record_cleanup(
265 &self,
266 count: u64,
267 duration_ms: u64,
268 namespace_counts: &HashMap<NamespaceId, u64>,
269 ) {
270 self.total_cleaned.fetch_add(count, Ordering::SeqCst);
271 self.last_cleanup_count.store(count, Ordering::SeqCst);
272 self.last_cleanup_duration_ms
273 .store(duration_ms, Ordering::SeqCst);
274 self.cleanup_runs.fetch_add(1, Ordering::SeqCst);
275
276 let now = std::time::SystemTime::now()
277 .duration_since(std::time::UNIX_EPOCH)
278 .unwrap_or_default()
279 .as_secs();
280 self.last_cleanup_at.store(now, Ordering::SeqCst);
281
282 let mut ns_stats = self.namespace_stats.write();
284 for (namespace, count) in namespace_counts {
285 let stats = ns_stats.entry(namespace.clone()).or_default();
286 stats.total_cleaned += count;
287 stats.last_cleanup_at = now;
288 stats.last_cleanup_count = *count;
289 }
290 }
291
292 fn record_failure(&self) {
293 self.failed_cleanups.fetch_add(1, Ordering::SeqCst);
294 }
295
296 fn snapshot(&self) -> TtlStats {
297 let cleanup_runs = self.cleanup_runs.load(Ordering::SeqCst);
298 let total_cleaned = self.total_cleaned.load(Ordering::SeqCst);
299
300 TtlStats {
301 total_cleaned,
302 last_cleanup_count: self.last_cleanup_count.load(Ordering::SeqCst),
303 last_cleanup_duration_ms: self.last_cleanup_duration_ms.load(Ordering::SeqCst),
304 last_cleanup_at: self.last_cleanup_at.load(Ordering::SeqCst),
305 cleanup_runs,
306 failed_cleanups: self.failed_cleanups.load(Ordering::SeqCst),
307 avg_cleaned_per_run: if cleanup_runs > 0 {
308 total_cleaned as f64 / cleanup_runs as f64
309 } else {
310 0.0
311 },
312 namespace_stats: self.namespace_stats.read().clone(),
313 }
314 }
315}
316
317enum TtlCommand {
323 RunCleanup,
325 UpdateConfig(TtlConfig),
327 Shutdown,
329}
330
331pub struct TtlManager<S: VectorStorage> {
333 storage: Arc<S>,
334 config: RwLock<TtlConfig>,
335 policies: RwLock<HashMap<NamespaceId, NamespaceTtlPolicy>>,
336 stats: Arc<AtomicTtlStats>,
337 running: AtomicBool,
338 command_tx: Option<mpsc::Sender<TtlCommand>>,
339}
340
341impl<S: VectorStorage + 'static> TtlManager<S> {
342 pub fn new(storage: Arc<S>, config: TtlConfig) -> Self {
344 Self {
345 storage,
346 config: RwLock::new(config),
347 policies: RwLock::new(HashMap::new()),
348 stats: Arc::new(AtomicTtlStats::new()),
349 running: AtomicBool::new(false),
350 command_tx: None,
351 }
352 }
353
354 pub fn with_defaults(storage: Arc<S>) -> Self {
356 Self::new(storage, TtlConfig::default())
357 }
358
359 pub fn config(&self) -> TtlConfig {
361 self.config.read().clone()
362 }
363
364 pub fn update_config(&self, config: TtlConfig) {
366 *self.config.write() = config.clone();
367 if let Some(tx) = &self.command_tx {
368 let _ = tx.try_send(TtlCommand::UpdateConfig(config));
369 }
370 }
371
372 pub fn set_policy(&self, namespace: &NamespaceId, policy: NamespaceTtlPolicy) {
374 self.policies.write().insert(namespace.clone(), policy);
375 tracing::info!(namespace = %namespace, "TTL policy updated");
376 }
377
378 pub fn get_policy(&self, namespace: &NamespaceId) -> Option<NamespaceTtlPolicy> {
380 self.policies.read().get(namespace).cloned()
381 }
382
383 pub fn remove_policy(&self, namespace: &NamespaceId) -> Option<NamespaceTtlPolicy> {
385 self.policies.write().remove(namespace)
386 }
387
388 pub fn list_policies(&self) -> HashMap<NamespaceId, NamespaceTtlPolicy> {
390 self.policies.read().clone()
391 }
392
393 pub fn stats(&self) -> TtlStats {
395 self.stats.snapshot()
396 }
397
398 pub fn is_running(&self) -> bool {
400 self.running.load(Ordering::SeqCst)
401 }
402
403 pub fn validate_ttl(
405 &self,
406 namespace: &NamespaceId,
407 ttl_seconds: Option<u64>,
408 ) -> Result<Option<u64>> {
409 if let Some(policy) = self.policies.read().get(namespace) {
410 policy.apply(ttl_seconds)
411 } else {
412 Ok(ttl_seconds)
413 }
414 }
415
416 pub async fn run_cleanup(&self) -> Result<TtlCleanupResult> {
418 let config = self.config.read().clone();
419 let policies = self.policies.read().clone();
420
421 let start = Instant::now();
422 let mut total_cleaned = 0u64;
423 let mut namespace_counts: HashMap<NamespaceId, u64> = HashMap::new();
424 let mut errors = Vec::new();
425
426 let namespaces = match self.storage.list_namespaces().await {
428 Ok(ns) => ns,
429 Err(e) => {
430 self.stats.record_failure();
431 return Err(e);
432 }
433 };
434
435 for namespace in namespaces {
436 if config.excluded_namespaces.contains(&namespace) {
438 continue;
439 }
440
441 if let Some(policy) = policies.get(&namespace) {
443 if policy.exempt_from_cleanup {
444 continue;
445 }
446 }
447
448 if start.elapsed() > config.max_cleanup_duration {
450 tracing::warn!(
451 "TTL cleanup timeout reached after {:?}, stopping early",
452 start.elapsed()
453 );
454 break;
455 }
456
457 match self.storage.cleanup_expired(&namespace).await {
459 Ok(cleaned) => {
460 if cleaned > 0 {
461 total_cleaned += cleaned as u64;
462 namespace_counts.insert(namespace.clone(), cleaned as u64);
463 tracing::debug!(
464 namespace = %namespace,
465 cleaned = cleaned,
466 "Cleaned expired vectors"
467 );
468 }
469 }
470 Err(e) => {
471 errors.push((namespace.clone(), e.to_string()));
472 tracing::error!(
473 namespace = %namespace,
474 error = %e,
475 "Failed to cleanup namespace"
476 );
477 }
478 }
479 }
480
481 let duration = start.elapsed();
482 self.stats.record_cleanup(
483 total_cleaned,
484 duration.as_millis() as u64,
485 &namespace_counts,
486 );
487
488 tracing::info!(
489 total_cleaned = total_cleaned,
490 duration_ms = duration.as_millis(),
491 namespaces_cleaned = namespace_counts.len(),
492 errors = errors.len(),
493 "TTL cleanup completed"
494 );
495
496 Ok(TtlCleanupResult {
497 total_cleaned,
498 duration,
499 namespace_counts,
500 errors,
501 })
502 }
503
504 pub fn trigger_cleanup(&self) {
506 if let Some(tx) = &self.command_tx {
507 let _ = tx.try_send(TtlCommand::RunCleanup);
508 }
509 }
510
511 pub fn shutdown(&self) {
513 if let Some(tx) = &self.command_tx {
514 let _ = tx.try_send(TtlCommand::Shutdown);
515 }
516 }
517}
518
519#[derive(Debug, Clone)]
521pub struct TtlCleanupResult {
522 pub total_cleaned: u64,
524 pub duration: Duration,
526 pub namespace_counts: HashMap<NamespaceId, u64>,
528 pub errors: Vec<(NamespaceId, String)>,
530}
531
532pub struct TtlService<S: VectorStorage + 'static> {
538 manager: Arc<TtlManager<S>>,
539 command_rx: mpsc::Receiver<TtlCommand>,
540}
541
542impl<S: VectorStorage + 'static> TtlService<S> {
543 pub fn new(storage: Arc<S>, config: TtlConfig) -> (Arc<TtlManager<S>>, Self) {
545 let (tx, rx) = mpsc::channel(16);
546
547 let manager = Arc::new(TtlManager {
548 storage,
549 config: RwLock::new(config),
550 policies: RwLock::new(HashMap::new()),
551 stats: Arc::new(AtomicTtlStats::new()),
552 running: AtomicBool::new(false),
553 command_tx: Some(tx),
554 });
555
556 let service = Self {
557 manager: Arc::clone(&manager),
558 command_rx: rx,
559 };
560
561 (manager, service)
562 }
563
564 pub async fn run(mut self) {
566 let config = self.manager.config();
567
568 self.manager.running.store(true, Ordering::SeqCst);
569 tracing::info!(
570 interval_secs = config.cleanup_interval.as_secs(),
571 enabled = config.enabled,
572 "TTL service started"
573 );
574
575 if config.cleanup_on_startup && config.enabled {
577 tracing::debug!("Running startup cleanup");
578 if let Err(e) = self.manager.run_cleanup().await {
579 tracing::error!(error = %e, "Startup cleanup failed");
580 }
581 }
582
583 let mut interval = tokio::time::interval(config.cleanup_interval);
584 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
585
586 loop {
587 tokio::select! {
588 _ = interval.tick() => {
589 let current_config = self.manager.config();
590 if current_config.enabled {
591 if let Err(e) = self.manager.run_cleanup().await {
592 tracing::error!(error = %e, "Scheduled cleanup failed");
593 }
594 }
595 }
596 Some(cmd) = self.command_rx.recv() => {
597 match cmd {
598 TtlCommand::RunCleanup => {
599 if let Err(e) = self.manager.run_cleanup().await {
600 tracing::error!(error = %e, "Manual cleanup failed");
601 }
602 }
603 TtlCommand::UpdateConfig(new_config) => {
604 interval = tokio::time::interval(new_config.cleanup_interval);
605 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
606 tracing::info!("TTL config updated");
607 }
608 TtlCommand::Shutdown => {
609 tracing::info!("TTL service shutting down");
610 break;
611 }
612 }
613 }
614 }
615 }
616
617 self.manager.running.store(false, Ordering::SeqCst);
618 tracing::info!("TTL service stopped");
619 }
620
621 pub fn spawn(self) -> tokio::task::JoinHandle<()> {
623 tokio::spawn(self.run())
624 }
625}
626
627pub struct TtlAwareStorage<S: VectorStorage> {
633 inner: Arc<S>,
634 manager: Arc<TtlManager<S>>,
635}
636
637impl<S: VectorStorage + 'static> TtlAwareStorage<S> {
638 pub fn new(storage: Arc<S>, manager: Arc<TtlManager<S>>) -> Self {
640 Self {
641 inner: storage,
642 manager,
643 }
644 }
645
646 pub fn inner(&self) -> &Arc<S> {
648 &self.inner
649 }
650
651 pub fn manager(&self) -> &Arc<TtlManager<S>> {
653 &self.manager
654 }
655
656 pub async fn upsert_with_ttl(
658 &self,
659 namespace: &NamespaceId,
660 mut vectors: Vec<common::Vector>,
661 ) -> Result<usize> {
662 for vector in &mut vectors {
664 let validated_ttl = self.manager.validate_ttl(namespace, vector.ttl_seconds)?;
665 vector.ttl_seconds = validated_ttl;
666
667 if vector.ttl_seconds.is_some() {
669 vector.apply_ttl();
670 }
671 }
672
673 self.inner.upsert(namespace, vectors).await
674 }
675}
676
677pub fn calculate_expiration(ttl_seconds: u64) -> u64 {
683 let now = std::time::SystemTime::now()
684 .duration_since(std::time::UNIX_EPOCH)
685 .unwrap_or_default()
686 .as_secs();
687 now + ttl_seconds
688}
689
690pub fn is_expired(expires_at: u64) -> bool {
692 let now = std::time::SystemTime::now()
693 .duration_since(std::time::UNIX_EPOCH)
694 .unwrap_or_default()
695 .as_secs();
696 now >= expires_at
697}
698
699pub fn remaining_ttl(expires_at: u64) -> Option<u64> {
701 let now = std::time::SystemTime::now()
702 .duration_since(std::time::UNIX_EPOCH)
703 .unwrap_or_default()
704 .as_secs();
705 if now < expires_at {
706 Some(expires_at - now)
707 } else {
708 None
709 }
710}
711
712#[cfg(test)]
717mod tests {
718 use super::*;
719 use common::Vector;
720
721 struct MockStorage {
723 namespaces: RwLock<HashMap<NamespaceId, Vec<Vector>>>,
724 }
725
726 impl MockStorage {
727 fn new() -> Self {
728 Self {
729 namespaces: RwLock::new(HashMap::new()),
730 }
731 }
732
733 fn with_vectors(namespace: &str, vectors: Vec<Vector>) -> Self {
734 let mut map = HashMap::new();
735 map.insert(namespace.to_string(), vectors);
736 Self {
737 namespaces: RwLock::new(map),
738 }
739 }
740 }
741
742 #[async_trait::async_trait]
743 impl VectorStorage for MockStorage {
744 async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
745 let count = vectors.len();
746 self.namespaces
747 .write()
748 .entry(namespace.clone())
749 .or_default()
750 .extend(vectors);
751 Ok(count)
752 }
753
754 async fn get(
755 &self,
756 namespace: &NamespaceId,
757 ids: &[common::VectorId],
758 ) -> Result<Vec<Vector>> {
759 let ns = self.namespaces.read();
760 if let Some(vectors) = ns.get(namespace) {
761 Ok(vectors
762 .iter()
763 .filter(|v| ids.contains(&v.id))
764 .cloned()
765 .collect())
766 } else {
767 Ok(vec![])
768 }
769 }
770
771 async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
772 let ns = self.namespaces.read();
773 Ok(ns.get(namespace).cloned().unwrap_or_default())
774 }
775
776 async fn delete(&self, namespace: &NamespaceId, ids: &[common::VectorId]) -> Result<usize> {
777 let mut ns = self.namespaces.write();
778 if let Some(vectors) = ns.get_mut(namespace) {
779 let before = vectors.len();
780 vectors.retain(|v| !ids.contains(&v.id));
781 Ok(before - vectors.len())
782 } else {
783 Ok(0)
784 }
785 }
786
787 async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
788 Ok(self.namespaces.read().contains_key(namespace))
789 }
790
791 async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
792 self.namespaces
793 .write()
794 .entry(namespace.clone())
795 .or_default();
796 Ok(())
797 }
798
799 async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
800 let ns = self.namespaces.read();
801 Ok(ns.get(namespace).map(|v| v.len()).unwrap_or(0))
802 }
803
804 async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
805 let ns = self.namespaces.read();
806 Ok(ns
807 .get(namespace)
808 .and_then(|v| v.first().map(|vec| vec.values.len())))
809 }
810
811 async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
812 Ok(self.namespaces.read().keys().cloned().collect())
813 }
814
815 async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
816 Ok(self.namespaces.write().remove(namespace).is_some())
817 }
818
819 async fn cleanup_expired(&self, namespace: &NamespaceId) -> Result<usize> {
820 let mut ns = self.namespaces.write();
821 if let Some(vectors) = ns.get_mut(namespace) {
822 let before = vectors.len();
823 let now = std::time::SystemTime::now()
824 .duration_since(std::time::UNIX_EPOCH)
825 .unwrap_or_default()
826 .as_secs();
827 vectors.retain(|v| !v.is_expired_at(now));
828 Ok(before - vectors.len())
829 } else {
830 Ok(0)
831 }
832 }
833
834 async fn cleanup_all_expired(&self) -> Result<usize> {
835 let mut total = 0;
836 let namespaces: Vec<_> = self.namespaces.read().keys().cloned().collect();
837 for ns in namespaces {
838 total += self.cleanup_expired(&ns).await?;
839 }
840 Ok(total)
841 }
842 }
843
844 #[test]
845 fn test_ttl_config_builder() {
846 let config = TtlConfig::new()
847 .with_cleanup_interval(Duration::from_secs(120))
848 .with_batch_size(5000)
849 .with_cleanup_on_startup(false)
850 .with_grace_period(10);
851
852 assert_eq!(config.cleanup_interval, Duration::from_secs(120));
853 assert_eq!(config.batch_size, 5000);
854 assert!(!config.cleanup_on_startup);
855 assert_eq!(config.grace_period_seconds, 10);
856 assert!(config.enabled);
857 }
858
859 #[test]
860 fn test_ttl_config_disabled() {
861 let config = TtlConfig::disabled();
862 assert!(!config.enabled);
863 }
864
865 #[test]
866 fn test_namespace_policy_default_ttl() {
867 let policy = NamespaceTtlPolicy::with_default_ttl(3600);
868
869 let result = policy.apply(None).unwrap();
871 assert_eq!(result, Some(3600));
872
873 let result = policy.apply(Some(7200)).unwrap();
875 assert_eq!(result, Some(7200));
876 }
877
878 #[test]
879 fn test_namespace_policy_required_ttl() {
880 let policy = NamespaceTtlPolicy::required();
881
882 let result = policy.apply(None);
884 assert!(result.is_err());
885
886 let result = policy.apply(Some(3600)).unwrap();
888 assert_eq!(result, Some(3600));
889 }
890
891 #[test]
892 fn test_namespace_policy_max_ttl() {
893 let policy = NamespaceTtlPolicy::default().with_max_ttl(3600);
894
895 let result = policy.apply(Some(7200)).unwrap();
897 assert_eq!(result, Some(3600));
898
899 let result = policy.apply(Some(1800)).unwrap();
901 assert_eq!(result, Some(1800));
902 }
903
904 #[test]
905 fn test_namespace_policy_min_ttl() {
906 let policy = NamespaceTtlPolicy::default().with_min_ttl(600);
907
908 let result = policy.apply(Some(300));
910 assert!(result.is_err());
911
912 let result = policy.apply(Some(900)).unwrap();
914 assert_eq!(result, Some(900));
915 }
916
917 #[test]
918 fn test_namespace_policy_exempt() {
919 let policy = NamespaceTtlPolicy::exempt();
920 assert!(policy.exempt_from_cleanup);
921 }
922
923 #[tokio::test]
924 async fn test_ttl_manager_basic() {
925 let storage = Arc::new(MockStorage::new());
926 let manager = TtlManager::new(storage, TtlConfig::default());
927
928 assert!(!manager.is_running());
929 assert_eq!(manager.stats().total_cleaned, 0);
930 }
931
932 #[tokio::test]
933 async fn test_ttl_manager_policy() {
934 let storage = Arc::new(MockStorage::new());
935 let manager = TtlManager::new(storage, TtlConfig::default());
936
937 let policy = NamespaceTtlPolicy::with_default_ttl(3600);
938 manager.set_policy(&"test".to_string(), policy.clone());
939
940 let retrieved = manager.get_policy(&"test".to_string()).unwrap();
941 assert_eq!(retrieved.default_ttl_seconds, Some(3600));
942
943 manager.remove_policy(&"test".to_string());
944 assert!(manager.get_policy(&"test".to_string()).is_none());
945 }
946
947 #[tokio::test]
948 async fn test_ttl_manager_validate_ttl() {
949 let storage = Arc::new(MockStorage::new());
950 let manager = TtlManager::new(storage, TtlConfig::default());
951
952 let result = manager
954 .validate_ttl(&"ns1".to_string(), Some(3600))
955 .unwrap();
956 assert_eq!(result, Some(3600));
957
958 let policy = NamespaceTtlPolicy::with_default_ttl(1800);
960 manager.set_policy(&"ns2".to_string(), policy);
961
962 let result = manager.validate_ttl(&"ns2".to_string(), None).unwrap();
963 assert_eq!(result, Some(1800));
964 }
965
966 #[tokio::test]
967 async fn test_ttl_manager_cleanup() {
968 let now = std::time::SystemTime::now()
970 .duration_since(std::time::UNIX_EPOCH)
971 .unwrap()
972 .as_secs();
973
974 let vectors = vec![
975 Vector {
976 id: "v1".to_string(),
977 values: vec![1.0],
978 metadata: None,
979 ttl_seconds: None,
980 expires_at: Some(now - 100), },
982 Vector {
983 id: "v2".to_string(),
984 values: vec![2.0],
985 metadata: None,
986 ttl_seconds: None,
987 expires_at: Some(now + 3600), },
989 Vector {
990 id: "v3".to_string(),
991 values: vec![3.0],
992 metadata: None,
993 ttl_seconds: None,
994 expires_at: None, },
996 ];
997
998 let storage = Arc::new(MockStorage::with_vectors("test", vectors));
999 let manager = TtlManager::new(storage.clone(), TtlConfig::default());
1000
1001 let result = manager.run_cleanup().await.unwrap();
1002
1003 assert_eq!(result.total_cleaned, 1);
1004 assert_eq!(result.namespace_counts.get("test"), Some(&1));
1005
1006 let remaining = storage.get_all(&"test".to_string()).await.unwrap();
1008 assert_eq!(remaining.len(), 2);
1009 assert!(remaining.iter().any(|v| v.id == "v2"));
1010 assert!(remaining.iter().any(|v| v.id == "v3"));
1011 }
1012
1013 #[tokio::test]
1014 async fn test_ttl_manager_excluded_namespace() {
1015 let now = std::time::SystemTime::now()
1016 .duration_since(std::time::UNIX_EPOCH)
1017 .unwrap()
1018 .as_secs();
1019
1020 let expired_vector = Vector {
1021 id: "v1".to_string(),
1022 values: vec![1.0],
1023 metadata: None,
1024 ttl_seconds: None,
1025 expires_at: Some(now - 100),
1026 };
1027
1028 let storage = Arc::new(MockStorage::with_vectors("excluded", vec![expired_vector]));
1029 let config = TtlConfig::default().with_excluded_namespaces(vec!["excluded".to_string()]);
1030 let manager = TtlManager::new(storage.clone(), config);
1031
1032 let result = manager.run_cleanup().await.unwrap();
1033
1034 assert_eq!(result.total_cleaned, 0);
1036
1037 let remaining = storage.get_all(&"excluded".to_string()).await.unwrap();
1039 assert_eq!(remaining.len(), 1);
1040 }
1041
1042 #[tokio::test]
1043 async fn test_ttl_manager_exempt_policy() {
1044 let now = std::time::SystemTime::now()
1045 .duration_since(std::time::UNIX_EPOCH)
1046 .unwrap()
1047 .as_secs();
1048
1049 let expired_vector = Vector {
1050 id: "v1".to_string(),
1051 values: vec![1.0],
1052 metadata: None,
1053 ttl_seconds: None,
1054 expires_at: Some(now - 100),
1055 };
1056
1057 let storage = Arc::new(MockStorage::with_vectors("exempt_ns", vec![expired_vector]));
1058 let manager = TtlManager::new(storage.clone(), TtlConfig::default());
1059
1060 manager.set_policy(&"exempt_ns".to_string(), NamespaceTtlPolicy::exempt());
1062
1063 let result = manager.run_cleanup().await.unwrap();
1064
1065 assert_eq!(result.total_cleaned, 0);
1067 }
1068
1069 #[tokio::test]
1070 async fn test_ttl_stats() {
1071 let storage = Arc::new(MockStorage::new());
1072 let manager = TtlManager::new(storage, TtlConfig::default());
1073
1074 let stats = manager.stats();
1075 assert_eq!(stats.total_cleaned, 0);
1076 assert_eq!(stats.cleanup_runs, 0);
1077 assert_eq!(stats.failed_cleanups, 0);
1078 }
1079
1080 #[test]
1081 fn test_calculate_expiration() {
1082 let now = std::time::SystemTime::now()
1083 .duration_since(std::time::UNIX_EPOCH)
1084 .unwrap()
1085 .as_secs();
1086
1087 let expires = calculate_expiration(3600);
1088 assert!(expires > now);
1089 assert!(expires <= now + 3600 + 1); }
1091
1092 #[test]
1093 fn test_is_expired() {
1094 let now = std::time::SystemTime::now()
1095 .duration_since(std::time::UNIX_EPOCH)
1096 .unwrap()
1097 .as_secs();
1098
1099 assert!(is_expired(now - 100));
1100 assert!(!is_expired(now + 100));
1101 }
1102
1103 #[test]
1104 fn test_remaining_ttl() {
1105 let now = std::time::SystemTime::now()
1106 .duration_since(std::time::UNIX_EPOCH)
1107 .unwrap()
1108 .as_secs();
1109
1110 assert_eq!(remaining_ttl(now - 100), None);
1112
1113 let remaining = remaining_ttl(now + 100);
1115 assert!(remaining.is_some());
1116 assert!(remaining.unwrap() <= 100);
1117 }
1118
1119 #[tokio::test]
1120 async fn test_ttl_aware_storage() {
1121 let storage = Arc::new(MockStorage::new());
1122 let manager = Arc::new(TtlManager::new(Arc::clone(&storage), TtlConfig::default()));
1123
1124 manager.set_policy(
1126 &"test".to_string(),
1127 NamespaceTtlPolicy::with_default_ttl(3600),
1128 );
1129
1130 let ttl_storage = TtlAwareStorage::new(Arc::clone(&storage), Arc::clone(&manager));
1131
1132 let vectors = vec![Vector {
1134 id: "v1".to_string(),
1135 values: vec![1.0],
1136 metadata: None,
1137 ttl_seconds: None,
1138 expires_at: None,
1139 }];
1140
1141 ttl_storage
1142 .upsert_with_ttl(&"test".to_string(), vectors)
1143 .await
1144 .unwrap();
1145
1146 let stored = storage.get_all(&"test".to_string()).await.unwrap();
1147 assert_eq!(stored.len(), 1);
1148 assert!(stored[0].expires_at.is_some()); }
1150
1151 #[tokio::test]
1152 async fn test_ttl_service_creation() {
1153 let storage = Arc::new(MockStorage::new());
1154 let config = TtlConfig::default().with_cleanup_on_startup(false);
1155
1156 let (manager, _service) = TtlService::new(storage, config);
1157
1158 assert!(!manager.is_running());
1159 assert!(manager.config().enabled);
1160 }
1161
1162 #[test]
1163 fn test_list_policies() {
1164 let storage = Arc::new(MockStorage::new());
1165 let manager = TtlManager::new(storage, TtlConfig::default());
1166
1167 manager.set_policy(
1168 &"ns1".to_string(),
1169 NamespaceTtlPolicy::with_default_ttl(3600),
1170 );
1171 manager.set_policy(&"ns2".to_string(), NamespaceTtlPolicy::required());
1172
1173 let policies = manager.list_policies();
1174 assert_eq!(policies.len(), 2);
1175 assert!(policies.contains_key(&"ns1".to_string()));
1176 assert!(policies.contains_key(&"ns2".to_string()));
1177 }
1178}