reifydb_catalog/materialized/
operator_retention_policy.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{CommitVersion, interface::FlowNodeId, retention::RetentionPolicy};
5
6use crate::materialized::{MaterializedCatalog, MultiVersionRetentionPolicy};
7
8impl MaterializedCatalog {
9	/// Find a retention policy for an operator at a specific version
10	pub fn find_operator_retention_policy(
11		&self,
12		operator: FlowNodeId,
13		version: CommitVersion,
14	) -> Option<RetentionPolicy> {
15		self.operator_retention_policies.get(&operator).and_then(|entry| {
16			let multi = entry.value();
17			multi.get(version)
18		})
19	}
20
21	/// Set a retention policy for an operator at a specific version
22	pub fn set_operator_retention_policy(
23		&self,
24		operator: FlowNodeId,
25		version: CommitVersion,
26		policy: Option<RetentionPolicy>,
27	) {
28		let multi =
29			self.operator_retention_policies.get_or_insert_with(operator, MultiVersionRetentionPolicy::new);
30
31		if let Some(new_policy) = policy {
32			multi.value().insert(version, new_policy);
33		} else {
34			multi.value().remove(version);
35		}
36	}
37}
38
#[cfg(test)]
mod tests {
	use reifydb_core::retention::{CleanupMode, RetentionPolicy};

	use super::*;

	/// A policy set at version 1 is found at version 1 and at a later version,
	/// but not at version 0.
	#[test]
	fn test_set_and_find_operator_retention_policy() {
		let catalog = MaterializedCatalog::new();
		let node = FlowNodeId(100);
		let expected = RetentionPolicy::KeepVersions {
			count: 5,
			cleanup_mode: CleanupMode::Drop,
		};
		let lookup = |at: CommitVersion| catalog.find_operator_retention_policy(node, at);

		catalog.set_operator_retention_policy(node, CommitVersion(1), Some(expected.clone()));

		// Exactly at the version where it was set.
		assert_eq!(lookup(CommitVersion(1)), Some(expected.clone()));

		// A later version still resolves to the same policy.
		assert_eq!(lookup(CommitVersion(5)), Some(expected));

		// Before the policy was set there is nothing to find.
		assert_eq!(lookup(CommitVersion(0)), None);
	}

	/// Setting a second policy at a later version leaves the earlier version's
	/// answer unchanged.
	#[test]
	fn test_operator_retention_policy_update() {
		let catalog = MaterializedCatalog::new();
		let node = FlowNodeId(42);
		let lookup = |at: CommitVersion| catalog.find_operator_retention_policy(node, at);

		// First policy, recorded at version 1.
		let original = RetentionPolicy::KeepForever;
		catalog.set_operator_retention_policy(node, CommitVersion(1), Some(original.clone()));
		assert_eq!(lookup(CommitVersion(1)), Some(original.clone()));

		// Replacement policy, recorded at version 2.
		let replacement = RetentionPolicy::KeepVersions {
			count: 3,
			cleanup_mode: CleanupMode::Delete,
		};
		catalog.set_operator_retention_policy(node, CommitVersion(2), Some(replacement.clone()));

		// Version 1 keeps answering with the original policy.
		assert_eq!(lookup(CommitVersion(1)), Some(original));

		// Version 2 and later answer with the replacement.
		assert_eq!(lookup(CommitVersion(2)), Some(replacement.clone()));
		assert_eq!(lookup(CommitVersion(10)), Some(replacement));
	}

	/// Passing `None` at a later version hides the policy from that version
	/// while the earlier version still sees it.
	#[test]
	fn test_operator_retention_policy_deletion() {
		let catalog = MaterializedCatalog::new();
		let node = FlowNodeId(999);
		let lookup = |at: CommitVersion| catalog.find_operator_retention_policy(node, at);

		let stored = RetentionPolicy::KeepVersions {
			count: 100,
			cleanup_mode: CleanupMode::Drop,
		};
		catalog.set_operator_retention_policy(node, CommitVersion(1), Some(stored.clone()));
		assert_eq!(lookup(CommitVersion(1)), Some(stored.clone()));

		// Remove the policy as of version 2.
		catalog.set_operator_retention_policy(node, CommitVersion(2), None);

		// Gone at version 2 ...
		assert_eq!(lookup(CommitVersion(2)), None);

		// ... but the historical view at version 1 is intact.
		assert_eq!(lookup(CommitVersion(1)), Some(stored));
	}

	/// Three policies set at versions 10, 20 and 30 are each returned for the
	/// queried versions at or after their own and before the next one.
	#[test]
	fn test_operator_retention_policy_versioning() {
		let catalog = MaterializedCatalog::new();
		let node = FlowNodeId(777);

		let first = RetentionPolicy::KeepForever;
		let second = RetentionPolicy::KeepVersions {
			count: 2,
			cleanup_mode: CleanupMode::Delete,
		};
		let third = RetentionPolicy::KeepVersions {
			count: 50,
			cleanup_mode: CleanupMode::Drop,
		};

		catalog.set_operator_retention_policy(node, CommitVersion(10), Some(first.clone()));
		catalog.set_operator_retention_policy(node, CommitVersion(20), Some(second.clone()));
		catalog.set_operator_retention_policy(node, CommitVersion(30), Some(third.clone()));

		// (queried version, policy expected to be visible there)
		let cases = vec![
			(CommitVersion(5), None),
			(CommitVersion(10), Some(first.clone())),
			(CommitVersion(15), Some(first)),
			(CommitVersion(20), Some(second.clone())),
			(CommitVersion(25), Some(second)),
			(CommitVersion(30), Some(third.clone())),
			(CommitVersion(100), Some(third)),
		];

		for (queried, expected) in cases {
			assert_eq!(catalog.find_operator_retention_policy(node, queried), expected);
		}
	}
}
161}