Skip to main content

reifydb_core/key/
retention_policy.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_type::value::dictionary::DictionaryId;
5use serde::{Deserialize, Serialize};
6
7use super::{EncodableKey, KeyKind};
8use crate::{
9	encoded::key::{EncodedKey, EncodedKeyRange},
10	interface::catalog::{
11		flow::{FlowId, FlowNodeId},
12		id::{RingBufferId, TableId, ViewId},
13		primitive::PrimitiveId,
14		vtable::VTableId,
15	},
16	util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
17};
18
19const VERSION: u8 = 1;
20
21/// Key for storing retention policy for a data primitive (table, view, ringbuffer)
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
23pub struct PrimitiveRetentionPolicyKey {
24	pub primitive: PrimitiveId,
25}
26
27impl PrimitiveRetentionPolicyKey {
28	pub fn encoded(primitive: impl Into<PrimitiveId>) -> EncodedKey {
29		Self {
30			primitive: primitive.into(),
31		}
32		.encode()
33	}
34}
35
36impl EncodableKey for PrimitiveRetentionPolicyKey {
37	const KIND: KeyKind = KeyKind::PrimitiveRetentionPolicy;
38
39	fn encode(&self) -> EncodedKey {
40		let mut serializer = KeySerializer::with_capacity(11);
41		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8);
42
43		// Encode primitive_id with discriminator
44		match &self.primitive {
45			PrimitiveId::Table(id) => {
46				serializer.extend_u8(0x01).extend_u64(id.0);
47			}
48			PrimitiveId::View(id) => {
49				serializer.extend_u8(0x02).extend_u64(id.0);
50			}
51			PrimitiveId::Flow(id) => {
52				serializer.extend_u8(0x05).extend_u64(id.0);
53			}
54			PrimitiveId::TableVirtual(id) => {
55				serializer.extend_u8(0x03).extend_u64(id.0);
56			}
57			PrimitiveId::RingBuffer(id) => {
58				serializer.extend_u8(0x04).extend_u64(id.0);
59			}
60			PrimitiveId::Dictionary(id) => {
61				serializer.extend_u8(0x06).extend_u64(id.0);
62			}
63		}
64
65		serializer.to_encoded_key()
66	}
67
68	fn decode(key: &EncodedKey) -> Option<Self> {
69		let mut de = KeyDeserializer::from_bytes(key.as_slice());
70
71		let version = de.read_u8().ok()?;
72		if version != VERSION {
73			return None;
74		}
75
76		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
77		if kind != Self::KIND {
78			return None;
79		}
80
81		let discriminator = de.read_u8().ok()?;
82		let id = de.read_u64().ok()?;
83
84		let primitive_id = match discriminator {
85			0x01 => PrimitiveId::Table(TableId(id)),
86			0x02 => PrimitiveId::View(ViewId(id)),
87			0x03 => PrimitiveId::TableVirtual(VTableId(id)),
88			0x04 => PrimitiveId::RingBuffer(RingBufferId(id)),
89			0x05 => PrimitiveId::Flow(FlowId(id)),
90			0x06 => PrimitiveId::Dictionary(DictionaryId(id)),
91			_ => return None,
92		};
93
94		Some(Self {
95			primitive: primitive_id,
96		})
97	}
98}
99
100/// Key for storing retention policy for a flow operator
101#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
102pub struct OperatorRetentionPolicyKey {
103	pub operator: FlowNodeId,
104}
105
106impl OperatorRetentionPolicyKey {
107	pub fn encoded(operator: impl Into<FlowNodeId>) -> EncodedKey {
108		Self {
109			operator: operator.into(),
110		}
111		.encode()
112	}
113}
114
115impl EncodableKey for OperatorRetentionPolicyKey {
116	const KIND: KeyKind = KeyKind::OperatorRetentionPolicy;
117
118	fn encode(&self) -> EncodedKey {
119		let mut serializer = KeySerializer::with_capacity(10);
120		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.operator);
121		serializer.to_encoded_key()
122	}
123
124	fn decode(key: &EncodedKey) -> Option<Self> {
125		let mut de = KeyDeserializer::from_bytes(key.as_slice());
126
127		let version = de.read_u8().ok()?;
128		if version != VERSION {
129			return None;
130		}
131
132		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
133		if kind != Self::KIND {
134			return None;
135		}
136
137		Some(Self {
138			operator: FlowNodeId(de.read_u64().ok()?),
139		})
140	}
141}
142
143/// Range for scanning all primitive retention policies
144pub struct PrimitiveRetentionPolicyKeyRange;
145
146impl PrimitiveRetentionPolicyKeyRange {
147	pub fn full_scan() -> EncodedKeyRange {
148		EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
149	}
150
151	fn start() -> EncodedKey {
152		let mut serializer = KeySerializer::with_capacity(2);
153		serializer.extend_u8(VERSION).extend_u8(PrimitiveRetentionPolicyKey::KIND as u8);
154		serializer.to_encoded_key()
155	}
156
157	fn end() -> EncodedKey {
158		let mut serializer = KeySerializer::with_capacity(2);
159		serializer.extend_u8(VERSION).extend_u8(PrimitiveRetentionPolicyKey::KIND as u8 - 1);
160		serializer.to_encoded_key()
161	}
162}
163
164/// Range for scanning all operator retention policies
165pub struct OperatorRetentionPolicyKeyRange;
166
167impl OperatorRetentionPolicyKeyRange {
168	pub fn full_scan() -> EncodedKeyRange {
169		EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
170	}
171
172	fn start() -> EncodedKey {
173		let mut serializer = KeySerializer::with_capacity(2);
174		serializer.extend_u8(VERSION).extend_u8(OperatorRetentionPolicyKey::KIND as u8);
175		serializer.to_encoded_key()
176	}
177
178	fn end() -> EncodedKey {
179		let mut serializer = KeySerializer::with_capacity(2);
180		serializer.extend_u8(VERSION).extend_u8(OperatorRetentionPolicyKey::KIND as u8 - 1);
181		serializer.to_encoded_key()
182	}
183}
184
185#[cfg(test)]
186pub mod tests {
187	use super::*;
188
189	#[test]
190	fn test_primitive_retention_policy_key_encoding() {
191		let key = PrimitiveRetentionPolicyKey {
192			primitive: PrimitiveId::Table(TableId(42)),
193		};
194
195		let encoded = key.encode();
196		assert_eq!(encoded[0], 0xFE); // version (1 encoded as !1)
197		assert_eq!(encoded[1], 0xE8); // kind (0x17 encoded as !0x17)
198		assert_eq!(&encoded[3..11], &[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xD5]);
199
200		let decoded = PrimitiveRetentionPolicyKey::decode(&encoded).unwrap();
201		assert_eq!(key, decoded);
202	}
203
204	#[test]
205	fn test_operator_retention_policy_key_encoding() {
206		let key = OperatorRetentionPolicyKey {
207			operator: FlowNodeId(12345),
208		};
209
210		let encoded = key.encode();
211		assert_eq!(encoded[0], 0xFE); // version (1 encoded as !1)
212		assert_eq!(encoded[1], 0xE7); // kind (0x18 encoded as !0x18)
213		assert_eq!(&encoded[2..10], &[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xCF, 0xC6]);
214
215		let decoded = OperatorRetentionPolicyKey::decode(&encoded).unwrap();
216		assert_eq!(key, decoded);
217	}
218}