Skip to main content

reifydb_core/key/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use cdc_consumer::CdcConsumerKey;
5use column::ColumnKey;
6use column_policy::ColumnPolicyKey;
7use column_sequence::ColumnSequenceKey;
8use columns::ColumnsKey;
9use dictionary::{DictionaryEntryIndexKey, DictionaryEntryKey, DictionaryKey, DictionarySequenceKey};
10use flow::FlowKey;
11use flow_node_internal_state::FlowNodeInternalStateKey;
12use flow_node_state::FlowNodeStateKey;
13use index::IndexKey;
14use index_entry::IndexEntryKey;
15use kind::KeyKind;
16use namespace::NamespaceKey;
17use namespace_dictionary::NamespaceDictionaryKey;
18use namespace_flow::NamespaceFlowKey;
19use namespace_ringbuffer::NamespaceRingBufferKey;
20use namespace_sumtype::NamespaceSumTypeKey;
21use namespace_table::NamespaceTableKey;
22use namespace_view::NamespaceViewKey;
23use primary_key::PrimaryKeyKey;
24use retention_policy::{OperatorRetentionPolicyKey, PrimitiveRetentionPolicyKey};
25use ringbuffer::{RingBufferKey, RingBufferMetadataKey};
26use row::RowKey;
27use row_sequence::RowSequenceKey;
28use subscription::SubscriptionKey;
29use subscription_column::SubscriptionColumnKey;
30use subscription_row::SubscriptionRowKey;
31use sumtype::SumTypeKey;
32use system_sequence::SystemSequenceKey;
33use system_version::SystemVersionKey;
34use table::TableKey;
35use transaction_version::TransactionVersionKey;
36use view::ViewKey;
37
38use crate::{
39	encoded::key::{EncodedKey, EncodedKeyRange},
40	util::encoding::keycode,
41};
42
43pub mod cdc_consumer;
44pub mod cdc_exclude;
45pub mod column;
46pub mod column_policy;
47pub mod column_sequence;
48pub mod columns;
49pub mod dictionary;
50pub mod flow;
51pub mod flow_edge;
52pub mod flow_node;
53pub mod flow_node_internal_state;
54pub mod flow_node_state;
55pub mod flow_version;
56pub mod index;
57pub mod index_entry;
58pub mod kind;
59pub mod namespace;
60pub mod namespace_dictionary;
61pub mod namespace_flow;
62pub mod namespace_ringbuffer;
63pub mod namespace_sumtype;
64pub mod namespace_table;
65pub mod namespace_view;
66pub mod primary_key;
67pub mod retention_policy;
68pub mod ringbuffer;
69pub mod row;
70pub mod row_sequence;
71pub mod schema;
72pub mod subscription;
73pub mod subscription_column;
74pub mod subscription_row;
75pub mod sumtype;
76pub mod system_sequence;
77pub mod system_version;
78pub mod table;
79pub mod transaction_version;
80pub mod view;
/// A decoded storage key.
///
/// Each variant wraps the concrete key struct for one key type. Values are
/// turned into their byte form with [`Key::encode`] and recovered from bytes
/// with [`Key::decode`].
///
/// Not every [`KeyKind`] has a variant here: `Key::decode` returns `None` for
/// the flow node/edge/version, schema and metric kinds, which are handled
/// outside this enum.
#[derive(Debug)]
pub enum Key {
	CdcConsumer(CdcConsumerKey),
	Namespace(NamespaceKey),
	NamespaceTable(NamespaceTableKey),
	NamespaceView(NamespaceViewKey),
	NamespaceFlow(NamespaceFlowKey),
	SystemSequence(SystemSequenceKey),
	Table(TableKey),
	Flow(FlowKey),
	Column(ColumnKey),
	Columns(ColumnsKey),
	Index(IndexKey),
	IndexEntry(IndexEntryKey),
	FlowNodeState(FlowNodeStateKey),
	FlowNodeInternalState(FlowNodeInternalStateKey),
	PrimaryKey(PrimaryKeyKey),
	Row(RowKey),
	RowSequence(RowSequenceKey),
	// Wraps `ColumnSequenceKey`; decoded from `KeyKind::ColumnSequence`.
	TableColumnSequence(ColumnSequenceKey),
	// Wraps `ColumnPolicyKey`; decoded from `KeyKind::ColumnPolicy`.
	TableColumnPolicy(ColumnPolicyKey),
	SystemVersion(SystemVersionKey),
	TransactionVersion(TransactionVersionKey),
	View(ViewKey),
	RingBuffer(RingBufferKey),
	RingBufferMetadata(RingBufferMetadataKey),
	NamespaceRingBuffer(NamespaceRingBufferKey),
	PrimitiveRetentionPolicy(PrimitiveRetentionPolicyKey),
	OperatorRetentionPolicy(OperatorRetentionPolicyKey),
	Dictionary(DictionaryKey),
	DictionaryEntry(DictionaryEntryKey),
	DictionaryEntryIndex(DictionaryEntryIndexKey),
	DictionarySequence(DictionarySequenceKey),
	NamespaceDictionary(NamespaceDictionaryKey),
	SumType(SumTypeKey),
	NamespaceSumType(NamespaceSumTypeKey),
	Subscription(SubscriptionKey),
	SubscriptionColumn(SubscriptionColumnKey),
	SubscriptionRow(SubscriptionRowKey),
}
121
122impl Key {
123	pub fn encode(&self) -> EncodedKey {
124		match &self {
125			Key::CdcConsumer(key) => key.encode(),
126			Key::Namespace(key) => key.encode(),
127			Key::NamespaceTable(key) => key.encode(),
128			Key::NamespaceView(key) => key.encode(),
129			Key::NamespaceFlow(key) => key.encode(),
130			Key::Table(key) => key.encode(),
131			Key::Flow(key) => key.encode(),
132			Key::Column(key) => key.encode(),
133			Key::Columns(key) => key.encode(),
134			Key::TableColumnPolicy(key) => key.encode(),
135			Key::Index(key) => key.encode(),
136			Key::IndexEntry(key) => key.encode(),
137			Key::FlowNodeState(key) => key.encode(),
138			Key::FlowNodeInternalState(key) => key.encode(),
139			Key::PrimaryKey(key) => key.encode(),
140			Key::Row(key) => key.encode(),
141			Key::RowSequence(key) => key.encode(),
142			Key::TableColumnSequence(key) => key.encode(),
143			Key::SystemSequence(key) => key.encode(),
144			Key::SystemVersion(key) => key.encode(),
145			Key::TransactionVersion(key) => key.encode(),
146			Key::View(key) => key.encode(),
147			Key::RingBuffer(key) => key.encode(),
148			Key::RingBufferMetadata(key) => key.encode(),
149			Key::NamespaceRingBuffer(key) => key.encode(),
150			Key::PrimitiveRetentionPolicy(key) => key.encode(),
151			Key::OperatorRetentionPolicy(key) => key.encode(),
152			Key::Dictionary(key) => key.encode(),
153			Key::DictionaryEntry(key) => key.encode(),
154			Key::DictionaryEntryIndex(key) => key.encode(),
155			Key::DictionarySequence(key) => key.encode(),
156			Key::NamespaceDictionary(key) => key.encode(),
157			Key::SumType(key) => key.encode(),
158			Key::NamespaceSumType(key) => key.encode(),
159			Key::Subscription(key) => key.encode(),
160			Key::SubscriptionColumn(key) => key.encode(),
161			Key::SubscriptionRow(key) => key.encode(),
162		}
163	}
164}
165
/// A key type that can be converted to and from an [`EncodedKey`].
pub trait EncodableKey {
	/// The [`KeyKind`] associated with this key type.
	const KIND: KeyKind;

	/// Serializes this key into its encoded byte form.
	fn encode(&self) -> EncodedKey;

	/// Attempts to reconstruct a key of this type from `key`.
	///
	/// Returns `None` when `key` does not decode to `Self`.
	fn decode(key: &EncodedKey) -> Option<Self>
	where
		Self: Sized;
}
175
/// A key-range type that can be converted to and from an [`EncodedKeyRange`].
pub trait EncodableKeyRange {
	/// The [`KeyKind`] associated with this range type.
	const KIND: KeyKind;

	/// Encoded start of the range, if there is one.
	// NOTE(review): `None` presumably means "no start bound" - confirm with implementors.
	fn start(&self) -> Option<EncodedKey>;

	/// Encoded end of the range, if there is one.
	// NOTE(review): `None` presumably means "no end bound" - confirm with implementors.
	fn end(&self) -> Option<EncodedKey>;

	/// Attempts to decode both ends of `range` into values of this type.
	// NOTE(review): the tuple is presumably `(start, end)`, each `None` when that
	// side is absent or fails to decode - confirm with implementors.
	fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
	where
		Self: Sized;
}
187
188impl Key {
189	pub fn kind(key: impl AsRef<[u8]>) -> Option<KeyKind> {
190		let key = key.as_ref();
191		if key.len() < 2 {
192			return None;
193		}
194
195		keycode::deserialize(&key[1..2]).ok()
196	}
197
198	pub fn decode(key: &EncodedKey) -> Option<Self> {
199		if key.len() < 2 {
200			return None;
201		}
202
203		let kind: KeyKind = keycode::deserialize(&key[1..2]).ok()?;
204		match kind {
205			KeyKind::CdcConsumer => CdcConsumerKey::decode(&key).map(Self::CdcConsumer),
206			KeyKind::Columns => ColumnsKey::decode(&key).map(Self::Columns),
207			KeyKind::ColumnPolicy => ColumnPolicyKey::decode(&key).map(Self::TableColumnPolicy),
208			KeyKind::Namespace => NamespaceKey::decode(&key).map(Self::Namespace),
209			KeyKind::NamespaceTable => NamespaceTableKey::decode(&key).map(Self::NamespaceTable),
210			KeyKind::NamespaceView => NamespaceViewKey::decode(&key).map(Self::NamespaceView),
211			KeyKind::NamespaceFlow => NamespaceFlowKey::decode(&key).map(Self::NamespaceFlow),
212			KeyKind::Table => TableKey::decode(&key).map(Self::Table),
213			KeyKind::Flow => FlowKey::decode(&key).map(Self::Flow),
214			KeyKind::Column => ColumnKey::decode(&key).map(Self::Column),
215			KeyKind::Index => IndexKey::decode(&key).map(Self::Index),
216			KeyKind::IndexEntry => IndexEntryKey::decode(&key).map(Self::IndexEntry),
217			KeyKind::FlowNodeState => FlowNodeStateKey::decode(&key).map(Self::FlowNodeState),
218			KeyKind::FlowNodeInternalState => {
219				FlowNodeInternalStateKey::decode(&key).map(Self::FlowNodeInternalState)
220			}
221			KeyKind::Row => RowKey::decode(&key).map(Self::Row),
222			KeyKind::RowSequence => RowSequenceKey::decode(&key).map(Self::RowSequence),
223			KeyKind::ColumnSequence => ColumnSequenceKey::decode(&key).map(Self::TableColumnSequence),
224			KeyKind::SystemSequence => SystemSequenceKey::decode(&key).map(Self::SystemSequence),
225			KeyKind::SystemVersion => SystemVersionKey::decode(&key).map(Self::SystemVersion),
226			KeyKind::TransactionVersion => {
227				TransactionVersionKey::decode(&key).map(Self::TransactionVersion)
228			}
229			KeyKind::View => ViewKey::decode(&key).map(Self::View),
230			KeyKind::PrimaryKey => PrimaryKeyKey::decode(&key).map(Self::PrimaryKey),
231			KeyKind::RingBuffer => RingBufferKey::decode(&key).map(Self::RingBuffer),
232			KeyKind::RingBufferMetadata => {
233				RingBufferMetadataKey::decode(&key).map(Self::RingBufferMetadata)
234			}
235			KeyKind::NamespaceRingBuffer => {
236				NamespaceRingBufferKey::decode(&key).map(Self::NamespaceRingBuffer)
237			}
238			KeyKind::PrimitiveRetentionPolicy => {
239				PrimitiveRetentionPolicyKey::decode(&key).map(Self::PrimitiveRetentionPolicy)
240			}
241			KeyKind::OperatorRetentionPolicy => {
242				OperatorRetentionPolicyKey::decode(&key).map(Self::OperatorRetentionPolicy)
243			}
244			KeyKind::FlowNode
245			| KeyKind::FlowNodeByFlow
246			| KeyKind::FlowEdge
247			| KeyKind::FlowEdgeByFlow
248			| KeyKind::FlowVersion => {
249				// These keys are used directly via EncodableKey trait, not through Key enum
250				None
251			}
252			KeyKind::Dictionary => DictionaryKey::decode(&key).map(Self::Dictionary),
253			KeyKind::DictionaryEntry => DictionaryEntryKey::decode(&key).map(Self::DictionaryEntry),
254			KeyKind::DictionaryEntryIndex => {
255				DictionaryEntryIndexKey::decode(&key).map(Self::DictionaryEntryIndex)
256			}
257			KeyKind::DictionarySequence => {
258				DictionarySequenceKey::decode(&key).map(Self::DictionarySequence)
259			}
260			KeyKind::NamespaceDictionary => {
261				NamespaceDictionaryKey::decode(&key).map(Self::NamespaceDictionary)
262			}
263			KeyKind::SumType => SumTypeKey::decode(&key).map(Self::SumType),
264			KeyKind::NamespaceSumType => NamespaceSumTypeKey::decode(&key).map(Self::NamespaceSumType),
265			KeyKind::Metric => {
266				// Storage tracker keys are used for internal persistence, not through Key enum
267				None
268			}
269			KeyKind::Subscription => SubscriptionKey::decode(&key).map(Self::Subscription),
270			KeyKind::SubscriptionColumn => {
271				SubscriptionColumnKey::decode(&key).map(Self::SubscriptionColumn)
272			}
273			KeyKind::SubscriptionRow => SubscriptionRowKey::decode(&key).map(Self::SubscriptionRow),
274			KeyKind::Schema | KeyKind::SchemaField => {
275				// Schema keys are used directly via EncodableKey trait, not through Key enum
276				None
277			}
278		}
279	}
280}
281
#[cfg(test)]
pub mod tests {
	use reifydb_type::value::{row_number::RowNumber, sumtype::SumTypeId};

	use crate::{
		interface::catalog::{
			flow::FlowNodeId,
			id::{ColumnId, ColumnPolicyId, IndexId, NamespaceId, SequenceId, TableId},
			primitive::PrimitiveId,
		},
		key::{
			Key, column::ColumnKey, column_policy::ColumnPolicyKey, column_sequence::ColumnSequenceKey,
			columns::ColumnsKey, flow_node_state::FlowNodeStateKey, index::IndexKey,
			namespace::NamespaceKey, namespace_sumtype::NamespaceSumTypeKey,
			namespace_table::NamespaceTableKey, row::RowKey, row_sequence::RowSequenceKey,
			sumtype::SumTypeKey, system_sequence::SystemSequenceKey, table::TableKey,
			transaction_version::TransactionVersionKey,
		},
	};

	/// Encodes `key` and decodes the resulting bytes again, panicking if the
	/// decode step yields nothing.
	fn round_trip(key: Key) -> Key {
		let bytes = key.encode();
		Key::decode(&bytes).expect("Failed to decode key")
	}

	#[test]
	fn test_table_columns() {
		let original = Key::Columns(ColumnsKey {
			column: ColumnId(42),
		});

		let Key::Columns(restored) = round_trip(original) else {
			unreachable!()
		};
		assert_eq!(restored.column, 42);
	}

	#[test]
	fn test_column() {
		let original = Key::Column(ColumnKey {
			primitive: PrimitiveId::table(1),
			column: ColumnId(42),
		});

		let Key::Column(restored) = round_trip(original) else {
			unreachable!()
		};
		assert_eq!(restored.primitive, PrimitiveId::table(1));
		assert_eq!(restored.column, 42);
	}

	#[test]
	fn test_column_policy() {
		let original = Key::TableColumnPolicy(ColumnPolicyKey {
			column: ColumnId(42),
			policy: ColumnPolicyId(999_999),
		});

		let Key::TableColumnPolicy(restored) = round_trip(original) else {
			unreachable!()
		};
		assert_eq!(restored.column, 42);
		assert_eq!(restored.policy, 999_999);
	}

	#[test]
	fn test_namespace() {
		let original = Key::Namespace(NamespaceKey {
			namespace: NamespaceId(42),
		});

		let Key::Namespace(restored) = round_trip(original) else {
			unreachable!()
		};
		assert_eq!(restored.namespace, 42);
	}

	#[test]
	fn test_namespace_table() {
		let original = Key::NamespaceTable(NamespaceTableKey {
			namespace: NamespaceId(42),
			table: TableId(999_999),
		});

		let Key::NamespaceTable(restored) = round_trip(original) else {
			unreachable!()
		};
		assert_eq!(restored.namespace, 42);
		assert_eq!(restored.table, 999_999);
	}

	#[test]
	fn test_system_sequence() {
		let original = Key::SystemSequence(SystemSequenceKey {
			sequence: SequenceId(42),
		});

		let Key::SystemSequence(restored) = round_trip(original) else {
			unreachable!()
		};
		assert_eq!(restored.sequence, 42);
	}

	#[test]
	fn test_table() {
		let original = Key::Table(TableKey {
			table: TableId(42),
		});

		let Key::Table(restored) = round_trip(original) else {
			unreachable!()
		};
		assert_eq!(restored.table, 42);
	}

	#[test]
	fn test_index() {
		let original = Key::Index(IndexKey {
			primitive: PrimitiveId::table(42),
			index: IndexId::primary(999_999),
		});

		let Key::Index(restored) = round_trip(original) else {
			unreachable!()
		};
		assert_eq!(restored.primitive, PrimitiveId::table(42));
		assert_eq!(restored.index, 999_999);
	}

	#[test]
	fn test_row() {
		let original = Key::Row(RowKey {
			primitive: PrimitiveId::table(42),
			row: RowNumber(999_999),
		});

		let Key::Row(restored) = round_trip(original) else {
			unreachable!()
		};
		assert_eq!(restored.primitive, PrimitiveId::table(42));
		assert_eq!(restored.row, 999_999);
	}

	#[test]
	fn test_row_sequence() {
		let original = Key::RowSequence(RowSequenceKey {
			primitive: PrimitiveId::table(42),
		});

		let Key::RowSequence(restored) = round_trip(original) else {
			unreachable!()
		};
		assert_eq!(restored.primitive, PrimitiveId::table(42));
	}

	#[test]
	fn test_column_sequence() {
		let original = Key::TableColumnSequence(ColumnSequenceKey {
			primitive: PrimitiveId::table(42),
			column: ColumnId(123),
		});

		let Key::TableColumnSequence(restored) = round_trip(original) else {
			unreachable!()
		};
		assert_eq!(restored.primitive, PrimitiveId::table(42));
		assert_eq!(restored.column, 123);
	}

	#[test]
	fn test_transaction_version() {
		// Only checks that decoding succeeds; the key carries no fields.
		round_trip(Key::TransactionVersion(TransactionVersionKey {}));
	}

	#[test]
	fn test_operator_state() {
		let original = Key::FlowNodeState(FlowNodeStateKey {
			node: FlowNodeId(0xCAFEBABE),
			key: vec![1, 2, 3],
		});

		let Key::FlowNodeState(restored) = round_trip(original) else {
			unreachable!()
		};
		assert_eq!(restored.node, 0xCAFEBABE);
		assert_eq!(restored.key, vec![1, 2, 3]);
	}

	#[test]
	fn test_sumtype_key() {
		let original = Key::SumType(SumTypeKey {
			sumtype: SumTypeId(42),
		});

		let Key::SumType(restored) = round_trip(original) else {
			unreachable!()
		};
		assert_eq!(restored.sumtype, 42);
	}

	#[test]
	fn test_namespace_sumtype_key() {
		let original = Key::NamespaceSumType(NamespaceSumTypeKey {
			namespace: NamespaceId(42),
			sumtype: SumTypeId(999_999),
		});

		let Key::NamespaceSumType(restored) = round_trip(original) else {
			unreachable!()
		};
		assert_eq!(restored.namespace, 42);
		assert_eq!(restored.sumtype, 999_999);
	}
}