reifydb_core/key/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4pub use cdc_consumer::{CdcConsumerKey, CdcConsumerKeyRange, ToConsumerKey};
5pub use column::ColumnKey;
6pub use column_policy::ColumnPolicyKey;
7pub use column_sequence::ColumnSequenceKey;
8pub use columns::ColumnsKey;
9pub use flow::FlowKey;
10pub use flow_node_state::{FlowNodeStateKey, FlowNodeStateKeyRange};
11pub use index::{IndexKey, SourceIndexKeyRange};
12pub use index_entry::IndexEntryKey;
13pub use kind::KeyKind;
14pub use namespace::NamespaceKey;
15pub use namespace_flow::NamespaceFlowKey;
16pub use namespace_ring_buffer::NamespaceRingBufferKey;
17pub use namespace_table::NamespaceTableKey;
18pub use namespace_view::NamespaceViewKey;
19pub use primary_key::PrimaryKeyKey;
20pub use retention_policy::{
21	OperatorRetentionPolicyKey, OperatorRetentionPolicyKeyRange, SourceRetentionPolicyKey,
22	SourceRetentionPolicyKeyRange,
23};
24pub use ring_buffer::{RingBufferKey, RingBufferMetadataKey};
25pub use row::{RowKey, RowKeyRange};
26pub use row_sequence::RowSequenceKey;
27pub use system_sequence::SystemSequenceKey;
28pub use system_version::{SystemVersion, SystemVersionKey};
29pub use table::TableKey;
30pub use transaction_version::TransactionVersionKey;
31pub use view::ViewKey;
32
33use crate::{EncodedKey, EncodedKeyRange, util::encoding::keycode};
34
35mod cdc_consumer;
36mod column;
37mod column_policy;
38mod column_sequence;
39mod columns;
40mod flow;
41mod flow_node_state;
42mod index;
43mod index_entry;
44mod kind;
45mod namespace;
46mod namespace_flow;
47mod namespace_ring_buffer;
48mod namespace_table;
49mod namespace_view;
50mod primary_key;
51mod retention_policy;
52mod ring_buffer;
53mod row;
54mod row_sequence;
55mod system_sequence;
56mod system_version;
57mod table;
58mod transaction_version;
59mod view;
60
61#[derive(Debug)]
62pub enum Key {
63	CdcConsumer(CdcConsumerKey),
64	Namespace(NamespaceKey),
65	NamespaceTable(NamespaceTableKey),
66	NamespaceView(NamespaceViewKey),
67	NamespaceFlow(NamespaceFlowKey),
68	SystemSequence(SystemSequenceKey),
69	Table(TableKey),
70	Flow(FlowKey),
71	Column(ColumnKey),
72	Columns(ColumnsKey),
73	Index(IndexKey),
74	IndexEntry(IndexEntryKey),
75	FlowNodeState(FlowNodeStateKey),
76	PrimaryKey(PrimaryKeyKey),
77	Row(RowKey),
78	RowSequence(RowSequenceKey),
79	TableColumnSequence(ColumnSequenceKey),
80	TableColumnPolicy(ColumnPolicyKey),
81	SystemVersion(SystemVersionKey),
82	TransactionVersion(TransactionVersionKey),
83	View(ViewKey),
84	RingBuffer(RingBufferKey),
85	RingBufferMetadata(RingBufferMetadataKey),
86	NamespaceRingBuffer(NamespaceRingBufferKey),
87	SourceRetentionPolicy(SourceRetentionPolicyKey),
88	OperatorRetentionPolicy(OperatorRetentionPolicyKey),
89}
90
91impl Key {
92	pub fn encode(&self) -> EncodedKey {
93		match &self {
94			Key::CdcConsumer(key) => key.encode(),
95			Key::Namespace(key) => key.encode(),
96			Key::NamespaceTable(key) => key.encode(),
97			Key::NamespaceView(key) => key.encode(),
98			Key::NamespaceFlow(key) => key.encode(),
99			Key::Table(key) => key.encode(),
100			Key::Flow(key) => key.encode(),
101			Key::Column(key) => key.encode(),
102			Key::Columns(key) => key.encode(),
103			Key::TableColumnPolicy(key) => key.encode(),
104			Key::Index(key) => key.encode(),
105			Key::IndexEntry(key) => key.encode(),
106			Key::FlowNodeState(key) => key.encode(),
107			Key::PrimaryKey(key) => key.encode(),
108			Key::Row(key) => key.encode(),
109			Key::RowSequence(key) => key.encode(),
110			Key::TableColumnSequence(key) => key.encode(),
111			Key::SystemSequence(key) => key.encode(),
112			Key::SystemVersion(key) => key.encode(),
113			Key::TransactionVersion(key) => key.encode(),
114			Key::View(key) => key.encode(),
115			Key::RingBuffer(key) => key.encode(),
116			Key::RingBufferMetadata(key) => key.encode(),
117			Key::NamespaceRingBuffer(key) => key.encode(),
118			Key::SourceRetentionPolicy(key) => key.encode(),
119			Key::OperatorRetentionPolicy(key) => key.encode(),
120		}
121	}
122}
123
124pub trait EncodableKey {
125	const KIND: KeyKind;
126
127	fn encode(&self) -> EncodedKey;
128
129	fn decode(key: &EncodedKey) -> Option<Self>
130	where
131		Self: Sized;
132}
133
134pub trait EncodableKeyRange {
135	const KIND: KeyKind;
136
137	fn start(&self) -> Option<EncodedKey>;
138
139	fn end(&self) -> Option<EncodedKey>;
140
141	fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
142	where
143		Self: Sized;
144}
145
146impl Key {
147	pub fn kind(key: &EncodedKey) -> Option<KeyKind> {
148		if key.len() < 2 {
149			return None;
150		}
151
152		keycode::deserialize(&key[1..2]).ok()
153	}
154
155	pub fn decode(key: &EncodedKey) -> Option<Self> {
156		if key.len() < 2 {
157			return None;
158		}
159
160		let kind: KeyKind = keycode::deserialize(&key[1..2]).ok()?;
161		match kind {
162			KeyKind::CdcConsumer => CdcConsumerKey::decode(&key).map(Self::CdcConsumer),
163			KeyKind::Columns => ColumnsKey::decode(&key).map(Self::Columns),
164			KeyKind::ColumnPolicy => ColumnPolicyKey::decode(&key).map(Self::TableColumnPolicy),
165			KeyKind::Namespace => NamespaceKey::decode(&key).map(Self::Namespace),
166			KeyKind::NamespaceTable => NamespaceTableKey::decode(&key).map(Self::NamespaceTable),
167			KeyKind::NamespaceView => NamespaceViewKey::decode(&key).map(Self::NamespaceView),
168			KeyKind::NamespaceFlow => NamespaceFlowKey::decode(&key).map(Self::NamespaceFlow),
169			KeyKind::Table => TableKey::decode(&key).map(Self::Table),
170			KeyKind::Flow => FlowKey::decode(&key).map(Self::Flow),
171			KeyKind::Column => ColumnKey::decode(&key).map(Self::Column),
172			KeyKind::Index => IndexKey::decode(&key).map(Self::Index),
173			KeyKind::IndexEntry => IndexEntryKey::decode(&key).map(Self::IndexEntry),
174			KeyKind::FlowNodeState => FlowNodeStateKey::decode(&key).map(Self::FlowNodeState),
175			KeyKind::Row => RowKey::decode(&key).map(Self::Row),
176			KeyKind::RowSequence => RowSequenceKey::decode(&key).map(Self::RowSequence),
177			KeyKind::ColumnSequence => ColumnSequenceKey::decode(&key).map(Self::TableColumnSequence),
178			KeyKind::SystemSequence => SystemSequenceKey::decode(&key).map(Self::SystemSequence),
179			KeyKind::SystemVersion => SystemVersionKey::decode(&key).map(Self::SystemVersion),
180			KeyKind::TransactionVersion => {
181				TransactionVersionKey::decode(&key).map(Self::TransactionVersion)
182			}
183			KeyKind::View => ViewKey::decode(&key).map(Self::View),
184			KeyKind::PrimaryKey => PrimaryKeyKey::decode(&key).map(Self::PrimaryKey),
185			KeyKind::RingBuffer => RingBufferKey::decode(&key).map(Self::RingBuffer),
186			KeyKind::RingBufferMetadata => {
187				RingBufferMetadataKey::decode(&key).map(Self::RingBufferMetadata)
188			}
189			KeyKind::NamespaceRingBuffer => {
190				NamespaceRingBufferKey::decode(&key).map(Self::NamespaceRingBuffer)
191			}
192			KeyKind::SourceRetentionPolicy => {
193				SourceRetentionPolicyKey::decode(&key).map(Self::SourceRetentionPolicy)
194			}
195			KeyKind::OperatorRetentionPolicy => {
196				OperatorRetentionPolicyKey::decode(&key).map(Self::OperatorRetentionPolicy)
197			}
198		}
199	}
200}
201
202#[cfg(test)]
203mod tests {
204	use reifydb_type::RowNumber;
205
206	use super::{
207		ColumnKey, ColumnPolicyKey, ColumnSequenceKey, ColumnsKey, FlowNodeStateKey, Key, NamespaceKey,
208		NamespaceTableKey, SystemSequenceKey, TableKey,
209	};
210	use crate::{
211		interface::{
212			FlowNodeId, SourceId,
213			catalog::{ColumnId, ColumnPolicyId, IndexId, NamespaceId, SequenceId, TableId},
214		},
215		key::{
216			index::IndexKey, row::RowKey, row_sequence::RowSequenceKey,
217			transaction_version::TransactionVersionKey,
218		},
219	};
220
221	#[test]
222	fn test_table_columns() {
223		let key = Key::Columns(ColumnsKey {
224			column: ColumnId(42),
225		});
226
227		let encoded = key.encode();
228		let decoded = Key::decode(&encoded).expect("Failed to decode key");
229
230		match decoded {
231			Key::Columns(decoded_inner) => {
232				assert_eq!(decoded_inner.column, 42);
233			}
234			_ => unreachable!(),
235		}
236	}
237
238	#[test]
239	fn test_column() {
240		let key = Key::Column(ColumnKey {
241			source: SourceId::table(1),
242			column: ColumnId(42),
243		});
244
245		let encoded = key.encode();
246		let decoded = Key::decode(&encoded).expect("Failed to decode key");
247
248		match decoded {
249			Key::Column(decoded_inner) => {
250				assert_eq!(decoded_inner.source, SourceId::table(1));
251				assert_eq!(decoded_inner.column, 42);
252			}
253			_ => unreachable!(),
254		}
255	}
256
257	#[test]
258	fn test_column_policy() {
259		let key = Key::TableColumnPolicy(ColumnPolicyKey {
260			column: ColumnId(42),
261			policy: ColumnPolicyId(999_999),
262		});
263
264		let encoded = key.encode();
265		let decoded = Key::decode(&encoded).expect("Failed to decode key");
266
267		match decoded {
268			Key::TableColumnPolicy(decoded_inner) => {
269				assert_eq!(decoded_inner.column, 42);
270				assert_eq!(decoded_inner.policy, 999_999);
271			}
272			_ => unreachable!(),
273		}
274	}
275
276	#[test]
277	fn test_namespace() {
278		let key = Key::Namespace(NamespaceKey {
279			namespace: NamespaceId(42),
280		});
281
282		let encoded = key.encode();
283		let decoded = Key::decode(&encoded).expect("Failed to decode key");
284
285		match decoded {
286			Key::Namespace(decoded_inner) => {
287				assert_eq!(decoded_inner.namespace, 42);
288			}
289			_ => unreachable!(),
290		}
291	}
292
293	#[test]
294	fn test_namespace_table() {
295		let key = Key::NamespaceTable(NamespaceTableKey {
296			namespace: NamespaceId(42),
297			table: TableId(999_999),
298		});
299
300		let encoded = key.encode();
301		let decoded = Key::decode(&encoded).expect("Failed to decode key");
302
303		match decoded {
304			Key::NamespaceTable(decoded_inner) => {
305				assert_eq!(decoded_inner.namespace, 42);
306				assert_eq!(decoded_inner.table, 999_999);
307			}
308			_ => unreachable!(),
309		}
310	}
311
312	#[test]
313	fn test_system_sequence() {
314		let key = Key::SystemSequence(SystemSequenceKey {
315			sequence: SequenceId(42),
316		});
317
318		let encoded = key.encode();
319		let decoded = Key::decode(&encoded).expect("Failed to decode key");
320
321		match decoded {
322			Key::SystemSequence(decoded_inner) => {
323				assert_eq!(decoded_inner.sequence, 42);
324			}
325			_ => unreachable!(),
326		}
327	}
328
329	#[test]
330	fn test_table() {
331		let key = Key::Table(TableKey {
332			table: TableId(42),
333		});
334
335		let encoded = key.encode();
336		let decoded = Key::decode(&encoded).expect("Failed to decode key");
337
338		match decoded {
339			Key::Table(decoded_inner) => {
340				assert_eq!(decoded_inner.table, 42);
341			}
342			_ => unreachable!(),
343		}
344	}
345
346	#[test]
347	fn test_index() {
348		let key = Key::Index(IndexKey {
349			source: SourceId::table(42),
350			index: IndexId::primary(999_999),
351		});
352
353		let encoded = key.encode();
354		let decoded = Key::decode(&encoded).expect("Failed to decode key");
355
356		match decoded {
357			Key::Index(decoded_inner) => {
358				assert_eq!(decoded_inner.source, SourceId::table(42));
359				assert_eq!(decoded_inner.index, 999_999);
360			}
361			_ => unreachable!(),
362		}
363	}
364
365	#[test]
366	fn test_row() {
367		let key = Key::Row(RowKey {
368			source: SourceId::table(42),
369			row: RowNumber(999_999),
370		});
371
372		let encoded = key.encode();
373		let decoded = Key::decode(&encoded).expect("Failed to decode key");
374
375		match decoded {
376			Key::Row(decoded_inner) => {
377				assert_eq!(decoded_inner.source, SourceId::table(42));
378				assert_eq!(decoded_inner.row, 999_999);
379			}
380			_ => unreachable!(),
381		}
382	}
383
384	#[test]
385	fn test_row_sequence() {
386		let key = Key::RowSequence(RowSequenceKey {
387			source: SourceId::table(42),
388		});
389
390		let encoded = key.encode();
391		let decoded = Key::decode(&encoded).expect("Failed to decode key");
392
393		match decoded {
394			Key::RowSequence(decoded_inner) => {
395				assert_eq!(decoded_inner.source, SourceId::table(42));
396			}
397			_ => unreachable!(),
398		}
399	}
400
401	#[test]
402	fn test_column_sequence() {
403		let key = Key::TableColumnSequence(ColumnSequenceKey {
404			source: SourceId::table(42),
405			column: ColumnId(123),
406		});
407
408		let encoded = key.encode();
409		let decoded = Key::decode(&encoded).expect("Failed to decode key");
410
411		match decoded {
412			Key::TableColumnSequence(decoded_inner) => {
413				assert_eq!(decoded_inner.source, SourceId::table(42));
414				assert_eq!(decoded_inner.column, 123);
415			}
416			_ => unreachable!(),
417		}
418	}
419
420	#[test]
421	fn test_transaction_version() {
422		let key = Key::TransactionVersion(TransactionVersionKey {});
423		let encoded = key.encode();
424		Key::decode(&encoded).expect("Failed to decode key");
425	}
426
427	#[test]
428	fn test_operator_state() {
429		let key = Key::FlowNodeState(FlowNodeStateKey {
430			node: FlowNodeId(0xCAFEBABE),
431			key: vec![1, 2, 3],
432		});
433
434		let encoded = key.encode();
435		let decoded = Key::decode(&encoded).expect("Failed to decode key");
436
437		match decoded {
438			Key::FlowNodeState(decoded_inner) => {
439				assert_eq!(decoded_inner.node, 0xCAFEBABE);
440				assert_eq!(decoded_inner.key, vec![1, 2, 3]);
441			}
442			_ => unreachable!(),
443		}
444	}
445}