reifydb_core/key/
retention_strategy.rs1use reifydb_type::value::dictionary::DictionaryId;
5use serde::{Deserialize, Serialize};
6
7use super::{EncodableKey, KeyKind};
8use crate::{
9 encoded::key::{EncodedKey, EncodedKeyRange},
10 interface::catalog::{
11 flow::FlowNodeId,
12 id::{RingBufferId, SeriesId, TableId, ViewId},
13 shape::ShapeId,
14 vtable::VTableId,
15 },
16 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
17};
18
19#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
20pub struct ShapeRetentionStrategyKey {
21 pub shape: ShapeId,
22}
23
24impl ShapeRetentionStrategyKey {
25 pub fn encoded(shape: impl Into<ShapeId>) -> EncodedKey {
26 Self {
27 shape: shape.into(),
28 }
29 .encode()
30 }
31}
32
33impl EncodableKey for ShapeRetentionStrategyKey {
34 const KIND: KeyKind = KeyKind::ShapeRetentionStrategy;
35
36 fn encode(&self) -> EncodedKey {
37 let mut serializer = KeySerializer::with_capacity(10);
38 serializer.extend_u8(Self::KIND as u8);
39
40 match &self.shape {
41 ShapeId::Table(id) => {
42 serializer.extend_u8(0x01).extend_u64(id.0);
43 }
44 ShapeId::View(id) => {
45 serializer.extend_u8(0x02).extend_u64(id.0);
46 }
47 ShapeId::TableVirtual(id) => {
48 serializer.extend_u8(0x03).extend_u64(id.0);
49 }
50 ShapeId::RingBuffer(id) => {
51 serializer.extend_u8(0x04).extend_u64(id.0);
52 }
53 ShapeId::Dictionary(id) => {
54 serializer.extend_u8(0x06).extend_u64(id.0);
55 }
56 ShapeId::Series(id) => {
57 serializer.extend_u8(0x07).extend_u64(id.0);
58 }
59 }
60
61 serializer.to_encoded_key()
62 }
63
64 fn decode(key: &EncodedKey) -> Option<Self> {
65 let mut de = KeyDeserializer::from_bytes(key.as_slice());
66
67 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
68 if kind != Self::KIND {
69 return None;
70 }
71
72 let discriminator = de.read_u8().ok()?;
73 let id = de.read_u64().ok()?;
74
75 let object_id = match discriminator {
76 0x01 => ShapeId::Table(TableId(id)),
77 0x02 => ShapeId::View(ViewId(id)),
78 0x03 => ShapeId::TableVirtual(VTableId(id)),
79 0x04 => ShapeId::RingBuffer(RingBufferId(id)),
80 0x06 => ShapeId::Dictionary(DictionaryId(id)),
81 0x07 => ShapeId::Series(SeriesId(id)),
82 _ => return None,
83 };
84
85 Some(Self {
86 shape: object_id,
87 })
88 }
89}
90
91#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
92pub struct OperatorRetentionStrategyKey {
93 pub operator: FlowNodeId,
94}
95
96impl OperatorRetentionStrategyKey {
97 pub fn encoded(operator: impl Into<FlowNodeId>) -> EncodedKey {
98 Self {
99 operator: operator.into(),
100 }
101 .encode()
102 }
103}
104
105impl EncodableKey for OperatorRetentionStrategyKey {
106 const KIND: KeyKind = KeyKind::OperatorRetentionStrategy;
107
108 fn encode(&self) -> EncodedKey {
109 let mut serializer = KeySerializer::with_capacity(9);
110 serializer.extend_u8(Self::KIND as u8).extend_u64(self.operator);
111 serializer.to_encoded_key()
112 }
113
114 fn decode(key: &EncodedKey) -> Option<Self> {
115 let mut de = KeyDeserializer::from_bytes(key.as_slice());
116
117 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
118 if kind != Self::KIND {
119 return None;
120 }
121
122 Some(Self {
123 operator: FlowNodeId(de.read_u64().ok()?),
124 })
125 }
126}
127
128pub struct ShapeRetentionStrategyKeyRange;
129
130impl ShapeRetentionStrategyKeyRange {
131 pub fn full_scan() -> EncodedKeyRange {
132 EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
133 }
134
135 fn start() -> EncodedKey {
136 let mut serializer = KeySerializer::with_capacity(1);
137 serializer.extend_u8(ShapeRetentionStrategyKey::KIND as u8);
138 serializer.to_encoded_key()
139 }
140
141 fn end() -> EncodedKey {
142 let mut serializer = KeySerializer::with_capacity(1);
143 serializer.extend_u8(ShapeRetentionStrategyKey::KIND as u8 - 1);
144 serializer.to_encoded_key()
145 }
146}
147
148pub struct OperatorRetentionStrategyKeyRange;
149
150impl OperatorRetentionStrategyKeyRange {
151 pub fn full_scan() -> EncodedKeyRange {
152 EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
153 }
154
155 fn start() -> EncodedKey {
156 let mut serializer = KeySerializer::with_capacity(1);
157 serializer.extend_u8(OperatorRetentionStrategyKey::KIND as u8);
158 serializer.to_encoded_key()
159 }
160
161 fn end() -> EncodedKey {
162 let mut serializer = KeySerializer::with_capacity(1);
163 serializer.extend_u8(OperatorRetentionStrategyKey::KIND as u8 - 1);
164 serializer.to_encoded_key()
165 }
166}
167
168#[cfg(test)]
169pub mod tests {
170 use super::*;
171
172 #[test]
173 fn test_shape_retention_strategy_key_encoding() {
174 let key = ShapeRetentionStrategyKey {
175 shape: ShapeId::Table(TableId(42)),
176 };
177
178 let encoded = key.encode();
179 assert_eq!(encoded[0], 0xE8);
180
181 assert_eq!(encoded.len(), 3);
182 assert_eq!(encoded[1], 0xFE);
183 assert_eq!(encoded[2], 0xD5);
184
185 let decoded = ShapeRetentionStrategyKey::decode(&encoded).unwrap();
186 assert_eq!(key, decoded);
187 }
188
189 #[test]
190 fn test_operator_retention_strategy_key_encoding() {
191 let key = OperatorRetentionStrategyKey {
192 operator: FlowNodeId(12345),
193 };
194
195 let encoded = key.encode();
196 assert_eq!(encoded[0], 0xE7);
197
198 assert_eq!(encoded.len(), 3);
199 assert_eq!(encoded[1], 0x4F);
200 assert_eq!(encoded[2], 0xC6);
201
202 let decoded = OperatorRetentionStrategyKey::decode(&encoded).unwrap();
203 assert_eq!(key, decoded);
204 }
205}