reifydb_catalog/store/retention_policy/
get.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	Error,
6	interface::{FlowNodeId, QueryTransaction, SourceId},
7	retention::RetentionPolicy,
8};
9use reifydb_type::internal;
10
11use crate::CatalogStore;
12
13impl CatalogStore {
14	/// Get a retention policy for a source (table, view, or ring buffer)
15	/// Returns an error if no retention policy is set
16	pub async fn get_source_retention_policy(
17		txn: &mut impl QueryTransaction,
18		source: SourceId,
19	) -> crate::Result<RetentionPolicy> {
20		Self::find_source_retention_policy(txn, source).await?.ok_or_else(|| {
21			Error(internal!(
22				"Retention policy for source {:?} not found in catalog. This indicates a critical catalog inconsistency.",
23				source
24			))
25		})
26	}
27
28	/// Get a retention policy for an operator (flow node)
29	/// Returns an error if no retention policy is set
30	pub async fn get_operator_retention_policy(
31		txn: &mut impl QueryTransaction,
32		operator: FlowNodeId,
33	) -> crate::Result<RetentionPolicy> {
34		Self::find_operator_retention_policy(txn, operator).await?.ok_or_else(|| {
35			Error(internal!(
36				"Retention policy for operator {:?} not found in catalog. This indicates a critical catalog inconsistency.",
37				operator
38			))
39		})
40	}
41}
42
#[cfg(test)]
mod tests {
	use reifydb_core::{
		interface::{RingBufferId, ViewId},
		retention::{CleanupMode, RetentionPolicy},
	};
	use reifydb_engine::test_utils::create_test_command_transaction;

	use super::*;
	use crate::store::retention_policy::create::{
		_create_operator_retention_policy, create_source_retention_policy,
	};

	/// A policy created for a source is returned unchanged by the getter.
	#[tokio::test]
	async fn test_get_source_retention_policy_exists() {
		let mut txn = create_test_command_transaction().await;
		let view = SourceId::View(ViewId(100));
		let expected = RetentionPolicy::KeepForever;

		create_source_retention_policy(&mut txn, view, &expected).await.unwrap();

		let actual = CatalogStore::get_source_retention_policy(&mut txn, view).await.unwrap();
		assert_eq!(actual, expected);
	}

	/// Asking for a source with no stored policy yields an internal error.
	#[tokio::test]
	async fn test_get_source_retention_policy_not_exists() {
		let mut txn = create_test_command_transaction().await;
		let missing = SourceId::RingBuffer(RingBufferId(9999));

		let outcome = CatalogStore::get_source_retention_policy(&mut txn, missing).await;
		let err = outcome.unwrap_err();

		assert_eq!(err.code, "INTERNAL_ERROR");
		for fragment in ["Retention policy", "not found in catalog"] {
			assert!(err.message.contains(fragment));
		}
	}

	/// A policy created for an operator is returned unchanged by the getter.
	#[tokio::test]
	async fn test_get_operator_retention_policy_exists() {
		let mut txn = create_test_command_transaction().await;
		let node = FlowNodeId(777);
		let expected = RetentionPolicy::KeepVersions {
			count: 3,
			cleanup_mode: CleanupMode::Delete,
		};

		_create_operator_retention_policy(&mut txn, node, &expected).await.unwrap();

		let actual = CatalogStore::get_operator_retention_policy(&mut txn, node).await.unwrap();
		assert_eq!(actual, expected);
	}

	/// Asking for an operator with no stored policy yields an internal error.
	#[tokio::test]
	async fn test_get_operator_retention_policy_not_exists() {
		let mut txn = create_test_command_transaction().await;
		let missing = FlowNodeId(9999);

		let outcome = CatalogStore::get_operator_retention_policy(&mut txn, missing).await;
		let err = outcome.unwrap_err();

		assert_eq!(err.code, "INTERNAL_ERROR");
		for fragment in ["Retention policy", "not found in catalog"] {
			assert!(err.message.contains(fragment));
		}
	}
}