reifydb_core/key/
retention_policy.rs1use reifydb_type::value::dictionary::DictionaryId;
5use serde::{Deserialize, Serialize};
6
7use super::{EncodableKey, KeyKind};
8use crate::{
9 encoded::key::{EncodedKey, EncodedKeyRange},
10 interface::catalog::{
11 flow::{FlowId, FlowNodeId},
12 id::{RingBufferId, TableId, ViewId},
13 primitive::PrimitiveId,
14 vtable::VTableId,
15 },
16 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
17};
18
/// Format version written as the first byte of every key encoded in this
/// file; `decode` returns `None` for keys whose first byte does not match.
const VERSION: u8 = 1;
20
21#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
23pub struct PrimitiveRetentionPolicyKey {
24 pub primitive: PrimitiveId,
25}
26
27impl PrimitiveRetentionPolicyKey {
28 pub fn encoded(primitive: impl Into<PrimitiveId>) -> EncodedKey {
29 Self {
30 primitive: primitive.into(),
31 }
32 .encode()
33 }
34}
35
36impl EncodableKey for PrimitiveRetentionPolicyKey {
37 const KIND: KeyKind = KeyKind::PrimitiveRetentionPolicy;
38
39 fn encode(&self) -> EncodedKey {
40 let mut serializer = KeySerializer::with_capacity(11);
41 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8);
42
43 match &self.primitive {
45 PrimitiveId::Table(id) => {
46 serializer.extend_u8(0x01).extend_u64(id.0);
47 }
48 PrimitiveId::View(id) => {
49 serializer.extend_u8(0x02).extend_u64(id.0);
50 }
51 PrimitiveId::Flow(id) => {
52 serializer.extend_u8(0x05).extend_u64(id.0);
53 }
54 PrimitiveId::TableVirtual(id) => {
55 serializer.extend_u8(0x03).extend_u64(id.0);
56 }
57 PrimitiveId::RingBuffer(id) => {
58 serializer.extend_u8(0x04).extend_u64(id.0);
59 }
60 PrimitiveId::Dictionary(id) => {
61 serializer.extend_u8(0x06).extend_u64(id.0);
62 }
63 }
64
65 serializer.to_encoded_key()
66 }
67
68 fn decode(key: &EncodedKey) -> Option<Self> {
69 let mut de = KeyDeserializer::from_bytes(key.as_slice());
70
71 let version = de.read_u8().ok()?;
72 if version != VERSION {
73 return None;
74 }
75
76 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
77 if kind != Self::KIND {
78 return None;
79 }
80
81 let discriminator = de.read_u8().ok()?;
82 let id = de.read_u64().ok()?;
83
84 let primitive_id = match discriminator {
85 0x01 => PrimitiveId::Table(TableId(id)),
86 0x02 => PrimitiveId::View(ViewId(id)),
87 0x03 => PrimitiveId::TableVirtual(VTableId(id)),
88 0x04 => PrimitiveId::RingBuffer(RingBufferId(id)),
89 0x05 => PrimitiveId::Flow(FlowId(id)),
90 0x06 => PrimitiveId::Dictionary(DictionaryId(id)),
91 _ => return None,
92 };
93
94 Some(Self {
95 primitive: primitive_id,
96 })
97 }
98}
99
100#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
102pub struct OperatorRetentionPolicyKey {
103 pub operator: FlowNodeId,
104}
105
106impl OperatorRetentionPolicyKey {
107 pub fn encoded(operator: impl Into<FlowNodeId>) -> EncodedKey {
108 Self {
109 operator: operator.into(),
110 }
111 .encode()
112 }
113}
114
115impl EncodableKey for OperatorRetentionPolicyKey {
116 const KIND: KeyKind = KeyKind::OperatorRetentionPolicy;
117
118 fn encode(&self) -> EncodedKey {
119 let mut serializer = KeySerializer::with_capacity(10);
120 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.operator);
121 serializer.to_encoded_key()
122 }
123
124 fn decode(key: &EncodedKey) -> Option<Self> {
125 let mut de = KeyDeserializer::from_bytes(key.as_slice());
126
127 let version = de.read_u8().ok()?;
128 if version != VERSION {
129 return None;
130 }
131
132 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
133 if kind != Self::KIND {
134 return None;
135 }
136
137 Some(Self {
138 operator: FlowNodeId(de.read_u64().ok()?),
139 })
140 }
141}
142
143pub struct PrimitiveRetentionPolicyKeyRange;
145
146impl PrimitiveRetentionPolicyKeyRange {
147 pub fn full_scan() -> EncodedKeyRange {
148 EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
149 }
150
151 fn start() -> EncodedKey {
152 let mut serializer = KeySerializer::with_capacity(2);
153 serializer.extend_u8(VERSION).extend_u8(PrimitiveRetentionPolicyKey::KIND as u8);
154 serializer.to_encoded_key()
155 }
156
157 fn end() -> EncodedKey {
158 let mut serializer = KeySerializer::with_capacity(2);
159 serializer.extend_u8(VERSION).extend_u8(PrimitiveRetentionPolicyKey::KIND as u8 - 1);
160 serializer.to_encoded_key()
161 }
162}
163
164pub struct OperatorRetentionPolicyKeyRange;
166
167impl OperatorRetentionPolicyKeyRange {
168 pub fn full_scan() -> EncodedKeyRange {
169 EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
170 }
171
172 fn start() -> EncodedKey {
173 let mut serializer = KeySerializer::with_capacity(2);
174 serializer.extend_u8(VERSION).extend_u8(OperatorRetentionPolicyKey::KIND as u8);
175 serializer.to_encoded_key()
176 }
177
178 fn end() -> EncodedKey {
179 let mut serializer = KeySerializer::with_capacity(2);
180 serializer.extend_u8(VERSION).extend_u8(OperatorRetentionPolicyKey::KIND as u8 - 1);
181 serializer.to_encoded_key()
182 }
183}
184
#[cfg(test)]
pub mod tests {
    use super::*;

    /// Pins the exact byte layout of a table key and checks it round-trips.
    #[test]
    fn test_primitive_retention_policy_key_encoding() {
        let key = PrimitiveRetentionPolicyKey {
            primitive: PrimitiveId::Table(TableId(42)),
        };

        let encoded = key.encode();
        assert_eq!(encoded[0], 0xFE);
        assert_eq!(encoded[1], 0xE8);
        // Table tag 0x01 goes through the same u8 encoding as VERSION (1),
        // which the assertion on byte 0 shows is written as 0xFE.
        assert_eq!(encoded[2], 0xFE);
        assert_eq!(&encoded[3..11], &[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xD5]);

        let decoded = PrimitiveRetentionPolicyKey::decode(&encoded).unwrap();
        assert_eq!(key, decoded);
    }

    /// Every `PrimitiveId` variant must decode back to itself. All samples
    /// share one raw id so only the tag byte distinguishes them.
    #[test]
    fn test_primitive_retention_policy_key_round_trips_every_variant() {
        let primitives = vec![
            PrimitiveId::Table(TableId(7)),
            PrimitiveId::View(ViewId(7)),
            PrimitiveId::TableVirtual(VTableId(7)),
            PrimitiveId::RingBuffer(RingBufferId(7)),
            PrimitiveId::Flow(FlowId(7)),
            PrimitiveId::Dictionary(DictionaryId(7)),
        ];

        for primitive in primitives {
            let key = PrimitiveRetentionPolicyKey {
                primitive,
            };
            let decoded = PrimitiveRetentionPolicyKey::decode(&key.encode());
            assert_eq!(decoded, Some(key));
        }
    }

    /// A tag outside 0x01..=0x06 must be rejected rather than mapped to a variant.
    #[test]
    fn test_primitive_retention_policy_key_rejects_unknown_tag() {
        let mut serializer = KeySerializer::with_capacity(11);
        serializer
            .extend_u8(VERSION)
            .extend_u8(PrimitiveRetentionPolicyKey::KIND as u8)
            .extend_u8(0x7Fu8)
            .extend_u64(42u64);
        let encoded = serializer.to_encoded_key();

        assert_eq!(PrimitiveRetentionPolicyKey::decode(&encoded), None);
    }

    /// A key holding only the version and kind bytes has no payload to decode.
    #[test]
    fn test_retention_policy_keys_reject_truncated_input() {
        let mut serializer = KeySerializer::with_capacity(2);
        serializer.extend_u8(VERSION).extend_u8(PrimitiveRetentionPolicyKey::KIND as u8);
        let primitive_header = serializer.to_encoded_key();
        assert_eq!(PrimitiveRetentionPolicyKey::decode(&primitive_header), None);

        let mut serializer = KeySerializer::with_capacity(2);
        serializer.extend_u8(VERSION).extend_u8(OperatorRetentionPolicyKey::KIND as u8);
        let operator_header = serializer.to_encoded_key();
        assert_eq!(OperatorRetentionPolicyKey::decode(&operator_header), None);
    }

    /// Pins the exact byte layout of an operator key and checks it round-trips.
    #[test]
    fn test_operator_retention_policy_key_encoding() {
        let key = OperatorRetentionPolicyKey {
            operator: FlowNodeId(12345),
        };

        let encoded = key.encode();
        assert_eq!(encoded[0], 0xFE);
        assert_eq!(encoded[1], 0xE7);
        assert_eq!(&encoded[2..10], &[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xCF, 0xC6]);

        let decoded = OperatorRetentionPolicyKey::decode(&encoded).unwrap();
        assert_eq!(key, decoded);
    }

    /// Each key type must refuse bytes that carry the other type's kind.
    #[test]
    fn test_retention_policy_keys_reject_each_others_kind() {
        let primitive = PrimitiveRetentionPolicyKey::encoded(PrimitiveId::Table(TableId(42)));
        let operator = OperatorRetentionPolicyKey::encoded(FlowNodeId(12345));

        assert_eq!(OperatorRetentionPolicyKey::decode(&primitive), None);
        assert_eq!(PrimitiveRetentionPolicyKey::decode(&operator), None);
    }
}