Skip to main content

peat_mesh/storage/
ttl_manager.rs

1//! TTL Manager for automatic document expiration
2//!
3//! This module provides automatic cleanup of expired documents based on Time-To-Live (TTL)
4//! configuration. It is particularly critical for beacon documents which must expire after
5//! 30 seconds to prevent stale position data from affecting cell formation.
6//!
7//! # Architecture
8//!
9//! ```text
10//! Collection API
11//!     ↓
12//! upsert_with_ttl(key, data, ttl)
13//!     ↓
14//! TtlManager::set_ttl(key, ttl)
15//!     ↓
16//! BTreeMap<Instant, Vec<String>>
17//!     ↓
18//! Background cleanup task (every 10s)
19//!     ↓
20//! AutomergeStore::delete()
21//! ```
22//!
23//! # Two-Layer TTL Strategy (ADR-002)
24//!
25//! **Storage Layer**: Tombstone TTL-based eviction
26//! **Memory Layer**: Janitor cleanup for Automerge+Iroh (this module)
27//!
28//! # Usage Example
29//!
30//! ```ignore
31//! let ttl_manager = TtlManager::new(store.clone(), config.clone());
32//! ttl_manager.start_background_cleanup();
33//!
34//! // Set TTL for a beacon document
35//! ttl_manager.set_ttl("beacons/node-123", Duration::from_secs(30))?;
36//!
37//! // Document will be automatically deleted after 30 seconds
38//! ```
39
40use super::automerge_store::AutomergeStore;
41use super::ttl::TtlConfig;
42use anyhow::Result;
43use std::collections::BTreeMap;
44use std::sync::{Arc, RwLock};
45use std::time::{Duration, Instant};
46use tokio::task::JoinHandle;
47
48/// TTL Manager for automatic document expiration
49///
50/// Tracks document expiry times and runs background cleanup task to remove
51/// expired documents from the store.
52pub struct TtlManager {
53    /// Underlying Automerge store for document operations
54    store: Arc<AutomergeStore>,
55
56    /// TTL configuration
57    config: TtlConfig,
58
59    /// Expiry tracking: Instant → Vec<key>
60    ///
61    /// BTreeMap ensures efficient range queries for expired documents.
62    /// Each expiry time maps to all document keys that expire at that time.
63    /// Keys are in the format "collection/doc_id" (e.g., "beacons/node-123").
64    expiry_map: Arc<RwLock<BTreeMap<Instant, Vec<String>>>>,
65
66    /// Background cleanup task handle
67    cleanup_task: Arc<RwLock<Option<JoinHandle<()>>>>,
68}
69
70impl TtlManager {
71    /// Create a new TTL Manager
72    ///
73    /// # Arguments
74    ///
75    /// * `store` - AutomergeStore for document deletion
76    /// * `config` - TTL configuration (beacon_ttl, position_ttl, etc.)
77    ///
78    /// # Example
79    ///
80    /// ```ignore
81    /// let store = AutomergeStore::new(iroh_transport.clone());
82    /// let config = TtlConfig::tactical(); // 30s beacon TTL
83    /// let ttl_manager = TtlManager::new(store, config);
84    /// ```
85    pub fn new(store: Arc<AutomergeStore>, config: TtlConfig) -> Self {
86        Self {
87            store,
88            config,
89            expiry_map: Arc::new(RwLock::new(BTreeMap::new())),
90            cleanup_task: Arc::new(RwLock::new(None)),
91        }
92    }
93
94    /// Schedule a document for expiration
95    ///
96    /// # Arguments
97    ///
98    /// * `key` - Full document key in format "collection/doc_id" (e.g., "beacons/node-123")
99    /// * `ttl` - Time until expiration
100    ///
101    /// # Example
102    ///
103    /// ```ignore
104    /// // Beacon expires in 30 seconds
105    /// ttl_manager.set_ttl("beacons/node-123", Duration::from_secs(30))?;
106    /// ```
107    pub fn set_ttl(&self, key: &str, ttl: Duration) -> Result<()> {
108        let expiry_time = Instant::now() + ttl;
109
110        let mut expiry_map = self.expiry_map.write().unwrap_or_else(|e| e.into_inner());
111        expiry_map
112            .entry(expiry_time)
113            .or_default()
114            .push(key.to_string());
115
116        Ok(())
117    }
118
119    /// Remove all expired documents
120    ///
121    /// This method is called by the background cleanup task every 10 seconds.
122    /// It finds all documents with expiry times <= now and deletes them.
123    /// Documents are ordered according to the configured eviction strategy
124    /// (OldestFirst, KeepLastN, etc.) before deletion.
125    ///
126    /// # Returns
127    ///
128    /// Number of documents cleaned up
129    pub fn cleanup_expired(&self) -> Result<usize> {
130        let now = Instant::now();
131        let mut count = 0;
132
133        // Get all expired entries (expiry_time <= now)
134        let expired_keys = {
135            let mut expiry_map = self.expiry_map.write().unwrap_or_else(|e| e.into_inner());
136
137            // Split at first entry > now, take everything before
138            let split_key = expiry_map
139                .range(..=now)
140                .next_back()
141                .map(|(k, _)| *k)
142                .unwrap_or(now);
143
144            // Collect expired entries with their expiry times for strategy-aware ordering
145            let expired: Vec<_> = expiry_map
146                .range(..=split_key)
147                .flat_map(|(expiry, keys)| keys.iter().map(move |k| (*expiry, k.clone())))
148                .collect();
149
150            // Remove from map
151            expiry_map.retain(|&expiry_time, _| expiry_time > now);
152
153            expired
154        };
155
156        // Apply eviction strategy ordering
157        let ordered_keys = self.apply_eviction_strategy(expired_keys);
158
159        // Delete each expired document
160        for key in ordered_keys {
161            self.store.delete(&key)?;
162            count += 1;
163        }
164
165        Ok(count)
166    }
167
168    /// Order expired keys according to the configured eviction strategy
169    fn apply_eviction_strategy(&self, mut expired: Vec<(Instant, String)>) -> Vec<String> {
170        use super::ttl::EvictionStrategy;
171
172        match self.config.evict_strategy {
173            EvictionStrategy::OldestFirst => {
174                // Sort by expiry time ascending (oldest expired first)
175                expired.sort_by_key(|(expiry, _)| *expiry);
176                expired.into_iter().map(|(_, key)| key).collect()
177            }
178            EvictionStrategy::KeepLastN(n) => {
179                // Group by collection, keep last N per collection, evict the rest
180                use std::collections::HashMap;
181                let mut by_collection: HashMap<String, Vec<(Instant, String)>> = HashMap::new();
182                for (expiry, key) in expired {
183                    let collection = key.split('/').next().unwrap_or("").to_string();
184                    by_collection
185                        .entry(collection)
186                        .or_default()
187                        .push((expiry, key));
188                }
189                let mut to_delete = Vec::new();
190                for (_collection, mut entries) in by_collection {
191                    // Sort newest first, skip the last N, delete the rest
192                    entries.sort_by_key(|(expiry, _)| std::cmp::Reverse(*expiry));
193                    to_delete.extend(entries.into_iter().skip(n).map(|(_, key)| key));
194                }
195                to_delete
196            }
197            EvictionStrategy::StoragePressure { .. } | EvictionStrategy::None => {
198                // No special ordering — delete all expired
199                expired.into_iter().map(|(_, key)| key).collect()
200            }
201        }
202    }
203
204    /// Extend TTLs for all pending documents when the node is offline
205    ///
206    /// When no peers are connected, this extends remaining TTLs by the
207    /// configured offline retention multiplier (offline_ttl / online_ttl ratio).
208    /// This prevents premature eviction of data that can't be re-synced.
209    ///
210    /// Should be called periodically from the sync loop when `connected_peers()` is empty.
211    pub fn extend_ttls_for_offline(&self) {
212        let policy = match &self.config.offline_policy {
213            Some(p) => p,
214            None => return, // No offline policy configured
215        };
216
217        // Calculate extension factor: ratio of online to offline TTL
218        // A ratio > 1 means we extend (online is longer than offline)
219        // But when going offline, we want to retain longer, so use online/offline
220        let online_secs = policy.online_ttl.as_secs_f64();
221        let offline_secs = policy.offline_ttl.as_secs_f64();
222        if offline_secs <= 0.0 || online_secs <= 0.0 {
223            return;
224        }
225        // Extension factor: e.g., online=600s, offline=60s → factor=10x
226        let factor = online_secs / offline_secs;
227        if factor <= 1.0 {
228            return; // Offline TTL is already >= online TTL, nothing to extend
229        }
230
231        let now = Instant::now();
232        let mut expiry_map = self.expiry_map.write().unwrap_or_else(|e| e.into_inner());
233
234        // Collect entries that haven't expired yet and extend them
235        let entries: Vec<_> = expiry_map
236            .iter()
237            .filter(|(expiry, _)| **expiry > now)
238            .flat_map(|(expiry, keys)| keys.iter().map(move |k| (*expiry, k.clone())))
239            .collect();
240
241        // Remove the old entries and re-insert with extended expiry
242        expiry_map.clear();
243        for (old_expiry, key) in entries {
244            let remaining = old_expiry.duration_since(now);
245            let extended = Duration::from_secs_f64(remaining.as_secs_f64() * factor);
246            let new_expiry = now + extended;
247            expiry_map.entry(new_expiry).or_default().push(key);
248        }
249    }
250
251    /// Start background cleanup task
252    ///
253    /// Spawns a tokio task that runs cleanup_expired() every 10 seconds.
254    ///
255    /// # Example
256    ///
257    /// ```ignore
258    /// let ttl_manager = TtlManager::new(store, config);
259    /// ttl_manager.start_background_cleanup();
260    /// ```
261    pub fn start_background_cleanup(&self) {
262        let expiry_map = self.expiry_map.clone();
263        let store = self.store.clone();
264
265        let handle = tokio::spawn(async move {
266            let mut interval = tokio::time::interval(Duration::from_secs(10));
267
268            loop {
269                interval.tick().await;
270
271                // Run cleanup
272                let now = Instant::now();
273                let expired_docs = {
274                    let mut expiry_map = expiry_map.write().unwrap_or_else(|e| e.into_inner());
275
276                    // Get all expired entries
277                    let split_key = expiry_map
278                        .range(..=now)
279                        .next_back()
280                        .map(|(k, _)| *k)
281                        .unwrap_or(now);
282
283                    let expired: Vec<_> = expiry_map
284                        .range(..=split_key)
285                        .flat_map(|(_, docs)| docs.clone())
286                        .collect();
287
288                    // Remove from map
289                    expiry_map.retain(|&expiry_time, _| expiry_time > now);
290
291                    expired
292                };
293
294                // Delete expired documents
295                for key in expired_docs {
296                    if let Err(e) = store.delete(&key) {
297                        eprintln!("TTL cleanup failed for {}: {}", key, e);
298                    }
299                }
300            }
301        });
302
303        *self.cleanup_task.write().unwrap_or_else(|e| e.into_inner()) = Some(handle);
304    }
305
306    /// Stop background cleanup task
307    pub fn stop_background_cleanup(&self) {
308        if let Some(handle) = self
309            .cleanup_task
310            .write()
311            .unwrap_or_else(|e| e.into_inner())
312            .take()
313        {
314            handle.abort();
315        }
316    }
317
318    /// Get TTL configuration
319    pub fn config(&self) -> &TtlConfig {
320        &self.config
321    }
322
323    /// Get count of documents scheduled for expiration
324    pub fn pending_count(&self) -> usize {
325        self.expiry_map
326            .read()
327            .unwrap()
328            .values()
329            .map(|docs| docs.len())
330            .sum()
331    }
332}
333
334impl Drop for TtlManager {
335    fn drop(&mut self) {
336        self.stop_background_cleanup();
337    }
338}
339
340// ============================================================================
341// Tests
342// ============================================================================
343
344#[cfg(test)]
345mod tests {
346    use super::*;
347    use crate::storage::ttl::EvictionStrategy;
348    use automerge::Automerge;
349    use std::time::Duration;
350    use tokio::time::sleep;
351
352    #[tokio::test]
353    async fn test_set_ttl() -> Result<()> {
354        let temp_dir = tempfile::tempdir()?;
355        let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
356        let config = TtlConfig::tactical();
357        let ttl_manager = TtlManager::new(store, config);
358
359        // Set TTL for a beacon
360        ttl_manager.set_ttl("beacons/node-123", Duration::from_secs(30))?;
361
362        // Verify it's tracked
363        assert_eq!(ttl_manager.pending_count(), 1);
364
365        Ok(())
366    }
367
368    #[tokio::test]
369    async fn test_cleanup_expired() -> Result<()> {
370        let temp_dir = tempfile::tempdir()?;
371        let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
372        let config = TtlConfig::tactical();
373        let ttl_manager = TtlManager::new(store.clone(), config);
374
375        // Insert a test document
376        let doc = Automerge::new();
377        store.put("beacons/node-123", &doc)?;
378
379        // Set very short TTL (100ms)
380        ttl_manager.set_ttl("beacons/node-123", Duration::from_millis(100))?;
381
382        // Wait for expiration
383        sleep(Duration::from_millis(150)).await;
384
385        // Run cleanup
386        let count = ttl_manager.cleanup_expired()?;
387        assert_eq!(count, 1);
388
389        // Verify document is gone
390        let result = store.get("beacons/node-123")?;
391        assert!(result.is_none());
392
393        Ok(())
394    }
395
396    #[tokio::test]
397    async fn test_background_cleanup() -> Result<()> {
398        let temp_dir = tempfile::tempdir()?;
399        let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
400        let config = TtlConfig::tactical();
401        let ttl_manager = TtlManager::new(store.clone(), config);
402
403        // Insert test document
404        let doc = Automerge::new();
405        store.put("beacons/node-456", &doc)?;
406
407        // Start background cleanup
408        ttl_manager.start_background_cleanup();
409
410        // Set very short TTL (1 second)
411        ttl_manager.set_ttl("beacons/node-456", Duration::from_secs(1))?;
412
413        // Wait for background cleanup (runs every 10s, but document expires in 1s)
414        // We need to wait at least 11 seconds for cleanup to run
415        sleep(Duration::from_secs(11)).await;
416
417        // Verify document is gone
418        let result = store.get("beacons/node-456")?;
419        assert!(result.is_none());
420
421        ttl_manager.stop_background_cleanup();
422
423        Ok(())
424    }
425
426    #[tokio::test]
427    async fn test_put_with_ttl_registers_expiry() -> Result<()> {
428        let temp_dir = tempfile::tempdir()?;
429        let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
430        let config = TtlConfig::tactical();
431        let ttl_manager = TtlManager::new(store.clone(), config);
432
433        // Create a document and store it with TTL via the integrated path
434        let doc = Automerge::new();
435        store.put("beacons/doc1", &doc)?;
436
437        // Beacons have a TTL in tactical config (5 min), so set_ttl should register it
438        let beacon_ttl = ttl_manager.config().get_collection_ttl("beacons").unwrap();
439        ttl_manager.set_ttl("beacons/doc1", beacon_ttl)?;
440
441        assert_eq!(ttl_manager.pending_count(), 1);
442
443        Ok(())
444    }
445
446    #[tokio::test]
447    async fn test_put_with_ttl_no_ttl_collection() -> Result<()> {
448        let temp_dir = tempfile::tempdir()?;
449        let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
450        let config = TtlConfig::tactical();
451        let ttl_manager = TtlManager::new(store.clone(), config);
452
453        // hierarchical_commands has no TTL configured (returns None)
454        let doc = Automerge::new();
455        store.put("hierarchical_commands/doc1", &doc)?;
456
457        // Only register TTL if the collection has one configured
458        let collection_ttl = ttl_manager
459            .config()
460            .get_collection_ttl("hierarchical_commands");
461        assert!(
462            collection_ttl.is_none(),
463            "hierarchical_commands should have no TTL"
464        );
465
466        // Since there's no TTL for this collection, nothing should be registered
467        if let Some(ttl) = collection_ttl {
468            ttl_manager.set_ttl("hierarchical_commands/doc1", ttl)?;
469        }
470
471        assert_eq!(ttl_manager.pending_count(), 0);
472
473        Ok(())
474    }
475
476    #[test]
477    fn test_tactical_preset_ttl_values() {
478        let config = TtlConfig::tactical();
479
480        assert_eq!(
481            config.beacon_ttl,
482            Duration::from_secs(300),
483            "beacon_ttl should be 5 minutes"
484        );
485        assert_eq!(
486            config.position_ttl,
487            Duration::from_secs(600),
488            "position_ttl should be 10 minutes"
489        );
490        assert_eq!(
491            config.capability_ttl,
492            Duration::from_secs(7200),
493            "capability_ttl should be 2 hours"
494        );
495        assert_eq!(
496            config.tombstone_ttl_hours, 168,
497            "tombstone TTL should be 7 days (168 hours)"
498        );
499        assert!(matches!(
500            config.evict_strategy,
501            EvictionStrategy::OldestFirst
502        ));
503    }
504
505    #[test]
506    fn test_effective_ttl_returns_collection_ttl() {
507        let config = TtlConfig::tactical();
508
509        // Known collections should return their configured TTLs
510        assert_eq!(
511            config.get_collection_ttl("beacons"),
512            Some(Duration::from_secs(300))
513        );
514        assert_eq!(
515            config.get_collection_ttl("node_positions"),
516            Some(Duration::from_secs(600))
517        );
518        assert_eq!(
519            config.get_collection_ttl("capabilities"),
520            Some(Duration::from_secs(7200))
521        );
522        assert_eq!(
523            config.get_collection_ttl("cells"),
524            Some(Duration::from_secs(3600))
525        );
526
527        // Collections without TTL should return None
528        assert_eq!(config.get_collection_ttl("hierarchical_commands"), None);
529        assert_eq!(config.get_collection_ttl("unknown_collection"), None);
530    }
531
532    #[tokio::test]
533    async fn test_ttl_manager_with_automerge_store_cleanup() -> Result<()> {
534        let temp_dir = tempfile::tempdir()?;
535        let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
536        let config = TtlConfig::tactical();
537        let ttl_manager = TtlManager::new(store.clone(), config);
538
539        // Insert a document into the store
540        let doc = Automerge::new();
541        store.put("beacons/ephemeral1", &doc)?;
542
543        // Verify document exists
544        let result = store.get("beacons/ephemeral1")?;
545        assert!(result.is_some(), "Document should exist before TTL expiry");
546
547        // Set a very short TTL (100ms)
548        ttl_manager.set_ttl("beacons/ephemeral1", Duration::from_millis(100))?;
549        assert_eq!(ttl_manager.pending_count(), 1);
550
551        // Wait for expiration
552        sleep(Duration::from_millis(150)).await;
553
554        // Run cleanup and verify the document was deleted
555        let cleaned = ttl_manager.cleanup_expired()?;
556        assert_eq!(cleaned, 1, "Should have cleaned up 1 expired document");
557
558        // Verify the document is gone from the store
559        let result = store.get("beacons/ephemeral1")?;
560        assert!(
561            result.is_none(),
562            "Document should be deleted after TTL expiry and cleanup"
563        );
564
565        // Verify pending count is back to 0
566        assert_eq!(ttl_manager.pending_count(), 0);
567
568        Ok(())
569    }
570
571    #[tokio::test]
572    async fn test_eviction_strategy_keep_last_n() -> Result<()> {
573        let temp_dir = tempfile::tempdir()?;
574        let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
575        let config = TtlConfig::new().with_eviction(EvictionStrategy::KeepLastN(2));
576        let ttl_manager = TtlManager::new(store.clone(), config);
577
578        // Insert 5 beacon documents with staggered short TTLs
579        for i in 0..5 {
580            let doc = Automerge::new();
581            store.put(&format!("beacons/node-{}", i), &doc)?;
582            ttl_manager.set_ttl(
583                &format!("beacons/node-{}", i),
584                Duration::from_millis(50 + i * 10),
585            )?;
586        }
587
588        // Wait for all to expire
589        sleep(Duration::from_millis(200)).await;
590
591        // Cleanup should keep last 2 (newest) and delete 3
592        let count = ttl_manager.cleanup_expired()?;
593        assert_eq!(count, 3, "KeepLastN(2) should delete 3 of 5 expired docs");
594
595        Ok(())
596    }
597
598    #[tokio::test]
599    async fn test_extend_ttls_for_offline() -> Result<()> {
600        let temp_dir = tempfile::tempdir()?;
601        let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
602        let config = TtlConfig::tactical(); // online_ttl=600s, offline_ttl=60s → 10x extension
603        let ttl_manager = TtlManager::new(store.clone(), config);
604
605        // Set a TTL that expires in 1 second
606        let doc = Automerge::new();
607        store.put("beacons/test-offline", &doc)?;
608        ttl_manager.set_ttl("beacons/test-offline", Duration::from_secs(1))?;
609        assert_eq!(ttl_manager.pending_count(), 1);
610
611        // Extend TTLs (simulates going offline) — should multiply remaining by 10x
612        ttl_manager.extend_ttls_for_offline();
613
614        // Wait 1.5 seconds — original would have expired, extended should not
615        sleep(Duration::from_millis(1500)).await;
616
617        // Cleanup should find 0 expired (because TTL was extended)
618        let count = ttl_manager.cleanup_expired()?;
619        assert_eq!(count, 0, "Extended TTL should not have expired yet");
620        assert_eq!(ttl_manager.pending_count(), 1);
621
622        Ok(())
623    }
624
625    #[tokio::test]
626    async fn test_extend_ttls_no_offline_policy() -> Result<()> {
627        let temp_dir = tempfile::tempdir()?;
628        let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
629        let config = TtlConfig::long_duration(); // offline_policy = None
630        let ttl_manager = TtlManager::new(store.clone(), config);
631
632        ttl_manager.set_ttl("beacons/test", Duration::from_millis(100))?;
633
634        // Should be a no-op when no offline policy configured
635        ttl_manager.extend_ttls_for_offline();
636
637        sleep(Duration::from_millis(150)).await;
638        let count = ttl_manager.cleanup_expired()?;
639        assert_eq!(
640            count, 1,
641            "Without offline policy, TTL should not be extended"
642        );
643
644        Ok(())
645    }
646
647    #[tokio::test]
648    async fn test_multiple_documents_same_expiry() -> Result<()> {
649        let temp_dir = tempfile::tempdir()?;
650        let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
651        let config = TtlConfig::tactical();
652        let ttl_manager = TtlManager::new(store.clone(), config);
653
654        // Insert multiple documents
655        for i in 0..5 {
656            let doc = Automerge::new();
657            store.put(&format!("beacons/node-{}", i), &doc)?;
658            ttl_manager.set_ttl(&format!("beacons/node-{}", i), Duration::from_millis(100))?;
659        }
660
661        assert_eq!(ttl_manager.pending_count(), 5);
662
663        // Wait for expiration
664        sleep(Duration::from_millis(150)).await;
665
666        // Run cleanup
667        let count = ttl_manager.cleanup_expired()?;
668        assert_eq!(count, 5);
669
670        // Verify all documents are gone
671        for i in 0..5 {
672            let result = store.get(&format!("beacons/node-{}", i))?;
673            assert!(result.is_none());
674        }
675
676        Ok(())
677    }
678}