1use authentication::AuthenticationKey;
5use cdc_consumer::CdcConsumerKey;
6use column::ColumnKey;
7use column_sequence::ColumnSequenceKey;
8use columns::ColumnsKey;
9use dictionary::{DictionaryEntryIndexKey, DictionaryEntryKey, DictionaryKey, DictionarySequenceKey};
10use flow::FlowKey;
11use flow_node_internal_state::FlowNodeInternalStateKey;
12use flow_node_state::FlowNodeStateKey;
13use granted_role::GrantedRoleKey;
14use handler::HandlerKey;
15use identity::IdentityKey;
16use index::IndexKey;
17use index_entry::IndexEntryKey;
18use kind::KeyKind;
19use namespace::NamespaceKey;
20use namespace_dictionary::NamespaceDictionaryKey;
21use namespace_flow::NamespaceFlowKey;
22use namespace_handler::NamespaceHandlerKey;
23use namespace_ringbuffer::NamespaceRingBufferKey;
24use namespace_series::NamespaceSeriesKey;
25use namespace_sink::NamespaceSinkKey;
26use namespace_source::NamespaceSourceKey;
27use namespace_sumtype::NamespaceSumTypeKey;
28use namespace_table::NamespaceTableKey;
29use namespace_view::NamespaceViewKey;
30use policy::PolicyKey;
31use policy_op::PolicyOpKey;
32use primary_key::PrimaryKeyKey;
33use property::ColumnPropertyKey;
34use retention_strategy::{OperatorRetentionStrategyKey, ShapeRetentionStrategyKey};
35use ringbuffer::{RingBufferKey, RingBufferMetadataKey};
36use role::RoleKey;
37use row::RowKey;
38use row_sequence::RowSequenceKey;
39use series::{SeriesKey, SeriesMetadataKey};
40use sink::SinkKey;
41use source::SourceKey;
42use sumtype::SumTypeKey;
43use system_sequence::SystemSequenceKey;
44use system_version::SystemVersionKey;
45use table::TableKey;
46use token::TokenKey;
47use transaction_version::TransactionVersionKey;
48use view::ViewKey;
49
50use crate::{
51 encoded::key::{EncodedKey, EncodedKeyRange},
52 util::encoding::keycode,
53};
54
55pub mod authentication;
56pub mod cdc_consumer;
57pub mod cdc_exclude;
58pub mod column;
59pub mod column_sequence;
60pub mod columns;
61pub mod config;
62pub mod dictionary;
63pub mod flow;
64pub mod flow_edge;
65pub mod flow_node;
66pub mod flow_node_internal_state;
67pub mod flow_node_state;
68pub mod flow_version;
69pub mod granted_role;
70pub mod handler;
71pub mod identity;
72pub mod index;
73pub mod index_entry;
74pub mod kind;
75pub mod migration;
76pub mod migration_event;
77pub mod namespace;
78pub mod namespace_dictionary;
79pub mod namespace_flow;
80pub mod namespace_handler;
81pub mod namespace_ringbuffer;
82pub mod namespace_series;
83pub mod namespace_sink;
84pub mod namespace_source;
85pub mod namespace_sumtype;
86pub mod namespace_table;
87pub mod namespace_view;
88pub mod policy;
89pub mod policy_op;
90pub mod primary_key;
91pub mod property;
92pub mod retention_strategy;
93pub mod ringbuffer;
94pub mod role;
95pub mod row;
96pub mod row_sequence;
97pub mod series;
98pub mod series_row;
99pub mod shape;
100pub mod sink;
101pub mod source;
102pub mod sumtype;
103pub mod system_sequence;
104pub mod system_version;
105pub mod table;
106pub mod token;
107pub mod transaction_version;
108pub mod ttl;
109pub mod variant_handler;
110pub mod view;
111#[derive(Debug)]
112pub enum Key {
113 CdcConsumer(CdcConsumerKey),
114 Namespace(NamespaceKey),
115 NamespaceTable(NamespaceTableKey),
116 NamespaceView(NamespaceViewKey),
117 NamespaceFlow(NamespaceFlowKey),
118 SystemSequence(SystemSequenceKey),
119 Table(TableKey),
120 Flow(FlowKey),
121 Column(ColumnKey),
122 Columns(ColumnsKey),
123 Index(IndexKey),
124 IndexEntry(IndexEntryKey),
125 FlowNodeState(FlowNodeStateKey),
126 FlowNodeInternalState(FlowNodeInternalStateKey),
127 PrimaryKey(PrimaryKeyKey),
128 Row(RowKey),
129 RowSequence(RowSequenceKey),
130 TableColumnSequence(ColumnSequenceKey),
131 TableColumnProperty(ColumnPropertyKey),
132 SystemVersion(SystemVersionKey),
133 TransactionVersion(TransactionVersionKey),
134 View(ViewKey),
135 RingBuffer(RingBufferKey),
136 RingBufferMetadata(RingBufferMetadataKey),
137 NamespaceRingBuffer(NamespaceRingBufferKey),
138 ShapeRetentionStrategy(ShapeRetentionStrategyKey),
139 OperatorRetentionStrategy(OperatorRetentionStrategyKey),
140 Dictionary(DictionaryKey),
141 DictionaryEntry(DictionaryEntryKey),
142 DictionaryEntryIndex(DictionaryEntryIndexKey),
143 DictionarySequence(DictionarySequenceKey),
144 NamespaceDictionary(NamespaceDictionaryKey),
145 SumType(SumTypeKey),
146 NamespaceSumType(NamespaceSumTypeKey),
147 Handler(HandlerKey),
148 NamespaceHandler(NamespaceHandlerKey),
149 Series(SeriesKey),
150 SeriesMetadata(SeriesMetadataKey),
151 NamespaceSeries(NamespaceSeriesKey),
152 Identity(IdentityKey),
153 Authentication(AuthenticationKey),
154 Role(RoleKey),
155 GrantedRole(GrantedRoleKey),
156 Policy(PolicyKey),
157 PolicyOp(PolicyOpKey),
158 Token(TokenKey),
159 Source(SourceKey),
160 NamespaceSource(NamespaceSourceKey),
161 Sink(SinkKey),
162 NamespaceSink(NamespaceSinkKey),
163}
164
165impl Key {
166 pub fn encode(&self) -> EncodedKey {
167 match &self {
168 Key::CdcConsumer(key) => key.encode(),
169 Key::Namespace(key) => key.encode(),
170 Key::NamespaceTable(key) => key.encode(),
171 Key::NamespaceView(key) => key.encode(),
172 Key::NamespaceFlow(key) => key.encode(),
173 Key::Table(key) => key.encode(),
174 Key::Flow(key) => key.encode(),
175 Key::Column(key) => key.encode(),
176 Key::Columns(key) => key.encode(),
177 Key::TableColumnProperty(key) => key.encode(),
178 Key::Index(key) => key.encode(),
179 Key::IndexEntry(key) => key.encode(),
180 Key::FlowNodeState(key) => key.encode(),
181 Key::FlowNodeInternalState(key) => key.encode(),
182 Key::PrimaryKey(key) => key.encode(),
183 Key::Row(key) => key.encode(),
184 Key::RowSequence(key) => key.encode(),
185 Key::TableColumnSequence(key) => key.encode(),
186 Key::SystemSequence(key) => key.encode(),
187 Key::SystemVersion(key) => key.encode(),
188 Key::TransactionVersion(key) => key.encode(),
189 Key::View(key) => key.encode(),
190 Key::RingBuffer(key) => key.encode(),
191 Key::RingBufferMetadata(key) => key.encode(),
192 Key::NamespaceRingBuffer(key) => key.encode(),
193 Key::ShapeRetentionStrategy(key) => key.encode(),
194 Key::OperatorRetentionStrategy(key) => key.encode(),
195 Key::Dictionary(key) => key.encode(),
196 Key::DictionaryEntry(key) => key.encode(),
197 Key::DictionaryEntryIndex(key) => key.encode(),
198 Key::DictionarySequence(key) => key.encode(),
199 Key::NamespaceDictionary(key) => key.encode(),
200 Key::SumType(key) => key.encode(),
201 Key::NamespaceSumType(key) => key.encode(),
202 Key::Handler(key) => key.encode(),
203 Key::NamespaceHandler(key) => key.encode(),
204 Key::Series(key) => key.encode(),
205 Key::SeriesMetadata(key) => key.encode(),
206 Key::NamespaceSeries(key) => key.encode(),
207 Key::Identity(key) => key.encode(),
208 Key::Authentication(key) => key.encode(),
209 Key::Role(key) => key.encode(),
210 Key::GrantedRole(key) => key.encode(),
211 Key::Policy(key) => key.encode(),
212 Key::PolicyOp(key) => key.encode(),
213 Key::Token(key) => key.encode(),
214 Key::Source(key) => key.encode(),
215 Key::NamespaceSource(key) => key.encode(),
216 Key::Sink(key) => key.encode(),
217 Key::NamespaceSink(key) => key.encode(),
218 }
219 }
220}
221
222pub trait EncodableKey {
223 const KIND: KeyKind;
224
225 fn encode(&self) -> EncodedKey;
226
227 fn decode(key: &EncodedKey) -> Option<Self>
228 where
229 Self: Sized;
230}
231
232pub trait EncodableKeyRange {
233 const KIND: KeyKind;
234
235 fn start(&self) -> Option<EncodedKey>;
236
237 fn end(&self) -> Option<EncodedKey>;
238
239 fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
240 where
241 Self: Sized;
242}
243
244impl Key {
245 pub fn kind(key: impl AsRef<[u8]>) -> Option<KeyKind> {
246 let key = key.as_ref();
247 if key.len() < 2 {
248 return None;
249 }
250
251 keycode::deserialize(&key[1..2]).ok()
252 }
253
254 pub fn decode(key: &EncodedKey) -> Option<Self> {
255 if key.len() < 2 {
256 return None;
257 }
258
259 let kind: KeyKind = keycode::deserialize(&key[1..2]).ok()?;
260 match kind {
261 KeyKind::CdcConsumer => CdcConsumerKey::decode(key).map(Self::CdcConsumer),
262 KeyKind::Columns => ColumnsKey::decode(key).map(Self::Columns),
263 KeyKind::ColumnProperty => ColumnPropertyKey::decode(key).map(Self::TableColumnProperty),
264 KeyKind::Namespace => NamespaceKey::decode(key).map(Self::Namespace),
265 KeyKind::NamespaceTable => NamespaceTableKey::decode(key).map(Self::NamespaceTable),
266 KeyKind::NamespaceView => NamespaceViewKey::decode(key).map(Self::NamespaceView),
267 KeyKind::NamespaceFlow => NamespaceFlowKey::decode(key).map(Self::NamespaceFlow),
268 KeyKind::Table => TableKey::decode(key).map(Self::Table),
269 KeyKind::Flow => FlowKey::decode(key).map(Self::Flow),
270 KeyKind::Column => ColumnKey::decode(key).map(Self::Column),
271 KeyKind::Index => IndexKey::decode(key).map(Self::Index),
272 KeyKind::IndexEntry => IndexEntryKey::decode(key).map(Self::IndexEntry),
273 KeyKind::FlowNodeState => FlowNodeStateKey::decode(key).map(Self::FlowNodeState),
274 KeyKind::FlowNodeInternalState => {
275 FlowNodeInternalStateKey::decode(key).map(Self::FlowNodeInternalState)
276 }
277 KeyKind::Row => RowKey::decode(key).map(Self::Row),
278 KeyKind::RowSequence => RowSequenceKey::decode(key).map(Self::RowSequence),
279 KeyKind::ColumnSequence => ColumnSequenceKey::decode(key).map(Self::TableColumnSequence),
280 KeyKind::SystemSequence => SystemSequenceKey::decode(key).map(Self::SystemSequence),
281 KeyKind::SystemVersion => SystemVersionKey::decode(key).map(Self::SystemVersion),
282 KeyKind::TransactionVersion => TransactionVersionKey::decode(key).map(Self::TransactionVersion),
283 KeyKind::View => ViewKey::decode(key).map(Self::View),
284 KeyKind::PrimaryKey => PrimaryKeyKey::decode(key).map(Self::PrimaryKey),
285 KeyKind::RingBuffer => RingBufferKey::decode(key).map(Self::RingBuffer),
286 KeyKind::RingBufferMetadata => RingBufferMetadataKey::decode(key).map(Self::RingBufferMetadata),
287 KeyKind::NamespaceRingBuffer => {
288 NamespaceRingBufferKey::decode(key).map(Self::NamespaceRingBuffer)
289 }
290 KeyKind::ShapeRetentionStrategy => {
291 ShapeRetentionStrategyKey::decode(key).map(Self::ShapeRetentionStrategy)
292 }
293 KeyKind::OperatorRetentionStrategy => {
294 OperatorRetentionStrategyKey::decode(key).map(Self::OperatorRetentionStrategy)
295 }
296 KeyKind::FlowNode
297 | KeyKind::FlowNodeByFlow
298 | KeyKind::FlowEdge
299 | KeyKind::FlowEdgeByFlow
300 | KeyKind::FlowVersion => {
301 None
303 }
304 KeyKind::Dictionary => DictionaryKey::decode(key).map(Self::Dictionary),
305 KeyKind::DictionaryEntry => DictionaryEntryKey::decode(key).map(Self::DictionaryEntry),
306 KeyKind::DictionaryEntryIndex => {
307 DictionaryEntryIndexKey::decode(key).map(Self::DictionaryEntryIndex)
308 }
309 KeyKind::DictionarySequence => DictionarySequenceKey::decode(key).map(Self::DictionarySequence),
310 KeyKind::NamespaceDictionary => {
311 NamespaceDictionaryKey::decode(key).map(Self::NamespaceDictionary)
312 }
313 KeyKind::SumType => SumTypeKey::decode(key).map(Self::SumType),
314 KeyKind::NamespaceSumType => NamespaceSumTypeKey::decode(key).map(Self::NamespaceSumType),
315 KeyKind::Handler => HandlerKey::decode(key).map(Self::Handler),
316 KeyKind::NamespaceHandler => NamespaceHandlerKey::decode(key).map(Self::NamespaceHandler),
317 KeyKind::VariantHandler => {
318 None
320 }
321 KeyKind::Metric => {
322 None
324 }
325 KeyKind::Subscription | KeyKind::SubscriptionColumn | KeyKind::SubscriptionRow => {
326 None
328 }
329 KeyKind::Shape | KeyKind::RowShapeField => {
330 None
332 }
333 KeyKind::Series => SeriesKey::decode(key).map(Self::Series),
334 KeyKind::NamespaceSeries => NamespaceSeriesKey::decode(key).map(Self::NamespaceSeries),
335 KeyKind::SeriesMetadata => SeriesMetadataKey::decode(key).map(Self::SeriesMetadata),
336 KeyKind::Identity => IdentityKey::decode(key).map(Self::Identity),
337 KeyKind::Authentication => AuthenticationKey::decode(key).map(Self::Authentication),
338 KeyKind::Role => RoleKey::decode(key).map(Self::Role),
339 KeyKind::GrantedRole => GrantedRoleKey::decode(key).map(Self::GrantedRole),
340 KeyKind::Policy => PolicyKey::decode(key).map(Self::Policy),
341 KeyKind::PolicyOp => PolicyOpKey::decode(key).map(Self::PolicyOp),
342 KeyKind::Migration | KeyKind::MigrationEvent => {
343 None
345 }
346 KeyKind::Token => TokenKey::decode(key).map(Self::Token),
347 KeyKind::ConfigStorage => {
348 None
350 }
351 KeyKind::Source
352 | KeyKind::NamespaceSource
353 | KeyKind::Sink
354 | KeyKind::NamespaceSink
355 | KeyKind::SourceCheckpoint => {
356 None
358 }
359 KeyKind::RowTtl => {
360 None
362 }
363 }
364 }
365}
366
367#[cfg(test)]
368pub mod tests {
369 use reifydb_type::value::{row_number::RowNumber, sumtype::SumTypeId};
370
371 use crate::{
372 interface::catalog::{
373 flow::FlowNodeId,
374 id::{ColumnId, ColumnPropertyId, IndexId, NamespaceId, SequenceId, TableId},
375 shape::ShapeId,
376 },
377 key::{
378 Key, column::ColumnKey, column_sequence::ColumnSequenceKey, columns::ColumnsKey,
379 flow_node_state::FlowNodeStateKey, index::IndexKey, namespace::NamespaceKey,
380 namespace_sumtype::NamespaceSumTypeKey, namespace_table::NamespaceTableKey,
381 property::ColumnPropertyKey, row::RowKey, row_sequence::RowSequenceKey, sumtype::SumTypeKey,
382 system_sequence::SystemSequenceKey, table::TableKey,
383 transaction_version::TransactionVersionKey,
384 },
385 };
386
387 #[test]
388 fn test_table_columns() {
389 let key = Key::Columns(ColumnsKey {
390 column: ColumnId(42),
391 });
392
393 let encoded = key.encode();
394 let decoded = Key::decode(&encoded).expect("Failed to decode key");
395
396 match decoded {
397 Key::Columns(decoded_inner) => {
398 assert_eq!(decoded_inner.column, 42);
399 }
400 _ => unreachable!(),
401 }
402 }
403
404 #[test]
405 fn test_column() {
406 let key = Key::Column(ColumnKey {
407 shape: ShapeId::table(1),
408 column: ColumnId(42),
409 });
410
411 let encoded = key.encode();
412 let decoded = Key::decode(&encoded).expect("Failed to decode key");
413
414 match decoded {
415 Key::Column(decoded_inner) => {
416 assert_eq!(decoded_inner.shape, ShapeId::table(1));
417 assert_eq!(decoded_inner.column, 42);
418 }
419 _ => unreachable!(),
420 }
421 }
422
423 #[test]
424 fn test_column_property() {
425 let key = Key::TableColumnProperty(ColumnPropertyKey {
426 column: ColumnId(42),
427 property: ColumnPropertyId(999_999),
428 });
429
430 let encoded = key.encode();
431 let decoded = Key::decode(&encoded).expect("Failed to decode key");
432
433 match decoded {
434 Key::TableColumnProperty(decoded_inner) => {
435 assert_eq!(decoded_inner.column, 42);
436 assert_eq!(decoded_inner.property, 999_999);
437 }
438 _ => unreachable!(),
439 }
440 }
441
442 #[test]
443 fn test_namespace() {
444 let key = Key::Namespace(NamespaceKey {
445 namespace: NamespaceId(42),
446 });
447
448 let encoded = key.encode();
449 let decoded = Key::decode(&encoded).expect("Failed to decode key");
450
451 match decoded {
452 Key::Namespace(decoded_inner) => {
453 assert_eq!(decoded_inner.namespace, 42);
454 }
455 _ => unreachable!(),
456 }
457 }
458
459 #[test]
460 fn test_namespace_table() {
461 let key = Key::NamespaceTable(NamespaceTableKey {
462 namespace: NamespaceId(42),
463 table: TableId(999_999),
464 });
465
466 let encoded = key.encode();
467 let decoded = Key::decode(&encoded).expect("Failed to decode key");
468
469 match decoded {
470 Key::NamespaceTable(decoded_inner) => {
471 assert_eq!(decoded_inner.namespace, 42);
472 assert_eq!(decoded_inner.table, 999_999);
473 }
474 _ => unreachable!(),
475 }
476 }
477
478 #[test]
479 fn test_system_sequence() {
480 let key = Key::SystemSequence(SystemSequenceKey {
481 sequence: SequenceId(42),
482 });
483
484 let encoded = key.encode();
485 let decoded = Key::decode(&encoded).expect("Failed to decode key");
486
487 match decoded {
488 Key::SystemSequence(decoded_inner) => {
489 assert_eq!(decoded_inner.sequence, 42);
490 }
491 _ => unreachable!(),
492 }
493 }
494
495 #[test]
496 fn test_table() {
497 let key = Key::Table(TableKey {
498 table: TableId(42),
499 });
500
501 let encoded = key.encode();
502 let decoded = Key::decode(&encoded).expect("Failed to decode key");
503
504 match decoded {
505 Key::Table(decoded_inner) => {
506 assert_eq!(decoded_inner.table, 42);
507 }
508 _ => unreachable!(),
509 }
510 }
511
512 #[test]
513 fn test_index() {
514 let key = Key::Index(IndexKey {
515 shape: ShapeId::table(42),
516 index: IndexId::primary(999_999),
517 });
518
519 let encoded = key.encode();
520 let decoded = Key::decode(&encoded).expect("Failed to decode key");
521
522 match decoded {
523 Key::Index(decoded_inner) => {
524 assert_eq!(decoded_inner.shape, ShapeId::table(42));
525 assert_eq!(decoded_inner.index, 999_999);
526 }
527 _ => unreachable!(),
528 }
529 }
530
531 #[test]
532 fn test_row() {
533 let key = Key::Row(RowKey {
534 shape: ShapeId::table(42),
535 row: RowNumber(999_999),
536 });
537
538 let encoded = key.encode();
539 let decoded = Key::decode(&encoded).expect("Failed to decode key");
540
541 match decoded {
542 Key::Row(decoded_inner) => {
543 assert_eq!(decoded_inner.shape, ShapeId::table(42));
544 assert_eq!(decoded_inner.row, 999_999);
545 }
546 _ => unreachable!(),
547 }
548 }
549
550 #[test]
551 fn test_row_sequence() {
552 let key = Key::RowSequence(RowSequenceKey {
553 shape: ShapeId::table(42),
554 });
555
556 let encoded = key.encode();
557 let decoded = Key::decode(&encoded).expect("Failed to decode key");
558
559 match decoded {
560 Key::RowSequence(decoded_inner) => {
561 assert_eq!(decoded_inner.shape, ShapeId::table(42));
562 }
563 _ => unreachable!(),
564 }
565 }
566
567 #[test]
568 fn test_column_sequence() {
569 let key = Key::TableColumnSequence(ColumnSequenceKey {
570 shape: ShapeId::table(42),
571 column: ColumnId(123),
572 });
573
574 let encoded = key.encode();
575 let decoded = Key::decode(&encoded).expect("Failed to decode key");
576
577 match decoded {
578 Key::TableColumnSequence(decoded_inner) => {
579 assert_eq!(decoded_inner.shape, ShapeId::table(42));
580 assert_eq!(decoded_inner.column, 123);
581 }
582 _ => unreachable!(),
583 }
584 }
585
586 #[test]
587 fn test_transaction_version() {
588 let key = Key::TransactionVersion(TransactionVersionKey {});
589 let encoded = key.encode();
590 Key::decode(&encoded).expect("Failed to decode key");
591 }
592
593 #[test]
594 fn test_operator_state() {
595 let key = Key::FlowNodeState(FlowNodeStateKey {
596 node: FlowNodeId(0xCAFEBABE),
597 key: vec![1, 2, 3],
598 });
599
600 let encoded = key.encode();
601 let decoded = Key::decode(&encoded).expect("Failed to decode key");
602
603 match decoded {
604 Key::FlowNodeState(decoded_inner) => {
605 assert_eq!(decoded_inner.node, 0xCAFEBABE);
606 assert_eq!(decoded_inner.key, vec![1, 2, 3]);
607 }
608 _ => unreachable!(),
609 }
610 }
611
612 #[test]
613 fn test_sumtype_key() {
614 let key = Key::SumType(SumTypeKey {
615 sumtype: SumTypeId(42),
616 });
617
618 let encoded = key.encode();
619 let decoded = Key::decode(&encoded).expect("Failed to decode key");
620
621 match decoded {
622 Key::SumType(decoded_inner) => {
623 assert_eq!(decoded_inner.sumtype, 42);
624 }
625 _ => unreachable!(),
626 }
627 }
628
629 #[test]
630 fn test_namespace_sumtype_key() {
631 let key = Key::NamespaceSumType(NamespaceSumTypeKey {
632 namespace: NamespaceId(42),
633 sumtype: SumTypeId(999_999),
634 });
635
636 let encoded = key.encode();
637 let decoded = Key::decode(&encoded).expect("Failed to decode key");
638
639 match decoded {
640 Key::NamespaceSumType(decoded_inner) => {
641 assert_eq!(decoded_inner.namespace, 42);
642 assert_eq!(decoded_inner.sumtype, 999_999);
643 }
644 _ => unreachable!(),
645 }
646 }
647}