reifydb_catalog/store/retention_policy/
find.rs1use reifydb_core::{
5 interface::{FlowNodeId, QueryTransaction, SourceId},
6 key::{OperatorRetentionPolicyKey, SourceRetentionPolicyKey},
7 retention::RetentionPolicy,
8};
9
10use super::decode_retention_policy;
11use crate::CatalogStore;
12
13impl CatalogStore {
14 pub async fn find_source_retention_policy(
17 txn: &mut impl QueryTransaction,
18 source: SourceId,
19 ) -> crate::Result<Option<RetentionPolicy>> {
20 let value = txn.get(&SourceRetentionPolicyKey::encoded(source)).await?;
21 Ok(value.and_then(|v| decode_retention_policy(&v.values)))
22 }
23
24 pub async fn find_operator_retention_policy(
27 txn: &mut impl QueryTransaction,
28 operator: FlowNodeId,
29 ) -> crate::Result<Option<RetentionPolicy>> {
30 let value = txn.get(&OperatorRetentionPolicyKey::encoded(operator)).await?;
31 Ok(value.and_then(|v| decode_retention_policy(&v.values)))
32 }
33}
34
35#[cfg(test)]
36mod tests {
37 use reifydb_core::{
38 interface::TableId,
39 retention::{CleanupMode, RetentionPolicy},
40 };
41 use reifydb_engine::test_utils::create_test_command_transaction;
42
43 use super::*;
44 use crate::store::retention_policy::create::{
45 _create_operator_retention_policy, create_source_retention_policy,
46 };
47
48 #[tokio::test]
49 async fn test_find_source_retention_policy_exists() {
50 let mut txn = create_test_command_transaction().await;
51 let source = SourceId::Table(TableId(42));
52
53 let policy = RetentionPolicy::KeepVersions {
54 count: 10,
55 cleanup_mode: CleanupMode::Delete,
56 };
57
58 create_source_retention_policy(&mut txn, source, &policy).await.unwrap();
59
60 let found = CatalogStore::find_source_retention_policy(&mut txn, source).await.unwrap();
61 assert_eq!(found, Some(policy));
62 }
63
64 #[tokio::test]
65 async fn test_find_source_retention_policy_not_exists() {
66 let mut txn = create_test_command_transaction().await;
67 let source = SourceId::Table(TableId(9999));
68
69 let found = CatalogStore::find_source_retention_policy(&mut txn, source).await.unwrap();
70 assert_eq!(found, None);
71 }
72
73 #[tokio::test]
74 async fn test_find_operator_retention_policy_exists() {
75 let mut txn = create_test_command_transaction().await;
76 let operator = FlowNodeId(999);
77
78 let policy = RetentionPolicy::KeepVersions {
79 count: 5,
80 cleanup_mode: CleanupMode::Drop,
81 };
82
83 _create_operator_retention_policy(&mut txn, operator, &policy).await.unwrap();
84
85 let found = CatalogStore::find_operator_retention_policy(&mut txn, operator).await.unwrap();
86 assert_eq!(found, Some(policy));
87 }
88
89 #[tokio::test]
90 async fn test_find_operator_retention_policy_not_exists() {
91 let mut txn = create_test_command_transaction().await;
92 let operator = FlowNodeId(9999);
93
94 let found = CatalogStore::find_operator_retention_policy(&mut txn, operator).await.unwrap();
95 assert_eq!(found, None);
96 }
97}