1use cdc_consumer::CdcConsumerKey;
5use column::ColumnKey;
6use column_policy::ColumnPolicyKey;
7use column_sequence::ColumnSequenceKey;
8use columns::ColumnsKey;
9use dictionary::{DictionaryEntryIndexKey, DictionaryEntryKey, DictionaryKey, DictionarySequenceKey};
10use flow::FlowKey;
11use flow_node_internal_state::FlowNodeInternalStateKey;
12use flow_node_state::FlowNodeStateKey;
13use index::IndexKey;
14use index_entry::IndexEntryKey;
15use kind::KeyKind;
16use namespace::NamespaceKey;
17use namespace_dictionary::NamespaceDictionaryKey;
18use namespace_flow::NamespaceFlowKey;
19use namespace_ringbuffer::NamespaceRingBufferKey;
20use namespace_sumtype::NamespaceSumTypeKey;
21use namespace_table::NamespaceTableKey;
22use namespace_view::NamespaceViewKey;
23use primary_key::PrimaryKeyKey;
24use retention_policy::{OperatorRetentionPolicyKey, PrimitiveRetentionPolicyKey};
25use ringbuffer::{RingBufferKey, RingBufferMetadataKey};
26use row::RowKey;
27use row_sequence::RowSequenceKey;
28use subscription::SubscriptionKey;
29use subscription_column::SubscriptionColumnKey;
30use subscription_row::SubscriptionRowKey;
31use sumtype::SumTypeKey;
32use system_sequence::SystemSequenceKey;
33use system_version::SystemVersionKey;
34use table::TableKey;
35use transaction_version::TransactionVersionKey;
36use view::ViewKey;
37
38use crate::{
39 encoded::key::{EncodedKey, EncodedKeyRange},
40 util::encoding::keycode,
41};
42
43pub mod cdc_consumer;
44pub mod cdc_exclude;
45pub mod column;
46pub mod column_policy;
47pub mod column_sequence;
48pub mod columns;
49pub mod dictionary;
50pub mod flow;
51pub mod flow_edge;
52pub mod flow_node;
53pub mod flow_node_internal_state;
54pub mod flow_node_state;
55pub mod flow_version;
56pub mod index;
57pub mod index_entry;
58pub mod kind;
59pub mod namespace;
60pub mod namespace_dictionary;
61pub mod namespace_flow;
62pub mod namespace_ringbuffer;
63pub mod namespace_sumtype;
64pub mod namespace_table;
65pub mod namespace_view;
66pub mod primary_key;
67pub mod retention_policy;
68pub mod ringbuffer;
69pub mod row;
70pub mod row_sequence;
71pub mod schema;
72pub mod subscription;
73pub mod subscription_column;
74pub mod subscription_row;
75pub mod sumtype;
76pub mod system_sequence;
77pub mod system_version;
78pub mod table;
79pub mod transaction_version;
80pub mod view;
81#[derive(Debug)]
82pub enum Key {
83 CdcConsumer(CdcConsumerKey),
84 Namespace(NamespaceKey),
85 NamespaceTable(NamespaceTableKey),
86 NamespaceView(NamespaceViewKey),
87 NamespaceFlow(NamespaceFlowKey),
88 SystemSequence(SystemSequenceKey),
89 Table(TableKey),
90 Flow(FlowKey),
91 Column(ColumnKey),
92 Columns(ColumnsKey),
93 Index(IndexKey),
94 IndexEntry(IndexEntryKey),
95 FlowNodeState(FlowNodeStateKey),
96 FlowNodeInternalState(FlowNodeInternalStateKey),
97 PrimaryKey(PrimaryKeyKey),
98 Row(RowKey),
99 RowSequence(RowSequenceKey),
100 TableColumnSequence(ColumnSequenceKey),
101 TableColumnPolicy(ColumnPolicyKey),
102 SystemVersion(SystemVersionKey),
103 TransactionVersion(TransactionVersionKey),
104 View(ViewKey),
105 RingBuffer(RingBufferKey),
106 RingBufferMetadata(RingBufferMetadataKey),
107 NamespaceRingBuffer(NamespaceRingBufferKey),
108 PrimitiveRetentionPolicy(PrimitiveRetentionPolicyKey),
109 OperatorRetentionPolicy(OperatorRetentionPolicyKey),
110 Dictionary(DictionaryKey),
111 DictionaryEntry(DictionaryEntryKey),
112 DictionaryEntryIndex(DictionaryEntryIndexKey),
113 DictionarySequence(DictionarySequenceKey),
114 NamespaceDictionary(NamespaceDictionaryKey),
115 SumType(SumTypeKey),
116 NamespaceSumType(NamespaceSumTypeKey),
117 Subscription(SubscriptionKey),
118 SubscriptionColumn(SubscriptionColumnKey),
119 SubscriptionRow(SubscriptionRowKey),
120}
121
122impl Key {
123 pub fn encode(&self) -> EncodedKey {
124 match &self {
125 Key::CdcConsumer(key) => key.encode(),
126 Key::Namespace(key) => key.encode(),
127 Key::NamespaceTable(key) => key.encode(),
128 Key::NamespaceView(key) => key.encode(),
129 Key::NamespaceFlow(key) => key.encode(),
130 Key::Table(key) => key.encode(),
131 Key::Flow(key) => key.encode(),
132 Key::Column(key) => key.encode(),
133 Key::Columns(key) => key.encode(),
134 Key::TableColumnPolicy(key) => key.encode(),
135 Key::Index(key) => key.encode(),
136 Key::IndexEntry(key) => key.encode(),
137 Key::FlowNodeState(key) => key.encode(),
138 Key::FlowNodeInternalState(key) => key.encode(),
139 Key::PrimaryKey(key) => key.encode(),
140 Key::Row(key) => key.encode(),
141 Key::RowSequence(key) => key.encode(),
142 Key::TableColumnSequence(key) => key.encode(),
143 Key::SystemSequence(key) => key.encode(),
144 Key::SystemVersion(key) => key.encode(),
145 Key::TransactionVersion(key) => key.encode(),
146 Key::View(key) => key.encode(),
147 Key::RingBuffer(key) => key.encode(),
148 Key::RingBufferMetadata(key) => key.encode(),
149 Key::NamespaceRingBuffer(key) => key.encode(),
150 Key::PrimitiveRetentionPolicy(key) => key.encode(),
151 Key::OperatorRetentionPolicy(key) => key.encode(),
152 Key::Dictionary(key) => key.encode(),
153 Key::DictionaryEntry(key) => key.encode(),
154 Key::DictionaryEntryIndex(key) => key.encode(),
155 Key::DictionarySequence(key) => key.encode(),
156 Key::NamespaceDictionary(key) => key.encode(),
157 Key::SumType(key) => key.encode(),
158 Key::NamespaceSumType(key) => key.encode(),
159 Key::Subscription(key) => key.encode(),
160 Key::SubscriptionColumn(key) => key.encode(),
161 Key::SubscriptionRow(key) => key.encode(),
162 }
163 }
164}
165
166pub trait EncodableKey {
167 const KIND: KeyKind;
168
169 fn encode(&self) -> EncodedKey;
170
171 fn decode(key: &EncodedKey) -> Option<Self>
172 where
173 Self: Sized;
174}
175
176pub trait EncodableKeyRange {
177 const KIND: KeyKind;
178
179 fn start(&self) -> Option<EncodedKey>;
180
181 fn end(&self) -> Option<EncodedKey>;
182
183 fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
184 where
185 Self: Sized;
186}
187
188impl Key {
189 pub fn kind(key: impl AsRef<[u8]>) -> Option<KeyKind> {
190 let key = key.as_ref();
191 if key.len() < 2 {
192 return None;
193 }
194
195 keycode::deserialize(&key[1..2]).ok()
196 }
197
198 pub fn decode(key: &EncodedKey) -> Option<Self> {
199 if key.len() < 2 {
200 return None;
201 }
202
203 let kind: KeyKind = keycode::deserialize(&key[1..2]).ok()?;
204 match kind {
205 KeyKind::CdcConsumer => CdcConsumerKey::decode(&key).map(Self::CdcConsumer),
206 KeyKind::Columns => ColumnsKey::decode(&key).map(Self::Columns),
207 KeyKind::ColumnPolicy => ColumnPolicyKey::decode(&key).map(Self::TableColumnPolicy),
208 KeyKind::Namespace => NamespaceKey::decode(&key).map(Self::Namespace),
209 KeyKind::NamespaceTable => NamespaceTableKey::decode(&key).map(Self::NamespaceTable),
210 KeyKind::NamespaceView => NamespaceViewKey::decode(&key).map(Self::NamespaceView),
211 KeyKind::NamespaceFlow => NamespaceFlowKey::decode(&key).map(Self::NamespaceFlow),
212 KeyKind::Table => TableKey::decode(&key).map(Self::Table),
213 KeyKind::Flow => FlowKey::decode(&key).map(Self::Flow),
214 KeyKind::Column => ColumnKey::decode(&key).map(Self::Column),
215 KeyKind::Index => IndexKey::decode(&key).map(Self::Index),
216 KeyKind::IndexEntry => IndexEntryKey::decode(&key).map(Self::IndexEntry),
217 KeyKind::FlowNodeState => FlowNodeStateKey::decode(&key).map(Self::FlowNodeState),
218 KeyKind::FlowNodeInternalState => {
219 FlowNodeInternalStateKey::decode(&key).map(Self::FlowNodeInternalState)
220 }
221 KeyKind::Row => RowKey::decode(&key).map(Self::Row),
222 KeyKind::RowSequence => RowSequenceKey::decode(&key).map(Self::RowSequence),
223 KeyKind::ColumnSequence => ColumnSequenceKey::decode(&key).map(Self::TableColumnSequence),
224 KeyKind::SystemSequence => SystemSequenceKey::decode(&key).map(Self::SystemSequence),
225 KeyKind::SystemVersion => SystemVersionKey::decode(&key).map(Self::SystemVersion),
226 KeyKind::TransactionVersion => {
227 TransactionVersionKey::decode(&key).map(Self::TransactionVersion)
228 }
229 KeyKind::View => ViewKey::decode(&key).map(Self::View),
230 KeyKind::PrimaryKey => PrimaryKeyKey::decode(&key).map(Self::PrimaryKey),
231 KeyKind::RingBuffer => RingBufferKey::decode(&key).map(Self::RingBuffer),
232 KeyKind::RingBufferMetadata => {
233 RingBufferMetadataKey::decode(&key).map(Self::RingBufferMetadata)
234 }
235 KeyKind::NamespaceRingBuffer => {
236 NamespaceRingBufferKey::decode(&key).map(Self::NamespaceRingBuffer)
237 }
238 KeyKind::PrimitiveRetentionPolicy => {
239 PrimitiveRetentionPolicyKey::decode(&key).map(Self::PrimitiveRetentionPolicy)
240 }
241 KeyKind::OperatorRetentionPolicy => {
242 OperatorRetentionPolicyKey::decode(&key).map(Self::OperatorRetentionPolicy)
243 }
244 KeyKind::FlowNode
245 | KeyKind::FlowNodeByFlow
246 | KeyKind::FlowEdge
247 | KeyKind::FlowEdgeByFlow
248 | KeyKind::FlowVersion => {
249 None
251 }
252 KeyKind::Dictionary => DictionaryKey::decode(&key).map(Self::Dictionary),
253 KeyKind::DictionaryEntry => DictionaryEntryKey::decode(&key).map(Self::DictionaryEntry),
254 KeyKind::DictionaryEntryIndex => {
255 DictionaryEntryIndexKey::decode(&key).map(Self::DictionaryEntryIndex)
256 }
257 KeyKind::DictionarySequence => {
258 DictionarySequenceKey::decode(&key).map(Self::DictionarySequence)
259 }
260 KeyKind::NamespaceDictionary => {
261 NamespaceDictionaryKey::decode(&key).map(Self::NamespaceDictionary)
262 }
263 KeyKind::SumType => SumTypeKey::decode(&key).map(Self::SumType),
264 KeyKind::NamespaceSumType => NamespaceSumTypeKey::decode(&key).map(Self::NamespaceSumType),
265 KeyKind::Metric => {
266 None
268 }
269 KeyKind::Subscription => SubscriptionKey::decode(&key).map(Self::Subscription),
270 KeyKind::SubscriptionColumn => {
271 SubscriptionColumnKey::decode(&key).map(Self::SubscriptionColumn)
272 }
273 KeyKind::SubscriptionRow => SubscriptionRowKey::decode(&key).map(Self::SubscriptionRow),
274 KeyKind::Schema | KeyKind::SchemaField => {
275 None
277 }
278 }
279 }
280}
281
#[cfg(test)]
pub mod tests {
    use reifydb_type::value::{row_number::RowNumber, sumtype::SumTypeId};

    use crate::{
        interface::catalog::{
            flow::FlowNodeId,
            id::{ColumnId, ColumnPolicyId, IndexId, NamespaceId, SequenceId, TableId},
            primitive::PrimitiveId,
        },
        key::{
            Key, column::ColumnKey, column_policy::ColumnPolicyKey, column_sequence::ColumnSequenceKey,
            columns::ColumnsKey, flow_node_state::FlowNodeStateKey, index::IndexKey,
            namespace::NamespaceKey, namespace_sumtype::NamespaceSumTypeKey,
            namespace_table::NamespaceTableKey, row::RowKey, row_sequence::RowSequenceKey,
            sumtype::SumTypeKey, system_sequence::SystemSequenceKey, table::TableKey,
            transaction_version::TransactionVersionKey,
        },
    };

    /// Encodes `key`, then decodes the bytes back into a [`Key`].
    ///
    /// Panics when decoding yields `None`, so every test below only has to
    /// check the variant and the fields that came back.
    fn roundtrip(key: Key) -> Key {
        let encoded = key.encode();
        Key::decode(&encoded).expect("Failed to decode key")
    }

    #[test]
    fn test_table_columns() {
        let decoded = roundtrip(Key::Columns(ColumnsKey {
            column: ColumnId(42),
        }));

        match decoded {
            Key::Columns(inner) => {
                assert_eq!(inner.column, 42);
            }
            other => panic!("expected Key::Columns, got {:?}", other),
        }
    }

    #[test]
    fn test_column() {
        let decoded = roundtrip(Key::Column(ColumnKey {
            primitive: PrimitiveId::table(1),
            column: ColumnId(42),
        }));

        match decoded {
            Key::Column(inner) => {
                assert_eq!(inner.primitive, PrimitiveId::table(1));
                assert_eq!(inner.column, 42);
            }
            other => panic!("expected Key::Column, got {:?}", other),
        }
    }

    #[test]
    fn test_column_policy() {
        let decoded = roundtrip(Key::TableColumnPolicy(ColumnPolicyKey {
            column: ColumnId(42),
            policy: ColumnPolicyId(999_999),
        }));

        match decoded {
            Key::TableColumnPolicy(inner) => {
                assert_eq!(inner.column, 42);
                assert_eq!(inner.policy, 999_999);
            }
            other => panic!("expected Key::TableColumnPolicy, got {:?}", other),
        }
    }

    #[test]
    fn test_namespace() {
        let decoded = roundtrip(Key::Namespace(NamespaceKey {
            namespace: NamespaceId(42),
        }));

        match decoded {
            Key::Namespace(inner) => {
                assert_eq!(inner.namespace, 42);
            }
            other => panic!("expected Key::Namespace, got {:?}", other),
        }
    }

    #[test]
    fn test_namespace_table() {
        let decoded = roundtrip(Key::NamespaceTable(NamespaceTableKey {
            namespace: NamespaceId(42),
            table: TableId(999_999),
        }));

        match decoded {
            Key::NamespaceTable(inner) => {
                assert_eq!(inner.namespace, 42);
                assert_eq!(inner.table, 999_999);
            }
            other => panic!("expected Key::NamespaceTable, got {:?}", other),
        }
    }

    #[test]
    fn test_system_sequence() {
        let decoded = roundtrip(Key::SystemSequence(SystemSequenceKey {
            sequence: SequenceId(42),
        }));

        match decoded {
            Key::SystemSequence(inner) => {
                assert_eq!(inner.sequence, 42);
            }
            other => panic!("expected Key::SystemSequence, got {:?}", other),
        }
    }

    #[test]
    fn test_table() {
        let decoded = roundtrip(Key::Table(TableKey {
            table: TableId(42),
        }));

        match decoded {
            Key::Table(inner) => {
                assert_eq!(inner.table, 42);
            }
            other => panic!("expected Key::Table, got {:?}", other),
        }
    }

    #[test]
    fn test_index() {
        let decoded = roundtrip(Key::Index(IndexKey {
            primitive: PrimitiveId::table(42),
            index: IndexId::primary(999_999),
        }));

        match decoded {
            Key::Index(inner) => {
                assert_eq!(inner.primitive, PrimitiveId::table(42));
                assert_eq!(inner.index, 999_999);
            }
            other => panic!("expected Key::Index, got {:?}", other),
        }
    }

    #[test]
    fn test_row() {
        let decoded = roundtrip(Key::Row(RowKey {
            primitive: PrimitiveId::table(42),
            row: RowNumber(999_999),
        }));

        match decoded {
            Key::Row(inner) => {
                assert_eq!(inner.primitive, PrimitiveId::table(42));
                assert_eq!(inner.row, 999_999);
            }
            other => panic!("expected Key::Row, got {:?}", other),
        }
    }

    #[test]
    fn test_row_sequence() {
        let decoded = roundtrip(Key::RowSequence(RowSequenceKey {
            primitive: PrimitiveId::table(42),
        }));

        match decoded {
            Key::RowSequence(inner) => {
                assert_eq!(inner.primitive, PrimitiveId::table(42));
            }
            other => panic!("expected Key::RowSequence, got {:?}", other),
        }
    }

    #[test]
    fn test_column_sequence() {
        let decoded = roundtrip(Key::TableColumnSequence(ColumnSequenceKey {
            primitive: PrimitiveId::table(42),
            column: ColumnId(123),
        }));

        match decoded {
            Key::TableColumnSequence(inner) => {
                assert_eq!(inner.primitive, PrimitiveId::table(42));
                assert_eq!(inner.column, 123);
            }
            other => panic!("expected Key::TableColumnSequence, got {:?}", other),
        }
    }

    #[test]
    fn test_transaction_version() {
        let decoded = roundtrip(Key::TransactionVersion(TransactionVersionKey {}));

        // The key carries no fields, so the variant itself is the only thing
        // a round trip can get wrong.
        assert!(
            matches!(&decoded, Key::TransactionVersion(_)),
            "expected Key::TransactionVersion, got {:?}",
            decoded
        );
    }

    #[test]
    fn test_operator_state() {
        let decoded = roundtrip(Key::FlowNodeState(FlowNodeStateKey {
            node: FlowNodeId(0xCAFEBABE),
            key: vec![1, 2, 3],
        }));

        match decoded {
            Key::FlowNodeState(inner) => {
                assert_eq!(inner.node, 0xCAFEBABE);
                assert_eq!(inner.key, vec![1, 2, 3]);
            }
            other => panic!("expected Key::FlowNodeState, got {:?}", other),
        }
    }

    #[test]
    fn test_sumtype_key() {
        let decoded = roundtrip(Key::SumType(SumTypeKey {
            sumtype: SumTypeId(42),
        }));

        match decoded {
            Key::SumType(inner) => {
                assert_eq!(inner.sumtype, 42);
            }
            other => panic!("expected Key::SumType, got {:?}", other),
        }
    }

    #[test]
    fn test_namespace_sumtype_key() {
        let decoded = roundtrip(Key::NamespaceSumType(NamespaceSumTypeKey {
            namespace: NamespaceId(42),
            sumtype: SumTypeId(999_999),
        }));

        match decoded {
            Key::NamespaceSumType(inner) => {
                assert_eq!(inner.namespace, 42);
                assert_eq!(inner.sumtype, 999_999);
            }
            other => panic!("expected Key::NamespaceSumType, got {:?}", other),
        }
    }
}