use super::automerge_store::AutomergeStore;
use super::ttl::TtlConfig;
use anyhow::Result;
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
pub struct TtlManager {
store: Arc<AutomergeStore>,
config: TtlConfig,
expiry_map: Arc<RwLock<BTreeMap<Instant, Vec<String>>>>,
cleanup_task: Arc<RwLock<Option<JoinHandle<()>>>>,
}
impl TtlManager {
pub fn new(store: Arc<AutomergeStore>, config: TtlConfig) -> Self {
Self {
store,
config,
expiry_map: Arc::new(RwLock::new(BTreeMap::new())),
cleanup_task: Arc::new(RwLock::new(None)),
}
}
pub fn set_ttl(&self, key: &str, ttl: Duration) -> Result<()> {
let expiry_time = Instant::now() + ttl;
let mut expiry_map = self.expiry_map.write().unwrap_or_else(|e| e.into_inner());
expiry_map
.entry(expiry_time)
.or_default()
.push(key.to_string());
Ok(())
}
pub fn cleanup_expired(&self) -> Result<usize> {
let now = Instant::now();
let mut count = 0;
let expired_keys = {
let mut expiry_map = self.expiry_map.write().unwrap_or_else(|e| e.into_inner());
let split_key = expiry_map
.range(..=now)
.next_back()
.map(|(k, _)| *k)
.unwrap_or(now);
let expired: Vec<_> = expiry_map
.range(..=split_key)
.flat_map(|(expiry, keys)| keys.iter().map(move |k| (*expiry, k.clone())))
.collect();
expiry_map.retain(|&expiry_time, _| expiry_time > now);
expired
};
let ordered_keys = self.apply_eviction_strategy(expired_keys);
for key in ordered_keys {
self.store.delete(&key)?;
count += 1;
}
Ok(count)
}
fn apply_eviction_strategy(&self, mut expired: Vec<(Instant, String)>) -> Vec<String> {
use super::ttl::EvictionStrategy;
match self.config.evict_strategy {
EvictionStrategy::OldestFirst => {
expired.sort_by_key(|(expiry, _)| *expiry);
expired.into_iter().map(|(_, key)| key).collect()
}
EvictionStrategy::KeepLastN(n) => {
use std::collections::HashMap;
let mut by_collection: HashMap<String, Vec<(Instant, String)>> = HashMap::new();
for (expiry, key) in expired {
let collection = key.split('/').next().unwrap_or("").to_string();
by_collection
.entry(collection)
.or_default()
.push((expiry, key));
}
let mut to_delete = Vec::new();
for (_collection, mut entries) in by_collection {
entries.sort_by_key(|(expiry, _)| std::cmp::Reverse(*expiry));
to_delete.extend(entries.into_iter().skip(n).map(|(_, key)| key));
}
to_delete
}
EvictionStrategy::StoragePressure { .. } | EvictionStrategy::None => {
expired.into_iter().map(|(_, key)| key).collect()
}
}
}
pub fn extend_ttls_for_offline(&self) {
let policy = match &self.config.offline_policy {
Some(p) => p,
None => return, };
let online_secs = policy.online_ttl.as_secs_f64();
let offline_secs = policy.offline_ttl.as_secs_f64();
if offline_secs <= 0.0 || online_secs <= 0.0 {
return;
}
let factor = online_secs / offline_secs;
if factor <= 1.0 {
return; }
let now = Instant::now();
let mut expiry_map = self.expiry_map.write().unwrap_or_else(|e| e.into_inner());
let entries: Vec<_> = expiry_map
.iter()
.filter(|(expiry, _)| **expiry > now)
.flat_map(|(expiry, keys)| keys.iter().map(move |k| (*expiry, k.clone())))
.collect();
expiry_map.clear();
for (old_expiry, key) in entries {
let remaining = old_expiry.duration_since(now);
let extended = Duration::from_secs_f64(remaining.as_secs_f64() * factor);
let new_expiry = now + extended;
expiry_map.entry(new_expiry).or_default().push(key);
}
}
pub fn start_background_cleanup(&self) {
let expiry_map = self.expiry_map.clone();
let store = self.store.clone();
let handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(10));
loop {
interval.tick().await;
let now = Instant::now();
let expired_docs = {
let mut expiry_map = expiry_map.write().unwrap_or_else(|e| e.into_inner());
let split_key = expiry_map
.range(..=now)
.next_back()
.map(|(k, _)| *k)
.unwrap_or(now);
let expired: Vec<_> = expiry_map
.range(..=split_key)
.flat_map(|(_, docs)| docs.clone())
.collect();
expiry_map.retain(|&expiry_time, _| expiry_time > now);
expired
};
for key in expired_docs {
if let Err(e) = store.delete(&key) {
eprintln!("TTL cleanup failed for {}: {}", key, e);
}
}
}
});
*self.cleanup_task.write().unwrap_or_else(|e| e.into_inner()) = Some(handle);
}
pub fn stop_background_cleanup(&self) {
if let Some(handle) = self
.cleanup_task
.write()
.unwrap_or_else(|e| e.into_inner())
.take()
{
handle.abort();
}
}
pub fn config(&self) -> &TtlConfig {
&self.config
}
pub fn pending_count(&self) -> usize {
self.expiry_map
.read()
.unwrap()
.values()
.map(|docs| docs.len())
.sum()
}
}
impl Drop for TtlManager {
fn drop(&mut self) {
self.stop_background_cleanup();
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::ttl::EvictionStrategy;
use automerge::Automerge;
use std::time::Duration;
use tokio::time::sleep;
#[tokio::test]
async fn test_set_ttl() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
let config = TtlConfig::tactical();
let ttl_manager = TtlManager::new(store, config);
ttl_manager.set_ttl("beacons/node-123", Duration::from_secs(30))?;
assert_eq!(ttl_manager.pending_count(), 1);
Ok(())
}
#[tokio::test]
async fn test_cleanup_expired() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
let config = TtlConfig::tactical();
let ttl_manager = TtlManager::new(store.clone(), config);
let doc = Automerge::new();
store.put("beacons/node-123", &doc)?;
ttl_manager.set_ttl("beacons/node-123", Duration::from_millis(100))?;
sleep(Duration::from_millis(150)).await;
let count = ttl_manager.cleanup_expired()?;
assert_eq!(count, 1);
let result = store.get("beacons/node-123")?;
assert!(result.is_none());
Ok(())
}
#[tokio::test]
async fn test_background_cleanup() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
let config = TtlConfig::tactical();
let ttl_manager = TtlManager::new(store.clone(), config);
let doc = Automerge::new();
store.put("beacons/node-456", &doc)?;
ttl_manager.start_background_cleanup();
ttl_manager.set_ttl("beacons/node-456", Duration::from_secs(1))?;
sleep(Duration::from_secs(11)).await;
let result = store.get("beacons/node-456")?;
assert!(result.is_none());
ttl_manager.stop_background_cleanup();
Ok(())
}
#[tokio::test]
async fn test_put_with_ttl_registers_expiry() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
let config = TtlConfig::tactical();
let ttl_manager = TtlManager::new(store.clone(), config);
let doc = Automerge::new();
store.put("beacons/doc1", &doc)?;
let beacon_ttl = ttl_manager.config().get_collection_ttl("beacons").unwrap();
ttl_manager.set_ttl("beacons/doc1", beacon_ttl)?;
assert_eq!(ttl_manager.pending_count(), 1);
Ok(())
}
#[tokio::test]
async fn test_put_with_ttl_no_ttl_collection() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
let config = TtlConfig::tactical();
let ttl_manager = TtlManager::new(store.clone(), config);
let doc = Automerge::new();
store.put("hierarchical_commands/doc1", &doc)?;
let collection_ttl = ttl_manager
.config()
.get_collection_ttl("hierarchical_commands");
assert!(
collection_ttl.is_none(),
"hierarchical_commands should have no TTL"
);
if let Some(ttl) = collection_ttl {
ttl_manager.set_ttl("hierarchical_commands/doc1", ttl)?;
}
assert_eq!(ttl_manager.pending_count(), 0);
Ok(())
}
#[test]
fn test_tactical_preset_ttl_values() {
let config = TtlConfig::tactical();
assert_eq!(
config.beacon_ttl,
Duration::from_secs(300),
"beacon_ttl should be 5 minutes"
);
assert_eq!(
config.position_ttl,
Duration::from_secs(600),
"position_ttl should be 10 minutes"
);
assert_eq!(
config.capability_ttl,
Duration::from_secs(7200),
"capability_ttl should be 2 hours"
);
assert_eq!(
config.tombstone_ttl_hours, 168,
"tombstone TTL should be 7 days (168 hours)"
);
assert!(matches!(
config.evict_strategy,
EvictionStrategy::OldestFirst
));
}
#[test]
fn test_effective_ttl_returns_collection_ttl() {
let config = TtlConfig::tactical();
assert_eq!(
config.get_collection_ttl("beacons"),
Some(Duration::from_secs(300))
);
assert_eq!(
config.get_collection_ttl("node_positions"),
Some(Duration::from_secs(600))
);
assert_eq!(
config.get_collection_ttl("capabilities"),
Some(Duration::from_secs(7200))
);
assert_eq!(
config.get_collection_ttl("cells"),
Some(Duration::from_secs(3600))
);
assert_eq!(config.get_collection_ttl("hierarchical_commands"), None);
assert_eq!(config.get_collection_ttl("unknown_collection"), None);
}
#[tokio::test]
async fn test_ttl_manager_with_automerge_store_cleanup() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
let config = TtlConfig::tactical();
let ttl_manager = TtlManager::new(store.clone(), config);
let doc = Automerge::new();
store.put("beacons/ephemeral1", &doc)?;
let result = store.get("beacons/ephemeral1")?;
assert!(result.is_some(), "Document should exist before TTL expiry");
ttl_manager.set_ttl("beacons/ephemeral1", Duration::from_millis(100))?;
assert_eq!(ttl_manager.pending_count(), 1);
sleep(Duration::from_millis(150)).await;
let cleaned = ttl_manager.cleanup_expired()?;
assert_eq!(cleaned, 1, "Should have cleaned up 1 expired document");
let result = store.get("beacons/ephemeral1")?;
assert!(
result.is_none(),
"Document should be deleted after TTL expiry and cleanup"
);
assert_eq!(ttl_manager.pending_count(), 0);
Ok(())
}
#[tokio::test]
async fn test_eviction_strategy_keep_last_n() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
let config = TtlConfig::new().with_eviction(EvictionStrategy::KeepLastN(2));
let ttl_manager = TtlManager::new(store.clone(), config);
for i in 0..5 {
let doc = Automerge::new();
store.put(&format!("beacons/node-{}", i), &doc)?;
ttl_manager.set_ttl(
&format!("beacons/node-{}", i),
Duration::from_millis(50 + i * 10),
)?;
}
sleep(Duration::from_millis(200)).await;
let count = ttl_manager.cleanup_expired()?;
assert_eq!(count, 3, "KeepLastN(2) should delete 3 of 5 expired docs");
Ok(())
}
#[tokio::test]
async fn test_extend_ttls_for_offline() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
let config = TtlConfig::tactical(); let ttl_manager = TtlManager::new(store.clone(), config);
let doc = Automerge::new();
store.put("beacons/test-offline", &doc)?;
ttl_manager.set_ttl("beacons/test-offline", Duration::from_secs(1))?;
assert_eq!(ttl_manager.pending_count(), 1);
ttl_manager.extend_ttls_for_offline();
sleep(Duration::from_millis(1500)).await;
let count = ttl_manager.cleanup_expired()?;
assert_eq!(count, 0, "Extended TTL should not have expired yet");
assert_eq!(ttl_manager.pending_count(), 1);
Ok(())
}
#[tokio::test]
async fn test_extend_ttls_no_offline_policy() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
let config = TtlConfig::long_duration(); let ttl_manager = TtlManager::new(store.clone(), config);
ttl_manager.set_ttl("beacons/test", Duration::from_millis(100))?;
ttl_manager.extend_ttls_for_offline();
sleep(Duration::from_millis(150)).await;
let count = ttl_manager.cleanup_expired()?;
assert_eq!(
count, 1,
"Without offline policy, TTL should not be extended"
);
Ok(())
}
#[tokio::test]
async fn test_multiple_documents_same_expiry() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let store = Arc::new(AutomergeStore::open(temp_dir.path())?);
let config = TtlConfig::tactical();
let ttl_manager = TtlManager::new(store.clone(), config);
for i in 0..5 {
let doc = Automerge::new();
store.put(&format!("beacons/node-{}", i), &doc)?;
ttl_manager.set_ttl(&format!("beacons/node-{}", i), Duration::from_millis(100))?;
}
assert_eq!(ttl_manager.pending_count(), 5);
sleep(Duration::from_millis(150)).await;
let count = ttl_manager.cleanup_expired()?;
assert_eq!(count, 5);
for i in 0..5 {
let result = store.get(&format!("beacons/node-{}", i))?;
assert!(result.is_none());
}
Ok(())
}
}