reifydb_core/key/
retention_policy.rs1use serde::{Deserialize, Serialize};
2
3use super::{EncodableKey, KeyKind};
4use crate::{
5 EncodedKey, EncodedKeyRange,
6 interface::{FlowId, FlowNodeId, RingBufferId, SourceId, TableId, TableVirtualId, ViewId},
7 util::encoding::keycode::{KeyDeserializer, KeySerializer},
8};
9
10const VERSION: u8 = 1;
11
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
14pub struct SourceRetentionPolicyKey {
15 pub source: SourceId,
16}
17
18impl EncodableKey for SourceRetentionPolicyKey {
19 const KIND: KeyKind = KeyKind::SourceRetentionPolicy;
20
21 fn encode(&self) -> EncodedKey {
22 let mut serializer = KeySerializer::with_capacity(11);
23 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8);
24
25 match &self.source {
27 SourceId::Table(id) => {
28 serializer.extend_u8(0x01).extend_u64(id.0);
29 }
30 SourceId::View(id) => {
31 serializer.extend_u8(0x02).extend_u64(id.0);
32 }
33 SourceId::Flow(id) => {
34 serializer.extend_u8(0x05).extend_u64(id.0);
35 }
36 SourceId::TableVirtual(id) => {
37 serializer.extend_u8(0x03).extend_u64(id.0);
38 }
39 SourceId::RingBuffer(id) => {
40 serializer.extend_u8(0x04).extend_u64(id.0);
41 }
42 }
43
44 serializer.to_encoded_key()
45 }
46
47 fn decode(key: &EncodedKey) -> Option<Self> {
48 let mut de = KeyDeserializer::from_bytes(key.as_slice());
49
50 let version = de.read_u8().ok()?;
51 if version != VERSION {
52 return None;
53 }
54
55 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
56 if kind != Self::KIND {
57 return None;
58 }
59
60 let discriminator = de.read_u8().ok()?;
61 let id = de.read_u64().ok()?;
62
63 let source_id = match discriminator {
64 0x01 => SourceId::Table(TableId(id)),
65 0x02 => SourceId::View(ViewId(id)),
66 0x03 => SourceId::TableVirtual(TableVirtualId(id)),
67 0x04 => SourceId::RingBuffer(RingBufferId(id)),
68 0x05 => SourceId::Flow(FlowId(id)),
69 _ => return None,
70 };
71
72 Some(Self {
73 source: source_id,
74 })
75 }
76}
77
78#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
80pub struct OperatorRetentionPolicyKey {
81 pub operator: FlowNodeId,
82}
83
84impl EncodableKey for OperatorRetentionPolicyKey {
85 const KIND: KeyKind = KeyKind::OperatorRetentionPolicy;
86
87 fn encode(&self) -> EncodedKey {
88 let mut serializer = KeySerializer::with_capacity(10);
89 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.operator);
90 serializer.to_encoded_key()
91 }
92
93 fn decode(key: &EncodedKey) -> Option<Self> {
94 let mut de = KeyDeserializer::from_bytes(key.as_slice());
95
96 let version = de.read_u8().ok()?;
97 if version != VERSION {
98 return None;
99 }
100
101 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
102 if kind != Self::KIND {
103 return None;
104 }
105
106 Some(Self {
107 operator: FlowNodeId(de.read_u64().ok()?),
108 })
109 }
110}
111
112pub struct SourceRetentionPolicyKeyRange;
114
115impl SourceRetentionPolicyKeyRange {
116 pub fn full_scan() -> EncodedKeyRange {
117 EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
118 }
119
120 fn start() -> EncodedKey {
121 let mut serializer = KeySerializer::with_capacity(2);
122 serializer.extend_u8(VERSION).extend_u8(SourceRetentionPolicyKey::KIND as u8);
123 serializer.to_encoded_key()
124 }
125
126 fn end() -> EncodedKey {
127 let mut serializer = KeySerializer::with_capacity(2);
128 serializer.extend_u8(VERSION).extend_u8(SourceRetentionPolicyKey::KIND as u8 - 1);
129 serializer.to_encoded_key()
130 }
131}
132
133pub struct OperatorRetentionPolicyKeyRange;
135
136impl OperatorRetentionPolicyKeyRange {
137 pub fn full_scan() -> EncodedKeyRange {
138 EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
139 }
140
141 fn start() -> EncodedKey {
142 let mut serializer = KeySerializer::with_capacity(2);
143 serializer.extend_u8(VERSION).extend_u8(OperatorRetentionPolicyKey::KIND as u8);
144 serializer.to_encoded_key()
145 }
146
147 fn end() -> EncodedKey {
148 let mut serializer = KeySerializer::with_capacity(2);
149 serializer.extend_u8(VERSION).extend_u8(OperatorRetentionPolicyKey::KIND as u8 - 1);
150 serializer.to_encoded_key()
151 }
152}
153
154#[cfg(test)]
155mod tests {
156 use super::*;
157
158 #[test]
159 fn test_source_retention_policy_key_encoding() {
160 let key = SourceRetentionPolicyKey {
161 source: SourceId::Table(TableId(42)),
162 };
163
164 let encoded = key.encode();
165 assert_eq!(encoded[0], 0xFE); assert_eq!(encoded[1], 0xE8); assert_eq!(&encoded[3..11], &[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xD5]);
168
169 let decoded = SourceRetentionPolicyKey::decode(&encoded).unwrap();
170 assert_eq!(key, decoded);
171 }
172
173 #[test]
174 fn test_operator_retention_policy_key_encoding() {
175 let key = OperatorRetentionPolicyKey {
176 operator: FlowNodeId(12345),
177 };
178
179 let encoded = key.encode();
180 assert_eq!(encoded[0], 0xFE); assert_eq!(encoded[1], 0xE7); assert_eq!(&encoded[2..10], &[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xCF, 0xC6]);
183
184 let decoded = OperatorRetentionPolicyKey::decode(&encoded).unwrap();
185 assert_eq!(key, decoded);
186 }
187}