reifydb_core/key/
retention_policy.rs

1use serde::{Deserialize, Serialize};
2
3use super::{EncodableKey, KeyKind};
4use crate::{
5	EncodedKey, EncodedKeyRange,
6	interface::{DictionaryId, FlowId, FlowNodeId, RingBufferId, SourceId, TableId, TableVirtualId, ViewId},
7	util::encoding::keycode::{KeyDeserializer, KeySerializer},
8};
9
10const VERSION: u8 = 1;
11
12/// Key for storing retention policy for a data source (table, view, ringbuffer)
13#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
14pub struct SourceRetentionPolicyKey {
15	pub source: SourceId,
16}
17
18impl SourceRetentionPolicyKey {
19	pub fn encoded(source: impl Into<SourceId>) -> EncodedKey {
20		Self {
21			source: source.into(),
22		}
23		.encode()
24	}
25}
26
27impl EncodableKey for SourceRetentionPolicyKey {
28	const KIND: KeyKind = KeyKind::SourceRetentionPolicy;
29
30	fn encode(&self) -> EncodedKey {
31		let mut serializer = KeySerializer::with_capacity(11);
32		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8);
33
34		// Encode source_id with discriminator
35		match &self.source {
36			SourceId::Table(id) => {
37				serializer.extend_u8(0x01).extend_u64(id.0);
38			}
39			SourceId::View(id) => {
40				serializer.extend_u8(0x02).extend_u64(id.0);
41			}
42			SourceId::Flow(id) => {
43				serializer.extend_u8(0x05).extend_u64(id.0);
44			}
45			SourceId::TableVirtual(id) => {
46				serializer.extend_u8(0x03).extend_u64(id.0);
47			}
48			SourceId::RingBuffer(id) => {
49				serializer.extend_u8(0x04).extend_u64(id.0);
50			}
51			SourceId::Dictionary(id) => {
52				serializer.extend_u8(0x06).extend_u64(id.0);
53			}
54		}
55
56		serializer.to_encoded_key()
57	}
58
59	fn decode(key: &EncodedKey) -> Option<Self> {
60		let mut de = KeyDeserializer::from_bytes(key.as_slice());
61
62		let version = de.read_u8().ok()?;
63		if version != VERSION {
64			return None;
65		}
66
67		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
68		if kind != Self::KIND {
69			return None;
70		}
71
72		let discriminator = de.read_u8().ok()?;
73		let id = de.read_u64().ok()?;
74
75		let source_id = match discriminator {
76			0x01 => SourceId::Table(TableId(id)),
77			0x02 => SourceId::View(ViewId(id)),
78			0x03 => SourceId::TableVirtual(TableVirtualId(id)),
79			0x04 => SourceId::RingBuffer(RingBufferId(id)),
80			0x05 => SourceId::Flow(FlowId(id)),
81			0x06 => SourceId::Dictionary(DictionaryId(id)),
82			_ => return None,
83		};
84
85		Some(Self {
86			source: source_id,
87		})
88	}
89}
90
91/// Key for storing retention policy for a flow operator
92#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
93pub struct OperatorRetentionPolicyKey {
94	pub operator: FlowNodeId,
95}
96
97impl OperatorRetentionPolicyKey {
98	pub fn encoded(operator: impl Into<FlowNodeId>) -> EncodedKey {
99		Self {
100			operator: operator.into(),
101		}
102		.encode()
103	}
104}
105
106impl EncodableKey for OperatorRetentionPolicyKey {
107	const KIND: KeyKind = KeyKind::OperatorRetentionPolicy;
108
109	fn encode(&self) -> EncodedKey {
110		let mut serializer = KeySerializer::with_capacity(10);
111		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.operator);
112		serializer.to_encoded_key()
113	}
114
115	fn decode(key: &EncodedKey) -> Option<Self> {
116		let mut de = KeyDeserializer::from_bytes(key.as_slice());
117
118		let version = de.read_u8().ok()?;
119		if version != VERSION {
120			return None;
121		}
122
123		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
124		if kind != Self::KIND {
125			return None;
126		}
127
128		Some(Self {
129			operator: FlowNodeId(de.read_u64().ok()?),
130		})
131	}
132}
133
134/// Range for scanning all source retention policies
135pub struct SourceRetentionPolicyKeyRange;
136
137impl SourceRetentionPolicyKeyRange {
138	pub fn full_scan() -> EncodedKeyRange {
139		EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
140	}
141
142	fn start() -> EncodedKey {
143		let mut serializer = KeySerializer::with_capacity(2);
144		serializer.extend_u8(VERSION).extend_u8(SourceRetentionPolicyKey::KIND as u8);
145		serializer.to_encoded_key()
146	}
147
148	fn end() -> EncodedKey {
149		let mut serializer = KeySerializer::with_capacity(2);
150		serializer.extend_u8(VERSION).extend_u8(SourceRetentionPolicyKey::KIND as u8 - 1);
151		serializer.to_encoded_key()
152	}
153}
154
155/// Range for scanning all operator retention policies
156pub struct OperatorRetentionPolicyKeyRange;
157
158impl OperatorRetentionPolicyKeyRange {
159	pub fn full_scan() -> EncodedKeyRange {
160		EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
161	}
162
163	fn start() -> EncodedKey {
164		let mut serializer = KeySerializer::with_capacity(2);
165		serializer.extend_u8(VERSION).extend_u8(OperatorRetentionPolicyKey::KIND as u8);
166		serializer.to_encoded_key()
167	}
168
169	fn end() -> EncodedKey {
170		let mut serializer = KeySerializer::with_capacity(2);
171		serializer.extend_u8(VERSION).extend_u8(OperatorRetentionPolicyKey::KIND as u8 - 1);
172		serializer.to_encoded_key()
173	}
174}
175
176#[cfg(test)]
177mod tests {
178	use super::*;
179
180	#[test]
181	fn test_source_retention_policy_key_encoding() {
182		let key = SourceRetentionPolicyKey {
183			source: SourceId::Table(TableId(42)),
184		};
185
186		let encoded = key.encode();
187		assert_eq!(encoded[0], 0xFE); // version (1 encoded as !1)
188		assert_eq!(encoded[1], 0xE8); // kind (0x17 encoded as !0x17)
189		assert_eq!(&encoded[3..11], &[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xD5]);
190
191		let decoded = SourceRetentionPolicyKey::decode(&encoded).unwrap();
192		assert_eq!(key, decoded);
193	}
194
195	#[test]
196	fn test_operator_retention_policy_key_encoding() {
197		let key = OperatorRetentionPolicyKey {
198			operator: FlowNodeId(12345),
199		};
200
201		let encoded = key.encode();
202		assert_eq!(encoded[0], 0xFE); // version (1 encoded as !1)
203		assert_eq!(encoded[1], 0xE7); // kind (0x18 encoded as !0x18)
204		assert_eq!(&encoded[2..10], &[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xCF, 0xC6]);
205
206		let decoded = OperatorRetentionPolicyKey::decode(&encoded).unwrap();
207		assert_eq!(key, decoded);
208	}
209}