reifydb_core/key/
retention_policy.rs

1use serde::{Deserialize, Serialize};
2
3use super::{EncodableKey, KeyKind};
4use crate::{
5	EncodedKey, EncodedKeyRange,
6	interface::{FlowId, FlowNodeId, RingBufferId, SourceId, TableId, TableVirtualId, ViewId},
7	util::encoding::keycode::{KeyDeserializer, KeySerializer},
8};
9
10const VERSION: u8 = 1;
11
12/// Key for storing retention policy for a data source (table, view, ring_buffer)
13#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
14pub struct SourceRetentionPolicyKey {
15	pub source: SourceId,
16}
17
18impl EncodableKey for SourceRetentionPolicyKey {
19	const KIND: KeyKind = KeyKind::SourceRetentionPolicy;
20
21	fn encode(&self) -> EncodedKey {
22		let mut serializer = KeySerializer::with_capacity(11);
23		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8);
24
25		// Encode source_id with discriminator
26		match &self.source {
27			SourceId::Table(id) => {
28				serializer.extend_u8(0x01).extend_u64(id.0);
29			}
30			SourceId::View(id) => {
31				serializer.extend_u8(0x02).extend_u64(id.0);
32			}
33			SourceId::Flow(id) => {
34				serializer.extend_u8(0x05).extend_u64(id.0);
35			}
36			SourceId::TableVirtual(id) => {
37				serializer.extend_u8(0x03).extend_u64(id.0);
38			}
39			SourceId::RingBuffer(id) => {
40				serializer.extend_u8(0x04).extend_u64(id.0);
41			}
42		}
43
44		serializer.to_encoded_key()
45	}
46
47	fn decode(key: &EncodedKey) -> Option<Self> {
48		let mut de = KeyDeserializer::from_bytes(key.as_slice());
49
50		let version = de.read_u8().ok()?;
51		if version != VERSION {
52			return None;
53		}
54
55		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
56		if kind != Self::KIND {
57			return None;
58		}
59
60		let discriminator = de.read_u8().ok()?;
61		let id = de.read_u64().ok()?;
62
63		let source_id = match discriminator {
64			0x01 => SourceId::Table(TableId(id)),
65			0x02 => SourceId::View(ViewId(id)),
66			0x03 => SourceId::TableVirtual(TableVirtualId(id)),
67			0x04 => SourceId::RingBuffer(RingBufferId(id)),
68			0x05 => SourceId::Flow(FlowId(id)),
69			_ => return None,
70		};
71
72		Some(Self {
73			source: source_id,
74		})
75	}
76}
77
78/// Key for storing retention policy for a flow operator
79#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
80pub struct OperatorRetentionPolicyKey {
81	pub operator: FlowNodeId,
82}
83
84impl EncodableKey for OperatorRetentionPolicyKey {
85	const KIND: KeyKind = KeyKind::OperatorRetentionPolicy;
86
87	fn encode(&self) -> EncodedKey {
88		let mut serializer = KeySerializer::with_capacity(10);
89		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.operator);
90		serializer.to_encoded_key()
91	}
92
93	fn decode(key: &EncodedKey) -> Option<Self> {
94		let mut de = KeyDeserializer::from_bytes(key.as_slice());
95
96		let version = de.read_u8().ok()?;
97		if version != VERSION {
98			return None;
99		}
100
101		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
102		if kind != Self::KIND {
103			return None;
104		}
105
106		Some(Self {
107			operator: FlowNodeId(de.read_u64().ok()?),
108		})
109	}
110}
111
112/// Range for scanning all source retention policies
113pub struct SourceRetentionPolicyKeyRange;
114
115impl SourceRetentionPolicyKeyRange {
116	pub fn full_scan() -> EncodedKeyRange {
117		EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
118	}
119
120	fn start() -> EncodedKey {
121		let mut serializer = KeySerializer::with_capacity(2);
122		serializer.extend_u8(VERSION).extend_u8(SourceRetentionPolicyKey::KIND as u8);
123		serializer.to_encoded_key()
124	}
125
126	fn end() -> EncodedKey {
127		let mut serializer = KeySerializer::with_capacity(2);
128		serializer.extend_u8(VERSION).extend_u8(SourceRetentionPolicyKey::KIND as u8 - 1);
129		serializer.to_encoded_key()
130	}
131}
132
133/// Range for scanning all operator retention policies
134pub struct OperatorRetentionPolicyKeyRange;
135
136impl OperatorRetentionPolicyKeyRange {
137	pub fn full_scan() -> EncodedKeyRange {
138		EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
139	}
140
141	fn start() -> EncodedKey {
142		let mut serializer = KeySerializer::with_capacity(2);
143		serializer.extend_u8(VERSION).extend_u8(OperatorRetentionPolicyKey::KIND as u8);
144		serializer.to_encoded_key()
145	}
146
147	fn end() -> EncodedKey {
148		let mut serializer = KeySerializer::with_capacity(2);
149		serializer.extend_u8(VERSION).extend_u8(OperatorRetentionPolicyKey::KIND as u8 - 1);
150		serializer.to_encoded_key()
151	}
152}
153
154#[cfg(test)]
155mod tests {
156	use super::*;
157
158	#[test]
159	fn test_source_retention_policy_key_encoding() {
160		let key = SourceRetentionPolicyKey {
161			source: SourceId::Table(TableId(42)),
162		};
163
164		let encoded = key.encode();
165		assert_eq!(encoded[0], 0xFE); // version (1 encoded as !1)
166		assert_eq!(encoded[1], 0xE8); // kind (0x17 encoded as !0x17)
167		assert_eq!(&encoded[3..11], &[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xD5]);
168
169		let decoded = SourceRetentionPolicyKey::decode(&encoded).unwrap();
170		assert_eq!(key, decoded);
171	}
172
173	#[test]
174	fn test_operator_retention_policy_key_encoding() {
175		let key = OperatorRetentionPolicyKey {
176			operator: FlowNodeId(12345),
177		};
178
179		let encoded = key.encode();
180		assert_eq!(encoded[0], 0xFE); // version (1 encoded as !1)
181		assert_eq!(encoded[1], 0xE7); // kind (0x18 encoded as !0x18)
182		assert_eq!(&encoded[2..10], &[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xCF, 0xC6]);
183
184		let decoded = OperatorRetentionPolicyKey::decode(&encoded).unwrap();
185		assert_eq!(key, decoded);
186	}
187}