1use super::automerge_store::AutomergeStore;
41use super::ttl::TtlConfig;
42use anyhow::Result;
43use std::collections::BTreeMap;
44use std::sync::{Arc, RwLock};
45use std::time::{Duration, Instant};
46use tokio::task::JoinHandle;
47
/// Tracks expiry deadlines for store keys and deletes the keys once they are due.
///
/// Deadlines are kept in memory only; the manager removes documents by calling
/// `delete` on the wrapped [`AutomergeStore`].
pub struct TtlManager {
    /// Store that expired keys are deleted from.
    store: Arc<AutomergeStore>,

    /// TTL configuration; its eviction strategy and offline policy are read by the manager.
    config: TtlConfig,

    /// Pending expirations ordered by deadline. Each deadline maps to the keys
    /// registered for exactly that instant. Shared with the background cleanup task.
    expiry_map: Arc<RwLock<BTreeMap<Instant, Vec<String>>>>,

    /// Handle of the background cleanup task, or `None` when no task has been started.
    cleanup_task: Arc<RwLock<Option<JoinHandle<()>>>>,
}
69
70impl TtlManager {
71 pub fn new(store: Arc<AutomergeStore>, config: TtlConfig) -> Self {
86 Self {
87 store,
88 config,
89 expiry_map: Arc::new(RwLock::new(BTreeMap::new())),
90 cleanup_task: Arc::new(RwLock::new(None)),
91 }
92 }
93
94 pub fn set_ttl(&self, key: &str, ttl: Duration) -> Result<()> {
108 let expiry_time = Instant::now() + ttl;
109
110 let mut expiry_map = self.expiry_map.write().unwrap_or_else(|e| e.into_inner());
111 expiry_map
112 .entry(expiry_time)
113 .or_default()
114 .push(key.to_string());
115
116 Ok(())
117 }
118
119 pub fn cleanup_expired(&self) -> Result<usize> {
130 let now = Instant::now();
131 let mut count = 0;
132
133 let expired_keys = {
135 let mut expiry_map = self.expiry_map.write().unwrap_or_else(|e| e.into_inner());
136
137 let split_key = expiry_map
139 .range(..=now)
140 .next_back()
141 .map(|(k, _)| *k)
142 .unwrap_or(now);
143
144 let expired: Vec<_> = expiry_map
146 .range(..=split_key)
147 .flat_map(|(expiry, keys)| keys.iter().map(move |k| (*expiry, k.clone())))
148 .collect();
149
150 expiry_map.retain(|&expiry_time, _| expiry_time > now);
152
153 expired
154 };
155
156 let ordered_keys = self.apply_eviction_strategy(expired_keys);
158
159 for key in ordered_keys {
161 self.store.delete(&key)?;
162 count += 1;
163 }
164
165 Ok(count)
166 }
167
168 fn apply_eviction_strategy(&self, mut expired: Vec<(Instant, String)>) -> Vec<String> {
170 use super::ttl::EvictionStrategy;
171
172 match self.config.evict_strategy {
173 EvictionStrategy::OldestFirst => {
174 expired.sort_by_key(|(expiry, _)| *expiry);
176 expired.into_iter().map(|(_, key)| key).collect()
177 }
178 EvictionStrategy::KeepLastN(n) => {
179 use std::collections::HashMap;
181 let mut by_collection: HashMap<String, Vec<(Instant, String)>> = HashMap::new();
182 for (expiry, key) in expired {
183 let collection = key.split('/').next().unwrap_or("").to_string();
184 by_collection
185 .entry(collection)
186 .or_default()
187 .push((expiry, key));
188 }
189 let mut to_delete = Vec::new();
190 for (_collection, mut entries) in by_collection {
191 entries.sort_by_key(|(expiry, _)| std::cmp::Reverse(*expiry));
193 to_delete.extend(entries.into_iter().skip(n).map(|(_, key)| key));
194 }
195 to_delete
196 }
197 EvictionStrategy::StoragePressure { .. } | EvictionStrategy::None => {
198 expired.into_iter().map(|(_, key)| key).collect()
200 }
201 }
202 }
203
204 pub fn extend_ttls_for_offline(&self) {
212 let policy = match &self.config.offline_policy {
213 Some(p) => p,
214 None => return, };
216
217 let online_secs = policy.online_ttl.as_secs_f64();
221 let offline_secs = policy.offline_ttl.as_secs_f64();
222 if offline_secs <= 0.0 || online_secs <= 0.0 {
223 return;
224 }
225 let factor = online_secs / offline_secs;
227 if factor <= 1.0 {
228 return; }
230
231 let now = Instant::now();
232 let mut expiry_map = self.expiry_map.write().unwrap_or_else(|e| e.into_inner());
233
234 let entries: Vec<_> = expiry_map
236 .iter()
237 .filter(|(expiry, _)| **expiry > now)
238 .flat_map(|(expiry, keys)| keys.iter().map(move |k| (*expiry, k.clone())))
239 .collect();
240
241 expiry_map.clear();
243 for (old_expiry, key) in entries {
244 let remaining = old_expiry.duration_since(now);
245 let extended = Duration::from_secs_f64(remaining.as_secs_f64() * factor);
246 let new_expiry = now + extended;
247 expiry_map.entry(new_expiry).or_default().push(key);
248 }
249 }
250
251 pub fn start_background_cleanup(&self) {
262 let expiry_map = self.expiry_map.clone();
263 let store = self.store.clone();
264
265 let handle = tokio::spawn(async move {
266 let mut interval = tokio::time::interval(Duration::from_secs(10));
267
268 loop {
269 interval.tick().await;
270
271 let now = Instant::now();
273 let expired_docs = {
274 let mut expiry_map = expiry_map.write().unwrap_or_else(|e| e.into_inner());
275
276 let split_key = expiry_map
278 .range(..=now)
279 .next_back()
280 .map(|(k, _)| *k)
281 .unwrap_or(now);
282
283 let expired: Vec<_> = expiry_map
284 .range(..=split_key)
285 .flat_map(|(_, docs)| docs.clone())
286 .collect();
287
288 expiry_map.retain(|&expiry_time, _| expiry_time > now);
290
291 expired
292 };
293
294 for key in expired_docs {
296 if let Err(e) = store.delete(&key) {
297 eprintln!("TTL cleanup failed for {}: {}", key, e);
298 }
299 }
300 }
301 });
302
303 *self.cleanup_task.write().unwrap_or_else(|e| e.into_inner()) = Some(handle);
304 }
305
306 pub fn stop_background_cleanup(&self) {
308 if let Some(handle) = self
309 .cleanup_task
310 .write()
311 .unwrap_or_else(|e| e.into_inner())
312 .take()
313 {
314 handle.abort();
315 }
316 }
317
318 pub fn config(&self) -> &TtlConfig {
320 &self.config
321 }
322
323 pub fn pending_count(&self) -> usize {
325 self.expiry_map
326 .read()
327 .unwrap()
328 .values()
329 .map(|docs| docs.len())
330 .sum()
331 }
332}
333
334impl Drop for TtlManager {
335 fn drop(&mut self) {
336 self.stop_background_cleanup();
337 }
338}
339
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::ttl::EvictionStrategy;
    use automerge::Automerge;
    use std::time::Duration;
    use tokio::time::sleep;

    /// Opens a store in a fresh temporary directory and wraps it in a manager.
    ///
    /// The `TempDir` is returned so the directory stays alive for the whole test.
    fn fixture(
        config: TtlConfig,
    ) -> Result<(tempfile::TempDir, Arc<AutomergeStore>, TtlManager)> {
        let dir = tempfile::tempdir()?;
        let store = Arc::new(AutomergeStore::open(dir.path())?);
        let manager = TtlManager::new(Arc::clone(&store), config);
        Ok((dir, store, manager))
    }

    /// Registering a TTL adds one pending expiration.
    #[tokio::test]
    async fn test_set_ttl() -> Result<()> {
        let (_dir, _store, manager) = fixture(TtlConfig::tactical())?;

        manager.set_ttl("beacons/node-123", Duration::from_secs(30))?;

        assert_eq!(manager.pending_count(), 1);
        Ok(())
    }

    /// A manual cleanup deletes a document whose TTL has elapsed.
    #[tokio::test]
    async fn test_cleanup_expired() -> Result<()> {
        let (_dir, store, manager) = fixture(TtlConfig::tactical())?;
        let key = "beacons/node-123";

        store.put(key, &Automerge::new())?;
        manager.set_ttl(key, Duration::from_millis(100))?;

        sleep(Duration::from_millis(150)).await;

        let count = manager.cleanup_expired()?;
        assert_eq!(count, 1);

        let result = store.get(key)?;
        assert!(result.is_none());
        Ok(())
    }

    /// The background task deletes an expired document without a manual cleanup call.
    #[tokio::test]
    async fn test_background_cleanup() -> Result<()> {
        let (_dir, store, manager) = fixture(TtlConfig::tactical())?;
        let key = "beacons/node-456";

        store.put(key, &Automerge::new())?;
        manager.start_background_cleanup();
        manager.set_ttl(key, Duration::from_secs(1))?;

        // The task polls on a ten-second interval, so wait past the next tick.
        sleep(Duration::from_secs(11)).await;

        let result = store.get(key)?;
        assert!(result.is_none());

        manager.stop_background_cleanup();
        Ok(())
    }

    /// A collection with a configured TTL registers an expiration for its document.
    #[tokio::test]
    async fn test_put_with_ttl_registers_expiry() -> Result<()> {
        let (_dir, store, manager) = fixture(TtlConfig::tactical())?;

        store.put("beacons/doc1", &Automerge::new())?;

        let beacon_ttl = manager.config().get_collection_ttl("beacons").unwrap();
        manager.set_ttl("beacons/doc1", beacon_ttl)?;

        assert_eq!(manager.pending_count(), 1);
        Ok(())
    }

    /// A collection without a configured TTL registers nothing.
    #[tokio::test]
    async fn test_put_with_ttl_no_ttl_collection() -> Result<()> {
        let (_dir, store, manager) = fixture(TtlConfig::tactical())?;

        store.put("hierarchical_commands/doc1", &Automerge::new())?;

        let collection_ttl = manager
            .config()
            .get_collection_ttl("hierarchical_commands");
        assert!(
            collection_ttl.is_none(),
            "hierarchical_commands should have no TTL"
        );

        // Mirrors the caller-side pattern: only register when a TTL is configured.
        if let Some(ttl) = collection_ttl {
            manager.set_ttl("hierarchical_commands/doc1", ttl)?;
        }

        assert_eq!(manager.pending_count(), 0);
        Ok(())
    }

    /// Pins the values of the tactical preset.
    #[test]
    fn test_tactical_preset_ttl_values() {
        let config = TtlConfig::tactical();

        assert_eq!(
            config.beacon_ttl,
            Duration::from_secs(5 * 60),
            "beacon_ttl should be 5 minutes"
        );
        assert_eq!(
            config.position_ttl,
            Duration::from_secs(10 * 60),
            "position_ttl should be 10 minutes"
        );
        assert_eq!(
            config.capability_ttl,
            Duration::from_secs(2 * 60 * 60),
            "capability_ttl should be 2 hours"
        );
        assert_eq!(
            config.tombstone_ttl_hours,
            7 * 24,
            "tombstone TTL should be 7 days (168 hours)"
        );
        assert!(matches!(
            config.evict_strategy,
            EvictionStrategy::OldestFirst
        ));
    }

    /// Pins the per-collection TTL lookup of the tactical preset.
    #[test]
    fn test_effective_ttl_returns_collection_ttl() {
        let config = TtlConfig::tactical();

        let expectations = [
            ("beacons", Some(Duration::from_secs(300))),
            ("node_positions", Some(Duration::from_secs(600))),
            ("capabilities", Some(Duration::from_secs(7200))),
            ("cells", Some(Duration::from_secs(3600))),
            ("hierarchical_commands", None),
            ("unknown_collection", None),
        ];

        for (collection, expected) in expectations {
            assert_eq!(config.get_collection_ttl(collection), expected);
        }
    }

    /// End-to-end: put, register, expire, clean up, and verify both store and manager state.
    #[tokio::test]
    async fn test_ttl_manager_with_automerge_store_cleanup() -> Result<()> {
        let (_dir, store, manager) = fixture(TtlConfig::tactical())?;
        let key = "beacons/ephemeral1";

        store.put(key, &Automerge::new())?;
        assert!(
            store.get(key)?.is_some(),
            "Document should exist before TTL expiry"
        );

        manager.set_ttl(key, Duration::from_millis(100))?;
        assert_eq!(manager.pending_count(), 1);

        sleep(Duration::from_millis(150)).await;

        let cleaned = manager.cleanup_expired()?;
        assert_eq!(cleaned, 1, "Should have cleaned up 1 expired document");

        assert!(
            store.get(key)?.is_none(),
            "Document should be deleted after TTL expiry and cleanup"
        );
        assert_eq!(manager.pending_count(), 0);
        Ok(())
    }

    /// `KeepLastN(2)` spares the two latest-expiring documents of a collection.
    #[tokio::test]
    async fn test_eviction_strategy_keep_last_n() -> Result<()> {
        let config = TtlConfig::new().with_eviction(EvictionStrategy::KeepLastN(2));
        let (_dir, store, manager) = fixture(config)?;

        for i in 0..5u64 {
            let key = format!("beacons/node-{}", i);
            store.put(&key, &Automerge::new())?;
            manager.set_ttl(&key, Duration::from_millis(50 + i * 10))?;
        }

        sleep(Duration::from_millis(200)).await;

        let count = manager.cleanup_expired()?;
        assert_eq!(count, 3, "KeepLastN(2) should delete 3 of 5 expired docs");
        Ok(())
    }

    /// With the tactical offline policy, an extended TTL outlives its original deadline.
    #[tokio::test]
    async fn test_extend_ttls_for_offline() -> Result<()> {
        let (_dir, store, manager) = fixture(TtlConfig::tactical())?;
        let key = "beacons/test-offline";

        store.put(key, &Automerge::new())?;
        manager.set_ttl(key, Duration::from_secs(1))?;
        assert_eq!(manager.pending_count(), 1);

        manager.extend_ttls_for_offline();

        sleep(Duration::from_millis(1500)).await;

        let count = manager.cleanup_expired()?;
        assert_eq!(count, 0, "Extended TTL should not have expired yet");
        assert_eq!(manager.pending_count(), 1);
        Ok(())
    }

    /// With the long-duration preset, extending leaves the original deadline in force.
    #[tokio::test]
    async fn test_extend_ttls_no_offline_policy() -> Result<()> {
        let (_dir, _store, manager) = fixture(TtlConfig::long_duration())?;

        manager.set_ttl("beacons/test", Duration::from_millis(100))?;
        manager.extend_ttls_for_offline();

        sleep(Duration::from_millis(150)).await;

        let count = manager.cleanup_expired()?;
        assert_eq!(
            count, 1,
            "Without offline policy, TTL should not be extended"
        );
        Ok(())
    }

    /// Several documents registered with the same TTL are all cleaned up together.
    #[tokio::test]
    async fn test_multiple_documents_same_expiry() -> Result<()> {
        let (_dir, store, manager) = fixture(TtlConfig::tactical())?;
        let keys: Vec<String> = (0..5).map(|i| format!("beacons/node-{}", i)).collect();

        for key in &keys {
            store.put(key, &Automerge::new())?;
            manager.set_ttl(key, Duration::from_millis(100))?;
        }
        assert_eq!(manager.pending_count(), 5);

        sleep(Duration::from_millis(150)).await;

        let count = manager.cleanup_expired()?;
        assert_eq!(count, 5);

        for key in &keys {
            let result = store.get(key)?;
            assert!(result.is_none());
        }
        Ok(())
    }
}