reifydb_catalog/store/retention_policy/
list.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	interface::{FlowNodeId, QueryTransaction, SourceId},
6	key::{
7		EncodableKey, OperatorRetentionPolicyKey, OperatorRetentionPolicyKeyRange, SourceRetentionPolicyKey,
8		SourceRetentionPolicyKeyRange,
9	},
10	retention::RetentionPolicy,
11};
12
13use super::decode_retention_policy;
14use crate::CatalogStore;
15
16/// A source retention policy entry
17#[derive(Debug, Clone, PartialEq, Eq)]
18pub struct SourceRetentionPolicyEntry {
19	pub source: SourceId,
20	pub policy: RetentionPolicy,
21}
22
23/// An operator retention policy entry
24#[derive(Debug, Clone, PartialEq, Eq)]
25pub struct OperatorRetentionPolicyEntry {
26	pub operator: FlowNodeId,
27	pub policy: RetentionPolicy,
28}
29
30impl CatalogStore {
31	/// List all retention policies for sources (tables, views, ring buffers)
32	pub async fn list_source_retention_policies(
33		rx: &mut impl QueryTransaction,
34	) -> crate::Result<Vec<SourceRetentionPolicyEntry>> {
35		let mut result = Vec::new();
36
37		let batch = rx.range(SourceRetentionPolicyKeyRange::full_scan()).await?;
38
39		for entry in batch.items {
40			if let Some(key) = SourceRetentionPolicyKey::decode(&entry.key) {
41				if let Some(policy) = decode_retention_policy(&entry.values) {
42					result.push(SourceRetentionPolicyEntry {
43						source: key.source,
44						policy,
45					});
46				}
47			}
48		}
49
50		Ok(result)
51	}
52
53	/// List all retention policies for operators
54	pub async fn list_operator_retention_policies(
55		rx: &mut impl QueryTransaction,
56	) -> crate::Result<Vec<OperatorRetentionPolicyEntry>> {
57		let mut result = Vec::new();
58
59		let batch = rx.range(OperatorRetentionPolicyKeyRange::full_scan()).await?;
60
61		for entry in batch.items {
62			if let Some(key) = OperatorRetentionPolicyKey::decode(&entry.key) {
63				if let Some(policy) = decode_retention_policy(&entry.values) {
64					result.push(OperatorRetentionPolicyEntry {
65						operator: key.operator,
66						policy,
67					});
68				}
69			}
70		}
71
72		Ok(result)
73	}
74}
75
76#[cfg(test)]
77mod tests {
78	use reifydb_core::{
79		interface::{RingBufferId, TableId, ViewId},
80		retention::{CleanupMode, RetentionPolicy},
81	};
82	use reifydb_engine::test_utils::create_test_command_transaction;
83
84	use super::*;
85	use crate::store::retention_policy::create::{
86		_create_operator_retention_policy, create_source_retention_policy,
87	};
88
89	#[tokio::test]
90	async fn test_list_source_retention_policies_empty() {
91		let mut txn = create_test_command_transaction().await;
92
93		let policies = CatalogStore::list_source_retention_policies(&mut txn).await.unwrap();
94
95		assert_eq!(policies.len(), 0);
96	}
97
98	#[tokio::test]
99	async fn test_list_source_retention_policies_multiple() {
100		let mut txn = create_test_command_transaction().await;
101
102		// Create policies for different sources
103		let table_source = SourceId::Table(TableId(1));
104		let table_policy = RetentionPolicy::KeepVersions {
105			count: 10,
106			cleanup_mode: CleanupMode::Delete,
107		};
108		create_source_retention_policy(&mut txn, table_source, &table_policy).await.unwrap();
109
110		let view_source = SourceId::View(ViewId(2));
111		let view_policy = RetentionPolicy::KeepForever;
112		create_source_retention_policy(&mut txn, view_source, &view_policy).await.unwrap();
113
114		let ringbuffer_source = SourceId::RingBuffer(RingBufferId(3));
115		let ringbuffer_policy = RetentionPolicy::KeepVersions {
116			count: 50,
117			cleanup_mode: CleanupMode::Drop,
118		};
119		create_source_retention_policy(&mut txn, ringbuffer_source, &ringbuffer_policy).await.unwrap();
120
121		// List all policies
122		let policies = CatalogStore::list_source_retention_policies(&mut txn).await.unwrap();
123
124		assert_eq!(policies.len(), 3);
125
126		// Verify each policy
127		assert!(policies.iter().any(|p| p.source == table_source && p.policy == table_policy));
128		assert!(policies.iter().any(|p| p.source == view_source && p.policy == view_policy));
129		assert!(policies.iter().any(|p| p.source == ringbuffer_source && p.policy == ringbuffer_policy));
130	}
131
132	#[tokio::test]
133	async fn test_list_operator_retention_policies_empty() {
134		let mut txn = create_test_command_transaction().await;
135
136		let policies = CatalogStore::list_operator_retention_policies(&mut txn).await.unwrap();
137
138		assert_eq!(policies.len(), 0);
139	}
140
141	#[tokio::test]
142	async fn test_list_operator_retention_policies_multiple() {
143		let mut txn = create_test_command_transaction().await;
144
145		// Create policies for different operators
146		let operator1 = FlowNodeId(100);
147		let policy1 = RetentionPolicy::KeepVersions {
148			count: 5,
149			cleanup_mode: CleanupMode::Delete,
150		};
151		_create_operator_retention_policy(&mut txn, operator1, &policy1).await.unwrap();
152
153		let operator2 = FlowNodeId(200);
154		let policy2 = RetentionPolicy::KeepForever;
155		_create_operator_retention_policy(&mut txn, operator2, &policy2).await.unwrap();
156
157		let operator3 = FlowNodeId(300);
158		let policy3 = RetentionPolicy::KeepVersions {
159			count: 3,
160			cleanup_mode: CleanupMode::Drop,
161		};
162		_create_operator_retention_policy(&mut txn, operator3, &policy3).await.unwrap();
163
164		// List all policies
165		let policies = CatalogStore::list_operator_retention_policies(&mut txn).await.unwrap();
166
167		assert_eq!(policies.len(), 3);
168
169		// Verify each policy
170		assert!(policies.iter().any(|p| p.operator == operator1 && p.policy == policy1));
171		assert!(policies.iter().any(|p| p.operator == operator2 && p.policy == policy2));
172		assert!(policies.iter().any(|p| p.operator == operator3 && p.policy == policy3));
173	}
174
175	#[tokio::test]
176	async fn test_list_source_retention_policies_after_updates() {
177		let mut txn = create_test_command_transaction().await;
178
179		let source = SourceId::Table(TableId(42));
180
181		// Create initial policy
182		let policy1 = RetentionPolicy::KeepForever;
183		create_source_retention_policy(&mut txn, source, &policy1).await.unwrap();
184
185		let policies = CatalogStore::list_source_retention_policies(&mut txn).await.unwrap();
186		assert_eq!(policies.len(), 1);
187		assert_eq!(policies[0].policy, policy1);
188
189		// Update policy
190		let policy2 = RetentionPolicy::KeepVersions {
191			count: 20,
192			cleanup_mode: CleanupMode::Drop,
193		};
194		create_source_retention_policy(&mut txn, source, &policy2).await.unwrap();
195
196		// Should still have only 1 entry (updated, not added)
197		let policies = CatalogStore::list_source_retention_policies(&mut txn).await.unwrap();
198		assert_eq!(policies.len(), 1);
199		assert_eq!(policies[0].policy, policy2);
200	}
201
202	#[tokio::test]
203	async fn test_list_operator_retention_policies_after_updates() {
204		let mut txn = create_test_command_transaction().await;
205
206		let operator = FlowNodeId(999);
207
208		// Create initial policy
209		let policy1 = RetentionPolicy::KeepVersions {
210			count: 3,
211			cleanup_mode: CleanupMode::Delete,
212		};
213		_create_operator_retention_policy(&mut txn, operator, &policy1).await.unwrap();
214
215		let policies = CatalogStore::list_operator_retention_policies(&mut txn).await.unwrap();
216		assert_eq!(policies.len(), 1);
217		assert_eq!(policies[0].policy, policy1);
218
219		// Update policy
220		let policy2 = RetentionPolicy::KeepForever;
221		_create_operator_retention_policy(&mut txn, operator, &policy2).await.unwrap();
222
223		// Should still have only 1 entry (updated, not added)
224		let policies = CatalogStore::list_operator_retention_policies(&mut txn).await.unwrap();
225		assert_eq!(policies.len(), 1);
226		assert_eq!(policies[0].policy, policy2);
227	}
228}