use reifydb_core::{common::CommitVersion, interface::catalog::flow::FlowNodeId, retention::RetentionStrategy};
use crate::materialized::{MaterializedCatalog, MultiVersionRetentionStrategy};
impl MaterializedCatalog {
pub fn find_operator_retention_strategy_at(
&self,
operator: FlowNodeId,
version: CommitVersion,
) -> Option<RetentionStrategy> {
self.operator_retention_strategies.get(&operator).and_then(|entry| {
let multi = entry.value();
multi.get(version)
})
}
pub fn find_operator_retention_strategy(&self, operator: FlowNodeId) -> Option<RetentionStrategy> {
self.operator_retention_strategies.get(&operator).and_then(|entry| {
let multi = entry.value();
multi.get_latest()
})
}
pub fn set_operator_retention_strategy(
&self,
operator: FlowNodeId,
version: CommitVersion,
policy: Option<RetentionStrategy>,
) {
let multi = self
.operator_retention_strategies
.get_or_insert_with(operator, MultiVersionRetentionStrategy::new);
if let Some(new_policy) = policy {
multi.value().insert(version, new_policy);
} else {
multi.value().remove(version);
}
}
}
#[cfg(test)]
pub mod tests {
use reifydb_core::retention::{CleanupMode, RetentionStrategy};
use super::*;
#[test]
fn test_set_and_find_operator_retention_strategy() {
let catalog = MaterializedCatalog::new();
let operator = FlowNodeId(100);
let policy = RetentionStrategy::KeepVersions {
count: 5,
cleanup_mode: CleanupMode::Drop,
};
catalog.set_operator_retention_strategy(operator, CommitVersion(1), Some(policy.clone()));
let found = catalog.find_operator_retention_strategy_at(operator, CommitVersion(1));
assert_eq!(found, Some(policy.clone()));
let found = catalog.find_operator_retention_strategy_at(operator, CommitVersion(5));
assert_eq!(found, Some(policy));
let found = catalog.find_operator_retention_strategy_at(operator, CommitVersion(0));
assert_eq!(found, None);
}
#[test]
fn test_operator_retention_strategy_update() {
let catalog = MaterializedCatalog::new();
let operator = FlowNodeId(42);
let policy_v1 = RetentionStrategy::KeepForever;
catalog.set_operator_retention_strategy(operator, CommitVersion(1), Some(policy_v1.clone()));
assert_eq!(
catalog.find_operator_retention_strategy_at(operator, CommitVersion(1)),
Some(policy_v1.clone())
);
let policy_v2 = RetentionStrategy::KeepVersions {
count: 3,
cleanup_mode: CleanupMode::Delete,
};
catalog.set_operator_retention_strategy(operator, CommitVersion(2), Some(policy_v2.clone()));
assert_eq!(catalog.find_operator_retention_strategy_at(operator, CommitVersion(1)), Some(policy_v1));
assert_eq!(
catalog.find_operator_retention_strategy_at(operator, CommitVersion(2)),
Some(policy_v2.clone())
);
assert_eq!(catalog.find_operator_retention_strategy_at(operator, CommitVersion(10)), Some(policy_v2));
}
#[test]
fn test_operator_retention_strategy_deletion() {
let catalog = MaterializedCatalog::new();
let operator = FlowNodeId(999);
let policy = RetentionStrategy::KeepVersions {
count: 100,
cleanup_mode: CleanupMode::Drop,
};
catalog.set_operator_retention_strategy(operator, CommitVersion(1), Some(policy.clone()));
assert_eq!(
catalog.find_operator_retention_strategy_at(operator, CommitVersion(1)),
Some(policy.clone())
);
catalog.set_operator_retention_strategy(operator, CommitVersion(2), None);
assert_eq!(catalog.find_operator_retention_strategy_at(operator, CommitVersion(2)), None);
assert_eq!(catalog.find_operator_retention_strategy_at(operator, CommitVersion(1)), Some(policy));
}
#[test]
fn test_operator_retention_strategy_versioning() {
let catalog = MaterializedCatalog::new();
let operator = FlowNodeId(777);
let policy_v1 = RetentionStrategy::KeepForever;
let policy_v2 = RetentionStrategy::KeepVersions {
count: 2,
cleanup_mode: CleanupMode::Delete,
};
let policy_v3 = RetentionStrategy::KeepVersions {
count: 50,
cleanup_mode: CleanupMode::Drop,
};
catalog.set_operator_retention_strategy(operator, CommitVersion(10), Some(policy_v1.clone()));
catalog.set_operator_retention_strategy(operator, CommitVersion(20), Some(policy_v2.clone()));
catalog.set_operator_retention_strategy(operator, CommitVersion(30), Some(policy_v3.clone()));
assert_eq!(catalog.find_operator_retention_strategy_at(operator, CommitVersion(5)), None);
assert_eq!(
catalog.find_operator_retention_strategy_at(operator, CommitVersion(10)),
Some(policy_v1.clone())
);
assert_eq!(catalog.find_operator_retention_strategy_at(operator, CommitVersion(15)), Some(policy_v1));
assert_eq!(
catalog.find_operator_retention_strategy_at(operator, CommitVersion(20)),
Some(policy_v2.clone())
);
assert_eq!(catalog.find_operator_retention_strategy_at(operator, CommitVersion(25)), Some(policy_v2));
assert_eq!(
catalog.find_operator_retention_strategy_at(operator, CommitVersion(30)),
Some(policy_v3.clone())
);
assert_eq!(catalog.find_operator_retention_strategy_at(operator, CommitVersion(100)), Some(policy_v3));
}
}