reifydb_catalog/store/retention_policy/
get.rs1use reifydb_core::{
5 Error,
6 interface::{FlowNodeId, QueryTransaction, SourceId},
7 retention::RetentionPolicy,
8};
9use reifydb_type::internal;
10
11use crate::CatalogStore;
12
13impl CatalogStore {
14 pub async fn get_source_retention_policy(
17 txn: &mut impl QueryTransaction,
18 source: SourceId,
19 ) -> crate::Result<RetentionPolicy> {
20 Self::find_source_retention_policy(txn, source).await?.ok_or_else(|| {
21 Error(internal!(
22 "Retention policy for source {:?} not found in catalog. This indicates a critical catalog inconsistency.",
23 source
24 ))
25 })
26 }
27
28 pub async fn get_operator_retention_policy(
31 txn: &mut impl QueryTransaction,
32 operator: FlowNodeId,
33 ) -> crate::Result<RetentionPolicy> {
34 Self::find_operator_retention_policy(txn, operator).await?.ok_or_else(|| {
35 Error(internal!(
36 "Retention policy for operator {:?} not found in catalog. This indicates a critical catalog inconsistency.",
37 operator
38 ))
39 })
40 }
41}
42
#[cfg(test)]
mod tests {
    use reifydb_core::{
        interface::{RingBufferId, ViewId},
        retention::{CleanupMode, RetentionPolicy},
    };
    use reifydb_engine::test_utils::create_test_command_transaction;

    use super::*;
    use crate::store::retention_policy::create::{
        _create_operator_retention_policy, create_source_retention_policy,
    };

    /// Checks the error code and message fragments that both `get_*`
    /// functions are expected to produce for a missing policy.
    fn assert_policy_not_found(error: &Error) {
        assert_eq!(error.code, "INTERNAL_ERROR");
        assert!(error.message.contains("Retention policy"));
        assert!(error.message.contains("not found in catalog"));
    }

    // A policy written for a source must be read back unchanged.
    #[tokio::test]
    async fn test_get_source_retention_policy_exists() {
        let mut tx = create_test_command_transaction().await;
        let view_source = SourceId::View(ViewId(100));
        let expected = RetentionPolicy::KeepForever;

        create_source_retention_policy(&mut tx, view_source, &expected).await.unwrap();

        let actual = CatalogStore::get_source_retention_policy(&mut tx, view_source).await.unwrap();
        assert_eq!(actual, expected);
    }

    // Reading a source policy that was never created must fail.
    #[tokio::test]
    async fn test_get_source_retention_policy_not_exists() {
        let mut tx = create_test_command_transaction().await;
        let unknown_source = SourceId::RingBuffer(RingBufferId(9999));

        let error = CatalogStore::get_source_retention_policy(&mut tx, unknown_source).await.unwrap_err();

        assert_policy_not_found(&error);
    }

    // A policy written for an operator must be read back unchanged.
    #[tokio::test]
    async fn test_get_operator_retention_policy_exists() {
        let mut tx = create_test_command_transaction().await;
        let node = FlowNodeId(777);
        let expected = RetentionPolicy::KeepVersions {
            count: 3,
            cleanup_mode: CleanupMode::Delete,
        };

        _create_operator_retention_policy(&mut tx, node, &expected).await.unwrap();

        let actual = CatalogStore::get_operator_retention_policy(&mut tx, node).await.unwrap();
        assert_eq!(actual, expected);
    }

    // Reading an operator policy that was never created must fail.
    #[tokio::test]
    async fn test_get_operator_retention_policy_not_exists() {
        let mut tx = create_test_command_transaction().await;
        let unknown_node = FlowNodeId(9999);

        let error = CatalogStore::get_operator_retention_policy(&mut tx, unknown_node).await.unwrap_err();

        assert_policy_not_found(&error);
    }
}