reifydb_core/key/
retention_policy.rs

1use serde::{Deserialize, Serialize};
2
3use super::{EncodableKey, KeyKind};
4use crate::{
5	EncodedKey, EncodedKeyRange,
6	interface::{DictionaryId, FlowId, FlowNodeId, RingBufferId, SourceId, TableId, TableVirtualId, ViewId},
7	util::encoding::keycode::{KeyDeserializer, KeySerializer},
8};
9
/// Format version of the keys in this module; it is written as the first byte of every encoded
/// key and checked first when decoding.
const VERSION: u8 = 1;
11
12/// Key for storing retention policy for a data source (table, view, ring_buffer)
13#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
14pub struct SourceRetentionPolicyKey {
15	pub source: SourceId,
16}
17
18impl EncodableKey for SourceRetentionPolicyKey {
19	const KIND: KeyKind = KeyKind::SourceRetentionPolicy;
20
21	fn encode(&self) -> EncodedKey {
22		let mut serializer = KeySerializer::with_capacity(11);
23		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8);
24
25		// Encode source_id with discriminator
26		match &self.source {
27			SourceId::Table(id) => {
28				serializer.extend_u8(0x01).extend_u64(id.0);
29			}
30			SourceId::View(id) => {
31				serializer.extend_u8(0x02).extend_u64(id.0);
32			}
33			SourceId::Flow(id) => {
34				serializer.extend_u8(0x05).extend_u64(id.0);
35			}
36			SourceId::TableVirtual(id) => {
37				serializer.extend_u8(0x03).extend_u64(id.0);
38			}
39			SourceId::RingBuffer(id) => {
40				serializer.extend_u8(0x04).extend_u64(id.0);
41			}
42			SourceId::Dictionary(id) => {
43				serializer.extend_u8(0x06).extend_u64(id.0);
44			}
45		}
46
47		serializer.to_encoded_key()
48	}
49
50	fn decode(key: &EncodedKey) -> Option<Self> {
51		let mut de = KeyDeserializer::from_bytes(key.as_slice());
52
53		let version = de.read_u8().ok()?;
54		if version != VERSION {
55			return None;
56		}
57
58		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
59		if kind != Self::KIND {
60			return None;
61		}
62
63		let discriminator = de.read_u8().ok()?;
64		let id = de.read_u64().ok()?;
65
66		let source_id = match discriminator {
67			0x01 => SourceId::Table(TableId(id)),
68			0x02 => SourceId::View(ViewId(id)),
69			0x03 => SourceId::TableVirtual(TableVirtualId(id)),
70			0x04 => SourceId::RingBuffer(RingBufferId(id)),
71			0x05 => SourceId::Flow(FlowId(id)),
72			0x06 => SourceId::Dictionary(DictionaryId(id)),
73			_ => return None,
74		};
75
76		Some(Self {
77			source: source_id,
78		})
79	}
80}
81
82/// Key for storing retention policy for a flow operator
83#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
84pub struct OperatorRetentionPolicyKey {
85	pub operator: FlowNodeId,
86}
87
88impl EncodableKey for OperatorRetentionPolicyKey {
89	const KIND: KeyKind = KeyKind::OperatorRetentionPolicy;
90
91	fn encode(&self) -> EncodedKey {
92		let mut serializer = KeySerializer::with_capacity(10);
93		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.operator);
94		serializer.to_encoded_key()
95	}
96
97	fn decode(key: &EncodedKey) -> Option<Self> {
98		let mut de = KeyDeserializer::from_bytes(key.as_slice());
99
100		let version = de.read_u8().ok()?;
101		if version != VERSION {
102			return None;
103		}
104
105		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
106		if kind != Self::KIND {
107			return None;
108		}
109
110		Some(Self {
111			operator: FlowNodeId(de.read_u64().ok()?),
112		})
113	}
114}
115
116/// Range for scanning all source retention policies
117pub struct SourceRetentionPolicyKeyRange;
118
119impl SourceRetentionPolicyKeyRange {
120	pub fn full_scan() -> EncodedKeyRange {
121		EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
122	}
123
124	fn start() -> EncodedKey {
125		let mut serializer = KeySerializer::with_capacity(2);
126		serializer.extend_u8(VERSION).extend_u8(SourceRetentionPolicyKey::KIND as u8);
127		serializer.to_encoded_key()
128	}
129
130	fn end() -> EncodedKey {
131		let mut serializer = KeySerializer::with_capacity(2);
132		serializer.extend_u8(VERSION).extend_u8(SourceRetentionPolicyKey::KIND as u8 - 1);
133		serializer.to_encoded_key()
134	}
135}
136
137/// Range for scanning all operator retention policies
138pub struct OperatorRetentionPolicyKeyRange;
139
140impl OperatorRetentionPolicyKeyRange {
141	pub fn full_scan() -> EncodedKeyRange {
142		EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
143	}
144
145	fn start() -> EncodedKey {
146		let mut serializer = KeySerializer::with_capacity(2);
147		serializer.extend_u8(VERSION).extend_u8(OperatorRetentionPolicyKey::KIND as u8);
148		serializer.to_encoded_key()
149	}
150
151	fn end() -> EncodedKey {
152		let mut serializer = KeySerializer::with_capacity(2);
153		serializer.extend_u8(VERSION).extend_u8(OperatorRetentionPolicyKey::KIND as u8 - 1);
154		serializer.to_encoded_key()
155	}
156}
157
#[cfg(test)]
mod tests {
	use super::*;

	/// Pins the exact byte layout of a source key and checks that it round-trips.
	#[test]
	fn test_source_retention_policy_key_encoding() {
		let key = SourceRetentionPolicyKey {
			source: SourceId::Table(TableId(42)),
		};

		let encoded = key.encode();
		assert_eq!(encoded[0], 0xFE); // version (1 encoded as !1)
		assert_eq!(encoded[1], 0xE8); // kind (0x17 encoded as !0x17)
		assert_eq!(encoded[2], 0xFE); // table discriminator (0x01 encoded as !0x01)
		assert_eq!(&encoded[3..11], &[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xD5]);

		let decoded = SourceRetentionPolicyKey::decode(&encoded).unwrap();
		assert_eq!(key, decoded);
	}

	/// Every `SourceId` variant must survive an encode/decode round trip, which
	/// guards against the discriminator tables in `encode` and `decode` drifting apart.
	#[test]
	fn test_source_retention_policy_key_roundtrip_all_variants() {
		let sources = vec![
			SourceId::Table(TableId(1)),
			SourceId::View(ViewId(2)),
			SourceId::TableVirtual(TableVirtualId(3)),
			SourceId::RingBuffer(RingBufferId(4)),
			SourceId::Flow(FlowId(5)),
			SourceId::Dictionary(DictionaryId(6)),
		];

		for source in sources {
			let key = SourceRetentionPolicyKey {
				source,
			};
			let decoded = SourceRetentionPolicyKey::decode(&key.encode());
			assert_eq!(decoded, Some(key));
		}
	}

	/// Pins the exact byte layout of an operator key and checks that it round-trips.
	#[test]
	fn test_operator_retention_policy_key_encoding() {
		let key = OperatorRetentionPolicyKey {
			operator: FlowNodeId(12345),
		};

		let encoded = key.encode();
		assert_eq!(encoded[0], 0xFE); // version (1 encoded as !1)
		assert_eq!(encoded[1], 0xE7); // kind (0x18 encoded as !0x18)
		assert_eq!(&encoded[2..10], &[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xCF, 0xC6]);

		let decoded = OperatorRetentionPolicyKey::decode(&encoded).unwrap();
		assert_eq!(key, decoded);
	}

	/// A key of one kind must not decode as the other kind.
	#[test]
	fn test_decode_rejects_other_key_kind() {
		let source_key = SourceRetentionPolicyKey {
			source: SourceId::Table(TableId(42)),
		}
		.encode();
		let operator_key = OperatorRetentionPolicyKey {
			operator: FlowNodeId(42),
		}
		.encode();

		assert!(OperatorRetentionPolicyKey::decode(&source_key).is_none());
		assert!(SourceRetentionPolicyKey::decode(&operator_key).is_none());
	}
}