Skip to main content

reifydb_core/key/
retention_strategy.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_type::value::dictionary::DictionaryId;
5use serde::{Deserialize, Serialize};
6
7use super::{EncodableKey, KeyKind};
8use crate::{
9	encoded::key::{EncodedKey, EncodedKeyRange},
10	interface::catalog::{
11		flow::FlowNodeId,
12		id::{RingBufferId, SeriesId, TableId, ViewId},
13		shape::ShapeId,
14		vtable::VTableId,
15	},
16	util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
17};
18
19#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
20pub struct ShapeRetentionStrategyKey {
21	pub shape: ShapeId,
22}
23
24impl ShapeRetentionStrategyKey {
25	pub fn encoded(shape: impl Into<ShapeId>) -> EncodedKey {
26		Self {
27			shape: shape.into(),
28		}
29		.encode()
30	}
31}
32
33impl EncodableKey for ShapeRetentionStrategyKey {
34	const KIND: KeyKind = KeyKind::ShapeRetentionStrategy;
35
36	fn encode(&self) -> EncodedKey {
37		let mut serializer = KeySerializer::with_capacity(10);
38		serializer.extend_u8(Self::KIND as u8);
39
40		match &self.shape {
41			ShapeId::Table(id) => {
42				serializer.extend_u8(0x01).extend_u64(id.0);
43			}
44			ShapeId::View(id) => {
45				serializer.extend_u8(0x02).extend_u64(id.0);
46			}
47			ShapeId::TableVirtual(id) => {
48				serializer.extend_u8(0x03).extend_u64(id.0);
49			}
50			ShapeId::RingBuffer(id) => {
51				serializer.extend_u8(0x04).extend_u64(id.0);
52			}
53			ShapeId::Dictionary(id) => {
54				serializer.extend_u8(0x06).extend_u64(id.0);
55			}
56			ShapeId::Series(id) => {
57				serializer.extend_u8(0x07).extend_u64(id.0);
58			}
59		}
60
61		serializer.to_encoded_key()
62	}
63
64	fn decode(key: &EncodedKey) -> Option<Self> {
65		let mut de = KeyDeserializer::from_bytes(key.as_slice());
66
67		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
68		if kind != Self::KIND {
69			return None;
70		}
71
72		let discriminator = de.read_u8().ok()?;
73		let id = de.read_u64().ok()?;
74
75		let object_id = match discriminator {
76			0x01 => ShapeId::Table(TableId(id)),
77			0x02 => ShapeId::View(ViewId(id)),
78			0x03 => ShapeId::TableVirtual(VTableId(id)),
79			0x04 => ShapeId::RingBuffer(RingBufferId(id)),
80			0x06 => ShapeId::Dictionary(DictionaryId(id)),
81			0x07 => ShapeId::Series(SeriesId(id)),
82			_ => return None,
83		};
84
85		Some(Self {
86			shape: object_id,
87		})
88	}
89}
90
91#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
92pub struct OperatorRetentionStrategyKey {
93	pub operator: FlowNodeId,
94}
95
96impl OperatorRetentionStrategyKey {
97	pub fn encoded(operator: impl Into<FlowNodeId>) -> EncodedKey {
98		Self {
99			operator: operator.into(),
100		}
101		.encode()
102	}
103}
104
105impl EncodableKey for OperatorRetentionStrategyKey {
106	const KIND: KeyKind = KeyKind::OperatorRetentionStrategy;
107
108	fn encode(&self) -> EncodedKey {
109		let mut serializer = KeySerializer::with_capacity(9);
110		serializer.extend_u8(Self::KIND as u8).extend_u64(self.operator);
111		serializer.to_encoded_key()
112	}
113
114	fn decode(key: &EncodedKey) -> Option<Self> {
115		let mut de = KeyDeserializer::from_bytes(key.as_slice());
116
117		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
118		if kind != Self::KIND {
119			return None;
120		}
121
122		Some(Self {
123			operator: FlowNodeId(de.read_u64().ok()?),
124		})
125	}
126}
127
128pub struct ShapeRetentionStrategyKeyRange;
129
130impl ShapeRetentionStrategyKeyRange {
131	pub fn full_scan() -> EncodedKeyRange {
132		EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
133	}
134
135	fn start() -> EncodedKey {
136		let mut serializer = KeySerializer::with_capacity(1);
137		serializer.extend_u8(ShapeRetentionStrategyKey::KIND as u8);
138		serializer.to_encoded_key()
139	}
140
141	fn end() -> EncodedKey {
142		let mut serializer = KeySerializer::with_capacity(1);
143		serializer.extend_u8(ShapeRetentionStrategyKey::KIND as u8 - 1);
144		serializer.to_encoded_key()
145	}
146}
147
148pub struct OperatorRetentionStrategyKeyRange;
149
150impl OperatorRetentionStrategyKeyRange {
151	pub fn full_scan() -> EncodedKeyRange {
152		EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
153	}
154
155	fn start() -> EncodedKey {
156		let mut serializer = KeySerializer::with_capacity(1);
157		serializer.extend_u8(OperatorRetentionStrategyKey::KIND as u8);
158		serializer.to_encoded_key()
159	}
160
161	fn end() -> EncodedKey {
162		let mut serializer = KeySerializer::with_capacity(1);
163		serializer.extend_u8(OperatorRetentionStrategyKey::KIND as u8 - 1);
164		serializer.to_encoded_key()
165	}
166}
167
168#[cfg(test)]
169pub mod tests {
170	use super::*;
171
172	#[test]
173	fn test_shape_retention_strategy_key_encoding() {
174		let key = ShapeRetentionStrategyKey {
175			shape: ShapeId::Table(TableId(42)),
176		};
177
178		let encoded = key.encode();
179		assert_eq!(encoded[0], 0xE8);
180
181		assert_eq!(encoded.len(), 3);
182		assert_eq!(encoded[1], 0xFE);
183		assert_eq!(encoded[2], 0xD5);
184
185		let decoded = ShapeRetentionStrategyKey::decode(&encoded).unwrap();
186		assert_eq!(key, decoded);
187	}
188
189	#[test]
190	fn test_operator_retention_strategy_key_encoding() {
191		let key = OperatorRetentionStrategyKey {
192			operator: FlowNodeId(12345),
193		};
194
195		let encoded = key.encode();
196		assert_eq!(encoded[0], 0xE7);
197
198		assert_eq!(encoded.len(), 3);
199		assert_eq!(encoded[1], 0x4F);
200		assert_eq!(encoded[2], 0xC6);
201
202		let decoded = OperatorRetentionStrategyKey::decode(&encoded).unwrap();
203		assert_eq!(key, decoded);
204	}
205}