reifydb_catalog/store/retention_policy/
find.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	interface::{FlowNodeId, QueryTransaction, SourceId},
6	key::{OperatorRetentionPolicyKey, SourceRetentionPolicyKey},
7	retention::RetentionPolicy,
8};
9
10use super::decode_retention_policy;
11use crate::CatalogStore;
12
13impl CatalogStore {
14	/// Find a retention policy for a source (table, view, or ring buffer)
15	/// Returns None if no retention policy is set
16	pub async fn find_source_retention_policy(
17		txn: &mut impl QueryTransaction,
18		source: SourceId,
19	) -> crate::Result<Option<RetentionPolicy>> {
20		let value = txn.get(&SourceRetentionPolicyKey::encoded(source)).await?;
21		Ok(value.and_then(|v| decode_retention_policy(&v.values)))
22	}
23
24	/// Find a retention policy for an operator
25	/// Returns None if no retention policy is set
26	pub async fn find_operator_retention_policy(
27		txn: &mut impl QueryTransaction,
28		operator: FlowNodeId,
29	) -> crate::Result<Option<RetentionPolicy>> {
30		let value = txn.get(&OperatorRetentionPolicyKey::encoded(operator)).await?;
31		Ok(value.and_then(|v| decode_retention_policy(&v.values)))
32	}
33}
34
35#[cfg(test)]
36mod tests {
37	use reifydb_core::{
38		interface::TableId,
39		retention::{CleanupMode, RetentionPolicy},
40	};
41	use reifydb_engine::test_utils::create_test_command_transaction;
42
43	use super::*;
44	use crate::store::retention_policy::create::{
45		_create_operator_retention_policy, create_source_retention_policy,
46	};
47
48	#[tokio::test]
49	async fn test_find_source_retention_policy_exists() {
50		let mut txn = create_test_command_transaction().await;
51		let source = SourceId::Table(TableId(42));
52
53		let policy = RetentionPolicy::KeepVersions {
54			count: 10,
55			cleanup_mode: CleanupMode::Delete,
56		};
57
58		create_source_retention_policy(&mut txn, source, &policy).await.unwrap();
59
60		let found = CatalogStore::find_source_retention_policy(&mut txn, source).await.unwrap();
61		assert_eq!(found, Some(policy));
62	}
63
64	#[tokio::test]
65	async fn test_find_source_retention_policy_not_exists() {
66		let mut txn = create_test_command_transaction().await;
67		let source = SourceId::Table(TableId(9999));
68
69		let found = CatalogStore::find_source_retention_policy(&mut txn, source).await.unwrap();
70		assert_eq!(found, None);
71	}
72
73	#[tokio::test]
74	async fn test_find_operator_retention_policy_exists() {
75		let mut txn = create_test_command_transaction().await;
76		let operator = FlowNodeId(999);
77
78		let policy = RetentionPolicy::KeepVersions {
79			count: 5,
80			cleanup_mode: CleanupMode::Drop,
81		};
82
83		_create_operator_retention_policy(&mut txn, operator, &policy).await.unwrap();
84
85		let found = CatalogStore::find_operator_retention_policy(&mut txn, operator).await.unwrap();
86		assert_eq!(found, Some(policy));
87	}
88
89	#[tokio::test]
90	async fn test_find_operator_retention_policy_not_exists() {
91		let mut txn = create_test_command_transaction().await;
92		let operator = FlowNodeId(9999);
93
94		let found = CatalogStore::find_operator_retention_policy(&mut txn, operator).await.unwrap();
95		assert_eq!(found, None);
96	}
97}