use reifydb_core::{common::CommitVersion, interface::catalog::flow::FlowNodeId, row::Ttl};
use crate::cache::{CatalogCache, MultiVersionRowTtl};
impl CatalogCache {
pub fn find_operator_ttl_at(&self, node: FlowNodeId, version: CommitVersion) -> Option<Ttl> {
self.operator_ttls.get(&node).and_then(|entry| {
let multi = entry.value();
multi.get(version)
})
}
pub fn find_operator_ttl(&self, node: FlowNodeId) -> Option<Ttl> {
self.operator_ttls.get(&node).and_then(|entry| {
let multi = entry.value();
multi.get_latest()
})
}
pub fn set_operator_ttl(&self, node: FlowNodeId, version: CommitVersion, config: Option<Ttl>) {
let multi = self.operator_ttls.get_or_insert_with(node, MultiVersionRowTtl::new);
if let Some(new_config) = config {
multi.value().insert(version, new_config);
} else {
multi.value().remove(version);
}
}
}
#[cfg(test)]
pub mod tests {
use reifydb_core::row::{TtlAnchor, TtlCleanupMode};
use super::*;
#[test]
fn test_set_and_find_operator_ttl() {
let catalog = CatalogCache::new();
let node = FlowNodeId(1);
let config = Ttl {
duration_nanos: 300_000_000_000,
anchor: TtlAnchor::Created,
cleanup_mode: TtlCleanupMode::Drop,
};
catalog.set_operator_ttl(node, CommitVersion(1), Some(config.clone()));
assert_eq!(catalog.find_operator_ttl_at(node, CommitVersion(1)), Some(config.clone()));
assert_eq!(catalog.find_operator_ttl_at(node, CommitVersion(5)), Some(config.clone()));
assert_eq!(catalog.find_operator_ttl_at(node, CommitVersion(0)), None);
assert_eq!(catalog.find_operator_ttl(node), Some(config));
}
#[test]
fn test_operator_ttl_update() {
let catalog = CatalogCache::new();
let node = FlowNodeId(42);
let config_v1 = Ttl {
duration_nanos: 300_000_000_000,
anchor: TtlAnchor::Created,
cleanup_mode: TtlCleanupMode::Drop,
};
let config_v2 = Ttl {
duration_nanos: 600_000_000_000,
anchor: TtlAnchor::Updated,
cleanup_mode: TtlCleanupMode::Delete,
};
catalog.set_operator_ttl(node, CommitVersion(1), Some(config_v1.clone()));
catalog.set_operator_ttl(node, CommitVersion(2), Some(config_v2.clone()));
assert_eq!(catalog.find_operator_ttl_at(node, CommitVersion(1)), Some(config_v1));
assert_eq!(catalog.find_operator_ttl_at(node, CommitVersion(2)), Some(config_v2.clone()));
assert_eq!(catalog.find_operator_ttl_at(node, CommitVersion(10)), Some(config_v2));
}
#[test]
fn test_operator_ttl_deletion() {
let catalog = CatalogCache::new();
let node = FlowNodeId(99);
let config = Ttl {
duration_nanos: 300_000_000_000,
anchor: TtlAnchor::Created,
cleanup_mode: TtlCleanupMode::Drop,
};
catalog.set_operator_ttl(node, CommitVersion(1), Some(config.clone()));
assert_eq!(catalog.find_operator_ttl_at(node, CommitVersion(1)), Some(config.clone()));
catalog.set_operator_ttl(node, CommitVersion(2), None);
assert_eq!(catalog.find_operator_ttl_at(node, CommitVersion(2)), None);
assert_eq!(catalog.find_operator_ttl_at(node, CommitVersion(1)), Some(config));
}
#[test]
fn test_operator_ttl_versioning() {
let catalog = CatalogCache::new();
let node = FlowNodeId(100);
let config_v1 = Ttl {
duration_nanos: 60_000_000_000,
anchor: TtlAnchor::Created,
cleanup_mode: TtlCleanupMode::Drop,
};
let config_v2 = Ttl {
duration_nanos: 300_000_000_000,
anchor: TtlAnchor::Updated,
cleanup_mode: TtlCleanupMode::Delete,
};
let config_v3 = Ttl {
duration_nanos: 86_400_000_000_000,
anchor: TtlAnchor::Created,
cleanup_mode: TtlCleanupMode::Drop,
};
catalog.set_operator_ttl(node, CommitVersion(10), Some(config_v1.clone()));
catalog.set_operator_ttl(node, CommitVersion(20), Some(config_v2.clone()));
catalog.set_operator_ttl(node, CommitVersion(30), Some(config_v3.clone()));
assert_eq!(catalog.find_operator_ttl_at(node, CommitVersion(5)), None);
assert_eq!(catalog.find_operator_ttl_at(node, CommitVersion(10)), Some(config_v1.clone()));
assert_eq!(catalog.find_operator_ttl_at(node, CommitVersion(15)), Some(config_v1));
assert_eq!(catalog.find_operator_ttl_at(node, CommitVersion(20)), Some(config_v2.clone()));
assert_eq!(catalog.find_operator_ttl_at(node, CommitVersion(25)), Some(config_v2));
assert_eq!(catalog.find_operator_ttl_at(node, CommitVersion(30)), Some(config_v3.clone()));
assert_eq!(catalog.find_operator_ttl_at(node, CommitVersion(100)), Some(config_v3));
}
#[test]
fn test_operator_ttl_per_node_isolation() {
let catalog = CatalogCache::new();
let node_a = FlowNodeId(1);
let node_b = FlowNodeId(2);
let cfg_a = Ttl {
duration_nanos: 5_000,
anchor: TtlAnchor::Created,
cleanup_mode: TtlCleanupMode::Drop,
};
let cfg_b = Ttl {
duration_nanos: 3_600_000_000_000,
anchor: TtlAnchor::Updated,
cleanup_mode: TtlCleanupMode::Drop,
};
catalog.set_operator_ttl(node_a, CommitVersion(1), Some(cfg_a.clone()));
catalog.set_operator_ttl(node_b, CommitVersion(1), Some(cfg_b.clone()));
assert_eq!(catalog.find_operator_ttl(node_a), Some(cfg_a));
assert_eq!(catalog.find_operator_ttl(node_b), Some(cfg_b));
catalog.set_operator_ttl(node_a, CommitVersion(2), None);
assert_eq!(catalog.find_operator_ttl(node_a), None);
assert!(catalog.find_operator_ttl(node_b).is_some());
}
}