reifydb_catalog/materialized/
source_retention_policy.rs1use reifydb_core::{CommitVersion, interface::SourceId, retention::RetentionPolicy};
5
6use crate::materialized::{MaterializedCatalog, MultiVersionRetentionPolicy};
7
8impl MaterializedCatalog {
9 pub fn find_source_retention_policy(
11 &self,
12 source: SourceId,
13 version: CommitVersion,
14 ) -> Option<RetentionPolicy> {
15 self.source_retention_policies.get(&source).and_then(|entry| {
16 let multi = entry.value();
17 multi.get(version)
18 })
19 }
20
21 pub fn set_source_retention_policy(
23 &self,
24 source: SourceId,
25 version: CommitVersion,
26 policy: Option<RetentionPolicy>,
27 ) {
28 let multi = self.source_retention_policies.get_or_insert_with(source, MultiVersionRetentionPolicy::new);
29
30 if let Some(new_policy) = policy {
31 multi.value().insert(version, new_policy);
32 } else {
33 multi.value().remove(version);
34 }
35 }
36}
37
#[cfg(test)]
mod tests {
    use reifydb_core::{
        interface::TableId,
        retention::{CleanupMode, RetentionPolicy},
    };

    use super::*;

    /// A policy set at version 1 is visible at version 1 and later, but not at
    /// version 0.
    #[test]
    fn test_set_and_find_source_retention_policy() {
        let source = SourceId::Table(TableId(1));
        let catalog = MaterializedCatalog::new();
        let policy = RetentionPolicy::KeepVersions {
            count: 10,
            cleanup_mode: CleanupMode::Delete,
        };

        catalog.set_source_retention_policy(source, CommitVersion(1), Some(policy.clone()));

        // (queried version, expected lookup result)
        let expectations = [(1, Some(policy.clone())), (5, Some(policy)), (0, None)];
        for (version, expected) in expectations {
            let found = catalog.find_source_retention_policy(source, CommitVersion(version));
            assert_eq!(found, expected);
        }
    }

    /// Setting a second policy at a later version leaves the earlier version's
    /// policy readable.
    #[test]
    fn test_source_retention_policy_update() {
        let source = SourceId::Table(TableId(42));
        let catalog = MaterializedCatalog::new();

        let first = RetentionPolicy::KeepForever;
        catalog.set_source_retention_policy(source, CommitVersion(1), Some(first.clone()));

        let at_v1 = catalog.find_source_retention_policy(source, CommitVersion(1));
        assert_eq!(at_v1, Some(first.clone()));

        let second = RetentionPolicy::KeepVersions {
            count: 20,
            cleanup_mode: CleanupMode::Drop,
        };
        catalog.set_source_retention_policy(source, CommitVersion(2), Some(second.clone()));

        // (queried version, expected lookup result)
        let expectations = [(1, Some(first)), (2, Some(second.clone())), (10, Some(second))];
        for (version, expected) in expectations {
            let found = catalog.find_source_retention_policy(source, CommitVersion(version));
            assert_eq!(found, expected);
        }
    }

    /// Passing `None` at version 2 makes the lookup at version 2 return `None`
    /// while version 1 still reports the original policy.
    #[test]
    fn test_source_retention_policy_deletion() {
        let source = SourceId::Table(TableId(99));
        let catalog = MaterializedCatalog::new();

        let original = RetentionPolicy::KeepVersions {
            count: 5,
            cleanup_mode: CleanupMode::Delete,
        };
        catalog.set_source_retention_policy(source, CommitVersion(1), Some(original.clone()));

        let before_removal = catalog.find_source_retention_policy(source, CommitVersion(1));
        assert_eq!(before_removal, Some(original.clone()));

        catalog.set_source_retention_policy(source, CommitVersion(2), None);

        let at_v2 = catalog.find_source_retention_policy(source, CommitVersion(2));
        assert_eq!(at_v2, None);

        let at_v1 = catalog.find_source_retention_policy(source, CommitVersion(1));
        assert_eq!(at_v1, Some(original));
    }

    /// Three policies at versions 10, 20 and 30: each lookup reports the policy
    /// set at or before the queried version, and nothing before version 10.
    #[test]
    fn test_source_retention_policy_versioning() {
        let source = SourceId::Table(TableId(100));
        let catalog = MaterializedCatalog::new();

        let policy_v1 = RetentionPolicy::KeepForever;
        let policy_v2 = RetentionPolicy::KeepVersions {
            count: 10,
            cleanup_mode: CleanupMode::Delete,
        };
        let policy_v3 = RetentionPolicy::KeepVersions {
            count: 100,
            cleanup_mode: CleanupMode::Drop,
        };

        // Register the policies in ascending version order.
        for (version, policy) in [(10, &policy_v1), (20, &policy_v2), (30, &policy_v3)] {
            catalog.set_source_retention_policy(source, CommitVersion(version), Some(policy.clone()));
        }

        // (queried version, expected lookup result)
        let expectations = [
            (5, None),
            (10, Some(policy_v1.clone())),
            (15, Some(policy_v1)),
            (20, Some(policy_v2.clone())),
            (25, Some(policy_v2)),
            (30, Some(policy_v3.clone())),
            (100, Some(policy_v3)),
        ];
        for (version, expected) in expectations {
            let found = catalog.find_source_retention_policy(source, CommitVersion(version));
            assert_eq!(found, expected);
        }
    }
}