reifydb_core/key/
retention_policy.rs1use serde::{Deserialize, Serialize};
2
3use super::{EncodableKey, KeyKind};
4use crate::{
5 EncodedKey, EncodedKeyRange,
6 interface::{DictionaryId, FlowId, FlowNodeId, RingBufferId, SourceId, TableId, TableVirtualId, ViewId},
7 util::encoding::keycode::{KeyDeserializer, KeySerializer},
8};
9
10const VERSION: u8 = 1;
11
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
14pub struct SourceRetentionPolicyKey {
15 pub source: SourceId,
16}
17
18impl SourceRetentionPolicyKey {
19 pub fn encoded(source: impl Into<SourceId>) -> EncodedKey {
20 Self {
21 source: source.into(),
22 }
23 .encode()
24 }
25}
26
27impl EncodableKey for SourceRetentionPolicyKey {
28 const KIND: KeyKind = KeyKind::SourceRetentionPolicy;
29
30 fn encode(&self) -> EncodedKey {
31 let mut serializer = KeySerializer::with_capacity(11);
32 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8);
33
34 match &self.source {
36 SourceId::Table(id) => {
37 serializer.extend_u8(0x01).extend_u64(id.0);
38 }
39 SourceId::View(id) => {
40 serializer.extend_u8(0x02).extend_u64(id.0);
41 }
42 SourceId::Flow(id) => {
43 serializer.extend_u8(0x05).extend_u64(id.0);
44 }
45 SourceId::TableVirtual(id) => {
46 serializer.extend_u8(0x03).extend_u64(id.0);
47 }
48 SourceId::RingBuffer(id) => {
49 serializer.extend_u8(0x04).extend_u64(id.0);
50 }
51 SourceId::Dictionary(id) => {
52 serializer.extend_u8(0x06).extend_u64(id.0);
53 }
54 }
55
56 serializer.to_encoded_key()
57 }
58
59 fn decode(key: &EncodedKey) -> Option<Self> {
60 let mut de = KeyDeserializer::from_bytes(key.as_slice());
61
62 let version = de.read_u8().ok()?;
63 if version != VERSION {
64 return None;
65 }
66
67 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
68 if kind != Self::KIND {
69 return None;
70 }
71
72 let discriminator = de.read_u8().ok()?;
73 let id = de.read_u64().ok()?;
74
75 let source_id = match discriminator {
76 0x01 => SourceId::Table(TableId(id)),
77 0x02 => SourceId::View(ViewId(id)),
78 0x03 => SourceId::TableVirtual(TableVirtualId(id)),
79 0x04 => SourceId::RingBuffer(RingBufferId(id)),
80 0x05 => SourceId::Flow(FlowId(id)),
81 0x06 => SourceId::Dictionary(DictionaryId(id)),
82 _ => return None,
83 };
84
85 Some(Self {
86 source: source_id,
87 })
88 }
89}
90
91#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
93pub struct OperatorRetentionPolicyKey {
94 pub operator: FlowNodeId,
95}
96
97impl OperatorRetentionPolicyKey {
98 pub fn encoded(operator: impl Into<FlowNodeId>) -> EncodedKey {
99 Self {
100 operator: operator.into(),
101 }
102 .encode()
103 }
104}
105
106impl EncodableKey for OperatorRetentionPolicyKey {
107 const KIND: KeyKind = KeyKind::OperatorRetentionPolicy;
108
109 fn encode(&self) -> EncodedKey {
110 let mut serializer = KeySerializer::with_capacity(10);
111 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.operator);
112 serializer.to_encoded_key()
113 }
114
115 fn decode(key: &EncodedKey) -> Option<Self> {
116 let mut de = KeyDeserializer::from_bytes(key.as_slice());
117
118 let version = de.read_u8().ok()?;
119 if version != VERSION {
120 return None;
121 }
122
123 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
124 if kind != Self::KIND {
125 return None;
126 }
127
128 Some(Self {
129 operator: FlowNodeId(de.read_u64().ok()?),
130 })
131 }
132}
133
134pub struct SourceRetentionPolicyKeyRange;
136
137impl SourceRetentionPolicyKeyRange {
138 pub fn full_scan() -> EncodedKeyRange {
139 EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
140 }
141
142 fn start() -> EncodedKey {
143 let mut serializer = KeySerializer::with_capacity(2);
144 serializer.extend_u8(VERSION).extend_u8(SourceRetentionPolicyKey::KIND as u8);
145 serializer.to_encoded_key()
146 }
147
148 fn end() -> EncodedKey {
149 let mut serializer = KeySerializer::with_capacity(2);
150 serializer.extend_u8(VERSION).extend_u8(SourceRetentionPolicyKey::KIND as u8 - 1);
151 serializer.to_encoded_key()
152 }
153}
154
155pub struct OperatorRetentionPolicyKeyRange;
157
158impl OperatorRetentionPolicyKeyRange {
159 pub fn full_scan() -> EncodedKeyRange {
160 EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
161 }
162
163 fn start() -> EncodedKey {
164 let mut serializer = KeySerializer::with_capacity(2);
165 serializer.extend_u8(VERSION).extend_u8(OperatorRetentionPolicyKey::KIND as u8);
166 serializer.to_encoded_key()
167 }
168
169 fn end() -> EncodedKey {
170 let mut serializer = KeySerializer::with_capacity(2);
171 serializer.extend_u8(VERSION).extend_u8(OperatorRetentionPolicyKey::KIND as u8 - 1);
172 serializer.to_encoded_key()
173 }
174}
175
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the exact bytes of an encoded table key and that it decodes
    /// back to the same value.
    #[test]
    fn test_source_retention_policy_key_encoding() {
        let key = SourceRetentionPolicyKey {
            source: SourceId::Table(TableId(42)),
        };

        let encoded = key.encode();
        // The expected bytes are the bitwise complement of the written values.
        assert_eq!(encoded[0], 0xFE); // VERSION = 1
        assert_eq!(encoded[1], 0xE8); // KeyKind::SourceRetentionPolicy
        assert_eq!(encoded[2], 0xFE); // table discriminator 0x01
        assert_eq!(&encoded[3..11], &[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xD5]); // id = 42

        let decoded = SourceRetentionPolicyKey::decode(&encoded).unwrap();
        assert_eq!(key, decoded);
    }

    /// Every `SourceId` variant handled by `encode` must survive a round trip,
    /// which guards against the encode and decode discriminator tables
    /// drifting apart.
    #[test]
    fn test_source_retention_policy_key_roundtrip_all_variants() {
        let sources = vec![
            SourceId::Table(TableId(1)),
            SourceId::View(ViewId(2)),
            SourceId::TableVirtual(TableVirtualId(3)),
            SourceId::RingBuffer(RingBufferId(4)),
            SourceId::Flow(FlowId(5)),
            SourceId::Dictionary(DictionaryId(6)),
        ];

        for source in sources {
            let key = SourceRetentionPolicyKey {
                source,
            };
            let decoded = SourceRetentionPolicyKey::decode(&key.encode());
            assert_eq!(decoded, Some(key));
        }
    }

    /// Checks the exact bytes of an encoded operator key and that it decodes
    /// back to the same value.
    #[test]
    fn test_operator_retention_policy_key_encoding() {
        let key = OperatorRetentionPolicyKey {
            operator: FlowNodeId(12345),
        };

        let encoded = key.encode();
        assert_eq!(encoded[0], 0xFE); // VERSION = 1
        assert_eq!(encoded[1], 0xE7); // KeyKind::OperatorRetentionPolicy
        assert_eq!(&encoded[2..10], &[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xCF, 0xC6]); // id = 12345

        let decoded = OperatorRetentionPolicyKey::decode(&encoded).unwrap();
        assert_eq!(key, decoded);
    }

    /// A key of one kind must not decode as the other kind.
    #[test]
    fn test_decode_rejects_mismatched_kind() {
        let source_key = SourceRetentionPolicyKey::encoded(SourceId::Table(TableId(1)));
        assert!(OperatorRetentionPolicyKey::decode(&source_key).is_none());

        let operator_key = OperatorRetentionPolicyKey::encoded(FlowNodeId(1));
        assert!(SourceRetentionPolicyKey::decode(&operator_key).is_none());
    }
}