reifydb_core/key/
retention_policy.rs1use serde::{Deserialize, Serialize};
2
3use super::{EncodableKey, KeyKind};
4use crate::{
5 EncodedKey, EncodedKeyRange,
6 interface::{DictionaryId, FlowId, FlowNodeId, RingBufferId, SourceId, TableId, TableVirtualId, ViewId},
7 util::encoding::keycode::{KeyDeserializer, KeySerializer},
8};
9
10const VERSION: u8 = 1;
11
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
14pub struct SourceRetentionPolicyKey {
15 pub source: SourceId,
16}
17
18impl EncodableKey for SourceRetentionPolicyKey {
19 const KIND: KeyKind = KeyKind::SourceRetentionPolicy;
20
21 fn encode(&self) -> EncodedKey {
22 let mut serializer = KeySerializer::with_capacity(11);
23 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8);
24
25 match &self.source {
27 SourceId::Table(id) => {
28 serializer.extend_u8(0x01).extend_u64(id.0);
29 }
30 SourceId::View(id) => {
31 serializer.extend_u8(0x02).extend_u64(id.0);
32 }
33 SourceId::Flow(id) => {
34 serializer.extend_u8(0x05).extend_u64(id.0);
35 }
36 SourceId::TableVirtual(id) => {
37 serializer.extend_u8(0x03).extend_u64(id.0);
38 }
39 SourceId::RingBuffer(id) => {
40 serializer.extend_u8(0x04).extend_u64(id.0);
41 }
42 SourceId::Dictionary(id) => {
43 serializer.extend_u8(0x06).extend_u64(id.0);
44 }
45 }
46
47 serializer.to_encoded_key()
48 }
49
50 fn decode(key: &EncodedKey) -> Option<Self> {
51 let mut de = KeyDeserializer::from_bytes(key.as_slice());
52
53 let version = de.read_u8().ok()?;
54 if version != VERSION {
55 return None;
56 }
57
58 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
59 if kind != Self::KIND {
60 return None;
61 }
62
63 let discriminator = de.read_u8().ok()?;
64 let id = de.read_u64().ok()?;
65
66 let source_id = match discriminator {
67 0x01 => SourceId::Table(TableId(id)),
68 0x02 => SourceId::View(ViewId(id)),
69 0x03 => SourceId::TableVirtual(TableVirtualId(id)),
70 0x04 => SourceId::RingBuffer(RingBufferId(id)),
71 0x05 => SourceId::Flow(FlowId(id)),
72 0x06 => SourceId::Dictionary(DictionaryId(id)),
73 _ => return None,
74 };
75
76 Some(Self {
77 source: source_id,
78 })
79 }
80}
81
82#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
84pub struct OperatorRetentionPolicyKey {
85 pub operator: FlowNodeId,
86}
87
88impl EncodableKey for OperatorRetentionPolicyKey {
89 const KIND: KeyKind = KeyKind::OperatorRetentionPolicy;
90
91 fn encode(&self) -> EncodedKey {
92 let mut serializer = KeySerializer::with_capacity(10);
93 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.operator);
94 serializer.to_encoded_key()
95 }
96
97 fn decode(key: &EncodedKey) -> Option<Self> {
98 let mut de = KeyDeserializer::from_bytes(key.as_slice());
99
100 let version = de.read_u8().ok()?;
101 if version != VERSION {
102 return None;
103 }
104
105 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
106 if kind != Self::KIND {
107 return None;
108 }
109
110 Some(Self {
111 operator: FlowNodeId(de.read_u64().ok()?),
112 })
113 }
114}
115
116pub struct SourceRetentionPolicyKeyRange;
118
119impl SourceRetentionPolicyKeyRange {
120 pub fn full_scan() -> EncodedKeyRange {
121 EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
122 }
123
124 fn start() -> EncodedKey {
125 let mut serializer = KeySerializer::with_capacity(2);
126 serializer.extend_u8(VERSION).extend_u8(SourceRetentionPolicyKey::KIND as u8);
127 serializer.to_encoded_key()
128 }
129
130 fn end() -> EncodedKey {
131 let mut serializer = KeySerializer::with_capacity(2);
132 serializer.extend_u8(VERSION).extend_u8(SourceRetentionPolicyKey::KIND as u8 - 1);
133 serializer.to_encoded_key()
134 }
135}
136
137pub struct OperatorRetentionPolicyKeyRange;
139
140impl OperatorRetentionPolicyKeyRange {
141 pub fn full_scan() -> EncodedKeyRange {
142 EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
143 }
144
145 fn start() -> EncodedKey {
146 let mut serializer = KeySerializer::with_capacity(2);
147 serializer.extend_u8(VERSION).extend_u8(OperatorRetentionPolicyKey::KIND as u8);
148 serializer.to_encoded_key()
149 }
150
151 fn end() -> EncodedKey {
152 let mut serializer = KeySerializer::with_capacity(2);
153 serializer.extend_u8(VERSION).extend_u8(OperatorRetentionPolicyKey::KIND as u8 - 1);
154 serializer.to_encoded_key()
155 }
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the byte layout of a source key and checks it round-trips.
    #[test]
    fn test_source_retention_policy_key_encoding() {
        let key = SourceRetentionPolicyKey {
            source: SourceId::Table(TableId(42)),
        };

        let encoded = key.encode();
        // VERSION (1) is stored complemented.
        assert_eq!(encoded[0], 0xFE);
        // KIND byte for SourceRetentionPolicy.
        assert_eq!(encoded[1], 0xE8);
        // Table tag 0x01 is written with the same call as VERSION (1), so it
        // must produce the same byte.
        assert_eq!(encoded[2], 0xFE);
        // Id 42 (0x2A), stored complemented.
        assert_eq!(&encoded[3..11], &[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xD5]);

        let decoded = SourceRetentionPolicyKey::decode(&encoded).unwrap();
        assert_eq!(key, decoded);
    }

    /// Every `SourceId` variant must survive encode -> decode; this catches a
    /// tag mismatch between the two match tables.
    #[test]
    fn test_source_retention_policy_key_roundtrip_all_variants() {
        let sources = [
            SourceId::Table(TableId(1)),
            SourceId::View(ViewId(2)),
            SourceId::TableVirtual(TableVirtualId(3)),
            SourceId::RingBuffer(RingBufferId(4)),
            SourceId::Flow(FlowId(5)),
            SourceId::Dictionary(DictionaryId(6)),
        ];

        for source in sources {
            let key = SourceRetentionPolicyKey {
                source,
            };
            let decoded = SourceRetentionPolicyKey::decode(&key.encode());
            assert_eq!(decoded, Some(key));
        }
    }

    /// Pins the byte layout of an operator key and checks it round-trips.
    #[test]
    fn test_operator_retention_policy_key_encoding() {
        let key = OperatorRetentionPolicyKey {
            operator: FlowNodeId(12345),
        };

        let encoded = key.encode();
        // VERSION (1) is stored complemented.
        assert_eq!(encoded[0], 0xFE);
        // KIND byte for OperatorRetentionPolicy.
        assert_eq!(encoded[1], 0xE7);
        // Id 12345 (0x3039), stored complemented.
        assert_eq!(&encoded[2..10], &[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xCF, 0xC6]);

        let decoded = OperatorRetentionPolicyKey::decode(&encoded).unwrap();
        assert_eq!(key, decoded);
    }

    /// A key of one kind must not decode as the other kind.
    #[test]
    fn test_decode_rejects_other_key_kind() {
        let source_key = SourceRetentionPolicyKey {
            source: SourceId::Table(TableId(42)),
        }
        .encode();
        let operator_key = OperatorRetentionPolicyKey {
            operator: FlowNodeId(12345),
        }
        .encode();

        assert_eq!(OperatorRetentionPolicyKey::decode(&source_key), None);
        assert_eq!(SourceRetentionPolicyKey::decode(&operator_key), None);
    }
}