reifydb_catalog/materialized/
source_retention_policy.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{CommitVersion, interface::SourceId, retention::RetentionPolicy};
5
6use crate::materialized::{MaterializedCatalog, MultiVersionRetentionPolicy};
7
8impl MaterializedCatalog {
9	/// Find a retention policy for a source at a specific version
10	pub fn find_source_retention_policy(
11		&self,
12		source: SourceId,
13		version: CommitVersion,
14	) -> Option<RetentionPolicy> {
15		self.source_retention_policies.get(&source).and_then(|entry| {
16			let multi = entry.value();
17			multi.get(version)
18		})
19	}
20
21	/// Set a retention policy for a source at a specific version
22	pub fn set_source_retention_policy(
23		&self,
24		source: SourceId,
25		version: CommitVersion,
26		policy: Option<RetentionPolicy>,
27	) {
28		let multi = self.source_retention_policies.get_or_insert_with(source, MultiVersionRetentionPolicy::new);
29
30		if let Some(new_policy) = policy {
31			multi.value().insert(version, new_policy);
32		} else {
33			multi.value().remove(version);
34		}
35	}
36}
37
38#[cfg(test)]
39mod tests {
40	use reifydb_core::{
41		interface::TableId,
42		retention::{CleanupMode, RetentionPolicy},
43	};
44
45	use super::*;
46
47	#[test]
48	fn test_set_and_find_source_retention_policy() {
49		let catalog = MaterializedCatalog::new();
50		let source = SourceId::Table(TableId(1));
51		let policy = RetentionPolicy::KeepVersions {
52			count: 10,
53			cleanup_mode: CleanupMode::Delete,
54		};
55
56		// Set policy at version 1
57		catalog.set_source_retention_policy(source, CommitVersion(1), Some(policy.clone()));
58
59		// Find policy at version 1
60		let found = catalog.find_source_retention_policy(source, CommitVersion(1));
61		assert_eq!(found, Some(policy.clone()));
62
63		// Find policy at later version (should return same policy)
64		let found = catalog.find_source_retention_policy(source, CommitVersion(5));
65		assert_eq!(found, Some(policy));
66
67		// Policy shouldn't exist at version 0
68		let found = catalog.find_source_retention_policy(source, CommitVersion(0));
69		assert_eq!(found, None);
70	}
71
72	#[test]
73	fn test_source_retention_policy_update() {
74		let catalog = MaterializedCatalog::new();
75		let source = SourceId::Table(TableId(42));
76
77		// Set initial policy
78		let policy_v1 = RetentionPolicy::KeepForever;
79		catalog.set_source_retention_policy(source, CommitVersion(1), Some(policy_v1.clone()));
80
81		// Verify initial state
82		assert_eq!(catalog.find_source_retention_policy(source, CommitVersion(1)), Some(policy_v1.clone()));
83
84		// Update policy
85		let policy_v2 = RetentionPolicy::KeepVersions {
86			count: 20,
87			cleanup_mode: CleanupMode::Drop,
88		};
89		catalog.set_source_retention_policy(source, CommitVersion(2), Some(policy_v2.clone()));
90
91		// Historical query at version 1 should still show old policy
92		assert_eq!(catalog.find_source_retention_policy(source, CommitVersion(1)), Some(policy_v1));
93
94		// Current version should show new policy
95		assert_eq!(catalog.find_source_retention_policy(source, CommitVersion(2)), Some(policy_v2.clone()));
96		assert_eq!(catalog.find_source_retention_policy(source, CommitVersion(10)), Some(policy_v2));
97	}
98
99	#[test]
100	fn test_source_retention_policy_deletion() {
101		let catalog = MaterializedCatalog::new();
102		let source = SourceId::Table(TableId(99));
103
104		// Create and set policy
105		let policy = RetentionPolicy::KeepVersions {
106			count: 5,
107			cleanup_mode: CleanupMode::Delete,
108		};
109		catalog.set_source_retention_policy(source, CommitVersion(1), Some(policy.clone()));
110
111		// Verify it exists
112		assert_eq!(catalog.find_source_retention_policy(source, CommitVersion(1)), Some(policy.clone()));
113
114		// Delete the policy
115		catalog.set_source_retention_policy(source, CommitVersion(2), None);
116
117		// Should not exist at version 2
118		assert_eq!(catalog.find_source_retention_policy(source, CommitVersion(2)), None);
119
120		// Should still exist at version 1 (historical)
121		assert_eq!(catalog.find_source_retention_policy(source, CommitVersion(1)), Some(policy));
122	}
123
124	#[test]
125	fn test_source_retention_policy_versioning() {
126		let catalog = MaterializedCatalog::new();
127		let source = SourceId::Table(TableId(100));
128
129		// Create multiple versions
130		let policy_v1 = RetentionPolicy::KeepForever;
131		let policy_v2 = RetentionPolicy::KeepVersions {
132			count: 10,
133			cleanup_mode: CleanupMode::Delete,
134		};
135		let policy_v3 = RetentionPolicy::KeepVersions {
136			count: 100,
137			cleanup_mode: CleanupMode::Drop,
138		};
139
140		// Set at different versions
141		catalog.set_source_retention_policy(source, CommitVersion(10), Some(policy_v1.clone()));
142		catalog.set_source_retention_policy(source, CommitVersion(20), Some(policy_v2.clone()));
143		catalog.set_source_retention_policy(source, CommitVersion(30), Some(policy_v3.clone()));
144
145		// Query at different versions
146		assert_eq!(catalog.find_source_retention_policy(source, CommitVersion(5)), None);
147		assert_eq!(catalog.find_source_retention_policy(source, CommitVersion(10)), Some(policy_v1.clone()));
148		assert_eq!(catalog.find_source_retention_policy(source, CommitVersion(15)), Some(policy_v1));
149		assert_eq!(catalog.find_source_retention_policy(source, CommitVersion(20)), Some(policy_v2.clone()));
150		assert_eq!(catalog.find_source_retention_policy(source, CommitVersion(25)), Some(policy_v2));
151		assert_eq!(catalog.find_source_retention_policy(source, CommitVersion(30)), Some(policy_v3.clone()));
152		assert_eq!(catalog.find_source_retention_policy(source, CommitVersion(100)), Some(policy_v3));
153	}
154}