reifydb_catalog/store/retention_policy/
list.rs1use reifydb_core::{
5 interface::{FlowNodeId, QueryTransaction, SourceId},
6 key::{
7 EncodableKey, OperatorRetentionPolicyKey, OperatorRetentionPolicyKeyRange, SourceRetentionPolicyKey,
8 SourceRetentionPolicyKeyRange,
9 },
10 retention::RetentionPolicy,
11};
12
13use super::decode_retention_policy;
14use crate::CatalogStore;
15
16#[derive(Debug, Clone, PartialEq, Eq)]
18pub struct SourceRetentionPolicyEntry {
19 pub source: SourceId,
20 pub policy: RetentionPolicy,
21}
22
23#[derive(Debug, Clone, PartialEq, Eq)]
25pub struct OperatorRetentionPolicyEntry {
26 pub operator: FlowNodeId,
27 pub policy: RetentionPolicy,
28}
29
30impl CatalogStore {
31 pub async fn list_source_retention_policies(
33 rx: &mut impl QueryTransaction,
34 ) -> crate::Result<Vec<SourceRetentionPolicyEntry>> {
35 let mut result = Vec::new();
36
37 let batch = rx.range(SourceRetentionPolicyKeyRange::full_scan()).await?;
38
39 for entry in batch.items {
40 if let Some(key) = SourceRetentionPolicyKey::decode(&entry.key) {
41 if let Some(policy) = decode_retention_policy(&entry.values) {
42 result.push(SourceRetentionPolicyEntry {
43 source: key.source,
44 policy,
45 });
46 }
47 }
48 }
49
50 Ok(result)
51 }
52
53 pub async fn list_operator_retention_policies(
55 rx: &mut impl QueryTransaction,
56 ) -> crate::Result<Vec<OperatorRetentionPolicyEntry>> {
57 let mut result = Vec::new();
58
59 let batch = rx.range(OperatorRetentionPolicyKeyRange::full_scan()).await?;
60
61 for entry in batch.items {
62 if let Some(key) = OperatorRetentionPolicyKey::decode(&entry.key) {
63 if let Some(policy) = decode_retention_policy(&entry.values) {
64 result.push(OperatorRetentionPolicyEntry {
65 operator: key.operator,
66 policy,
67 });
68 }
69 }
70 }
71
72 Ok(result)
73 }
74}
75
#[cfg(test)]
mod tests {
    use reifydb_core::{
        interface::{RingBufferId, TableId, ViewId},
        retention::{CleanupMode, RetentionPolicy},
    };
    use reifydb_engine::test_utils::create_test_command_transaction;

    use super::*;
    use crate::store::retention_policy::create::{
        _create_operator_retention_policy, create_source_retention_policy,
    };

    /// With no policies created, the source listing is empty.
    #[tokio::test]
    async fn test_list_source_retention_policies_empty() {
        let mut tx = create_test_command_transaction().await;

        let listed = CatalogStore::list_source_retention_policies(&mut tx).await.unwrap();

        assert!(listed.is_empty());
    }

    /// Policies created for a table, a view and a ring buffer are all listed.
    #[tokio::test]
    async fn test_list_source_retention_policies_multiple() {
        let mut tx = create_test_command_transaction().await;

        // Created in this order: table, view, ring buffer.
        let expected = [
            (
                SourceId::Table(TableId(1)),
                RetentionPolicy::KeepVersions {
                    count: 10,
                    cleanup_mode: CleanupMode::Delete,
                },
            ),
            (SourceId::View(ViewId(2)), RetentionPolicy::KeepForever),
            (
                SourceId::RingBuffer(RingBufferId(3)),
                RetentionPolicy::KeepVersions {
                    count: 50,
                    cleanup_mode: CleanupMode::Drop,
                },
            ),
        ];
        for (source, policy) in &expected {
            create_source_retention_policy(&mut tx, *source, policy).await.unwrap();
        }

        let listed = CatalogStore::list_source_retention_policies(&mut tx).await.unwrap();

        assert_eq!(listed.len(), 3);

        // Listing order is not asserted, only membership.
        for (source, policy) in &expected {
            assert!(listed.iter().any(|e| e.source == *source && e.policy == *policy));
        }
    }

    /// With no policies created, the operator listing is empty.
    #[tokio::test]
    async fn test_list_operator_retention_policies_empty() {
        let mut tx = create_test_command_transaction().await;

        let listed = CatalogStore::list_operator_retention_policies(&mut tx).await.unwrap();

        assert!(listed.is_empty());
    }

    /// Policies created for three distinct operators are all listed.
    #[tokio::test]
    async fn test_list_operator_retention_policies_multiple() {
        let mut tx = create_test_command_transaction().await;

        let expected = [
            (
                FlowNodeId(100),
                RetentionPolicy::KeepVersions {
                    count: 5,
                    cleanup_mode: CleanupMode::Delete,
                },
            ),
            (FlowNodeId(200), RetentionPolicy::KeepForever),
            (
                FlowNodeId(300),
                RetentionPolicy::KeepVersions {
                    count: 3,
                    cleanup_mode: CleanupMode::Drop,
                },
            ),
        ];
        for (operator, policy) in &expected {
            _create_operator_retention_policy(&mut tx, *operator, policy).await.unwrap();
        }

        let listed = CatalogStore::list_operator_retention_policies(&mut tx).await.unwrap();

        assert_eq!(listed.len(), 3);

        // Listing order is not asserted, only membership.
        for (operator, policy) in &expected {
            assert!(listed.iter().any(|e| e.operator == *operator && e.policy == *policy));
        }
    }

    /// Creating a policy twice for the same source leaves a single listed
    /// entry carrying the most recent policy.
    #[tokio::test]
    async fn test_list_source_retention_policies_after_updates() {
        let mut tx = create_test_command_transaction().await;

        let source = SourceId::Table(TableId(42));

        let initial = RetentionPolicy::KeepForever;
        create_source_retention_policy(&mut tx, source, &initial).await.unwrap();

        let listed = CatalogStore::list_source_retention_policies(&mut tx).await.unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].policy, initial);

        let replacement = RetentionPolicy::KeepVersions {
            count: 20,
            cleanup_mode: CleanupMode::Drop,
        };
        create_source_retention_policy(&mut tx, source, &replacement).await.unwrap();

        let listed = CatalogStore::list_source_retention_policies(&mut tx).await.unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].policy, replacement);
    }

    /// Creating a policy twice for the same operator leaves a single listed
    /// entry carrying the most recent policy.
    #[tokio::test]
    async fn test_list_operator_retention_policies_after_updates() {
        let mut tx = create_test_command_transaction().await;

        let operator = FlowNodeId(999);

        let initial = RetentionPolicy::KeepVersions {
            count: 3,
            cleanup_mode: CleanupMode::Delete,
        };
        _create_operator_retention_policy(&mut tx, operator, &initial).await.unwrap();

        let listed = CatalogStore::list_operator_retention_policies(&mut tx).await.unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].policy, initial);

        let replacement = RetentionPolicy::KeepForever;
        _create_operator_retention_policy(&mut tx, operator, &replacement).await.unwrap();

        let listed = CatalogStore::list_operator_retention_policies(&mut tx).await.unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].policy, replacement);
    }
}