reifydb_catalog/materialized/
operator_retention_policy.rs1use reifydb_core::{CommitVersion, interface::FlowNodeId, retention::RetentionPolicy};
5
6use crate::materialized::{MaterializedCatalog, MultiVersionRetentionPolicy};
7
8impl MaterializedCatalog {
9 pub fn find_operator_retention_policy(
11 &self,
12 operator: FlowNodeId,
13 version: CommitVersion,
14 ) -> Option<RetentionPolicy> {
15 self.operator_retention_policies.get(&operator).and_then(|entry| {
16 let multi = entry.value();
17 multi.get(version)
18 })
19 }
20
21 pub fn set_operator_retention_policy(
23 &self,
24 operator: FlowNodeId,
25 version: CommitVersion,
26 policy: Option<RetentionPolicy>,
27 ) {
28 let multi =
29 self.operator_retention_policies.get_or_insert_with(operator, MultiVersionRetentionPolicy::new);
30
31 if let Some(new_policy) = policy {
32 multi.value().insert(version, new_policy);
33 } else {
34 multi.value().remove(version);
35 }
36 }
37}
38
39#[cfg(test)]
40mod tests {
41 use reifydb_core::retention::{CleanupMode, RetentionPolicy};
42
43 use super::*;
44
45 #[test]
46 fn test_set_and_find_operator_retention_policy() {
47 let catalog = MaterializedCatalog::new();
48 let operator = FlowNodeId(100);
49 let policy = RetentionPolicy::KeepVersions {
50 count: 5,
51 cleanup_mode: CleanupMode::Drop,
52 };
53
54 catalog.set_operator_retention_policy(operator, CommitVersion(1), Some(policy.clone()));
56
57 let found = catalog.find_operator_retention_policy(operator, CommitVersion(1));
59 assert_eq!(found, Some(policy.clone()));
60
61 let found = catalog.find_operator_retention_policy(operator, CommitVersion(5));
63 assert_eq!(found, Some(policy));
64
65 let found = catalog.find_operator_retention_policy(operator, CommitVersion(0));
67 assert_eq!(found, None);
68 }
69
70 #[test]
71 fn test_operator_retention_policy_update() {
72 let catalog = MaterializedCatalog::new();
73 let operator = FlowNodeId(42);
74
75 let policy_v1 = RetentionPolicy::KeepForever;
77 catalog.set_operator_retention_policy(operator, CommitVersion(1), Some(policy_v1.clone()));
78
79 assert_eq!(catalog.find_operator_retention_policy(operator, CommitVersion(1)), Some(policy_v1.clone()));
81
82 let policy_v2 = RetentionPolicy::KeepVersions {
84 count: 3,
85 cleanup_mode: CleanupMode::Delete,
86 };
87 catalog.set_operator_retention_policy(operator, CommitVersion(2), Some(policy_v2.clone()));
88
89 assert_eq!(catalog.find_operator_retention_policy(operator, CommitVersion(1)), Some(policy_v1));
91
92 assert_eq!(catalog.find_operator_retention_policy(operator, CommitVersion(2)), Some(policy_v2.clone()));
94 assert_eq!(catalog.find_operator_retention_policy(operator, CommitVersion(10)), Some(policy_v2));
95 }
96
97 #[test]
98 fn test_operator_retention_policy_deletion() {
99 let catalog = MaterializedCatalog::new();
100 let operator = FlowNodeId(999);
101
102 let policy = RetentionPolicy::KeepVersions {
104 count: 100,
105 cleanup_mode: CleanupMode::Drop,
106 };
107 catalog.set_operator_retention_policy(operator, CommitVersion(1), Some(policy.clone()));
108
109 assert_eq!(catalog.find_operator_retention_policy(operator, CommitVersion(1)), Some(policy.clone()));
111
112 catalog.set_operator_retention_policy(operator, CommitVersion(2), None);
114
115 assert_eq!(catalog.find_operator_retention_policy(operator, CommitVersion(2)), None);
117
118 assert_eq!(catalog.find_operator_retention_policy(operator, CommitVersion(1)), Some(policy));
120 }
121
122 #[test]
123 fn test_operator_retention_policy_versioning() {
124 let catalog = MaterializedCatalog::new();
125 let operator = FlowNodeId(777);
126
127 let policy_v1 = RetentionPolicy::KeepForever;
129 let policy_v2 = RetentionPolicy::KeepVersions {
130 count: 2,
131 cleanup_mode: CleanupMode::Delete,
132 };
133 let policy_v3 = RetentionPolicy::KeepVersions {
134 count: 50,
135 cleanup_mode: CleanupMode::Drop,
136 };
137
138 catalog.set_operator_retention_policy(operator, CommitVersion(10), Some(policy_v1.clone()));
140 catalog.set_operator_retention_policy(operator, CommitVersion(20), Some(policy_v2.clone()));
141 catalog.set_operator_retention_policy(operator, CommitVersion(30), Some(policy_v3.clone()));
142
143 assert_eq!(catalog.find_operator_retention_policy(operator, CommitVersion(5)), None);
145 assert_eq!(
146 catalog.find_operator_retention_policy(operator, CommitVersion(10)),
147 Some(policy_v1.clone())
148 );
149 assert_eq!(catalog.find_operator_retention_policy(operator, CommitVersion(15)), Some(policy_v1));
150 assert_eq!(
151 catalog.find_operator_retention_policy(operator, CommitVersion(20)),
152 Some(policy_v2.clone())
153 );
154 assert_eq!(catalog.find_operator_retention_policy(operator, CommitVersion(25)), Some(policy_v2));
155 assert_eq!(
156 catalog.find_operator_retention_policy(operator, CommitVersion(30)),
157 Some(policy_v3.clone())
158 );
159 assert_eq!(catalog.find_operator_retention_policy(operator, CommitVersion(100)), Some(policy_v3));
160 }
161}