1use authentication::AuthenticationKey;
5use cdc_consumer::CdcConsumerKey;
6use column::ColumnKey;
7use column_sequence::ColumnSequenceKey;
8use columns::ColumnsKey;
9use dictionary::{DictionaryEntryIndexKey, DictionaryEntryKey, DictionaryKey, DictionarySequenceKey};
10use flow::FlowKey;
11use flow_node_internal_state::FlowNodeInternalStateKey;
12use flow_node_state::FlowNodeStateKey;
13use granted_role::GrantedRoleKey;
14use handler::HandlerKey;
15use identity::IdentityKey;
16use index::IndexKey;
17use index_entry::IndexEntryKey;
18use kind::KeyKind;
19use namespace::NamespaceKey;
20use namespace_dictionary::NamespaceDictionaryKey;
21use namespace_flow::NamespaceFlowKey;
22use namespace_handler::NamespaceHandlerKey;
23use namespace_ringbuffer::NamespaceRingBufferKey;
24use namespace_series::NamespaceSeriesKey;
25use namespace_sink::NamespaceSinkKey;
26use namespace_source::NamespaceSourceKey;
27use namespace_sumtype::NamespaceSumTypeKey;
28use namespace_table::NamespaceTableKey;
29use namespace_view::NamespaceViewKey;
30use policy::PolicyKey;
31use policy_op::PolicyOpKey;
32use primary_key::PrimaryKeyKey;
33use property::ColumnPropertyKey;
34use retention_strategy::{OperatorRetentionStrategyKey, ShapeRetentionStrategyKey};
35use ringbuffer::{RingBufferKey, RingBufferMetadataKey};
36use role::RoleKey;
37use row::RowKey;
38use row_sequence::RowSequenceKey;
39use series::{SeriesKey, SeriesMetadataKey};
40use sink::SinkKey;
41use source::SourceKey;
42use sumtype::SumTypeKey;
43use system_sequence::SystemSequenceKey;
44use system_version::SystemVersionKey;
45use table::TableKey;
46use token::TokenKey;
47use transaction_version::TransactionVersionKey;
48use view::ViewKey;
49
50use crate::{
51 encoded::key::{EncodedKey, EncodedKeyRange},
52 util::encoding::keycode,
53};
54
55pub mod authentication;
56pub mod cdc_consumer;
57pub mod cdc_exclude;
58pub mod column;
59pub mod column_sequence;
60pub mod columns;
61pub mod config;
62pub mod dictionary;
63pub mod flow;
64pub mod flow_edge;
65pub mod flow_node;
66pub mod flow_node_internal_state;
67pub mod flow_node_state;
68pub mod flow_version;
69pub mod granted_role;
70pub mod handler;
71pub mod identity;
72pub mod index;
73pub mod index_entry;
74pub mod kind;
75pub mod migration;
76pub mod migration_event;
77pub mod namespace;
78pub mod namespace_dictionary;
79pub mod namespace_flow;
80pub mod namespace_handler;
81pub mod namespace_ringbuffer;
82pub mod namespace_series;
83pub mod namespace_sink;
84pub mod namespace_source;
85pub mod namespace_sumtype;
86pub mod namespace_table;
87pub mod namespace_view;
88pub mod policy;
89pub mod policy_op;
90pub mod primary_key;
91pub mod property;
92pub mod retention_strategy;
93pub mod ringbuffer;
94pub mod role;
95pub mod row;
96pub mod row_sequence;
97pub mod series;
98pub mod series_row;
99pub mod shape;
100pub mod sink;
101pub mod source;
102pub mod sumtype;
103pub mod system_sequence;
104pub mod system_version;
105pub mod table;
106pub mod token;
107pub mod transaction_version;
108pub mod variant_handler;
109pub mod view;
110#[derive(Debug)]
111pub enum Key {
112 CdcConsumer(CdcConsumerKey),
113 Namespace(NamespaceKey),
114 NamespaceTable(NamespaceTableKey),
115 NamespaceView(NamespaceViewKey),
116 NamespaceFlow(NamespaceFlowKey),
117 SystemSequence(SystemSequenceKey),
118 Table(TableKey),
119 Flow(FlowKey),
120 Column(ColumnKey),
121 Columns(ColumnsKey),
122 Index(IndexKey),
123 IndexEntry(IndexEntryKey),
124 FlowNodeState(FlowNodeStateKey),
125 FlowNodeInternalState(FlowNodeInternalStateKey),
126 PrimaryKey(PrimaryKeyKey),
127 Row(RowKey),
128 RowSequence(RowSequenceKey),
129 TableColumnSequence(ColumnSequenceKey),
130 TableColumnProperty(ColumnPropertyKey),
131 SystemVersion(SystemVersionKey),
132 TransactionVersion(TransactionVersionKey),
133 View(ViewKey),
134 RingBuffer(RingBufferKey),
135 RingBufferMetadata(RingBufferMetadataKey),
136 NamespaceRingBuffer(NamespaceRingBufferKey),
137 ShapeRetentionStrategy(ShapeRetentionStrategyKey),
138 OperatorRetentionStrategy(OperatorRetentionStrategyKey),
139 Dictionary(DictionaryKey),
140 DictionaryEntry(DictionaryEntryKey),
141 DictionaryEntryIndex(DictionaryEntryIndexKey),
142 DictionarySequence(DictionarySequenceKey),
143 NamespaceDictionary(NamespaceDictionaryKey),
144 SumType(SumTypeKey),
145 NamespaceSumType(NamespaceSumTypeKey),
146 Handler(HandlerKey),
147 NamespaceHandler(NamespaceHandlerKey),
148 Series(SeriesKey),
149 SeriesMetadata(SeriesMetadataKey),
150 NamespaceSeries(NamespaceSeriesKey),
151 Identity(IdentityKey),
152 Authentication(AuthenticationKey),
153 Role(RoleKey),
154 GrantedRole(GrantedRoleKey),
155 Policy(PolicyKey),
156 PolicyOp(PolicyOpKey),
157 Token(TokenKey),
158 Source(SourceKey),
159 NamespaceSource(NamespaceSourceKey),
160 Sink(SinkKey),
161 NamespaceSink(NamespaceSinkKey),
162}
163
164impl Key {
165 pub fn encode(&self) -> EncodedKey {
166 match &self {
167 Key::CdcConsumer(key) => key.encode(),
168 Key::Namespace(key) => key.encode(),
169 Key::NamespaceTable(key) => key.encode(),
170 Key::NamespaceView(key) => key.encode(),
171 Key::NamespaceFlow(key) => key.encode(),
172 Key::Table(key) => key.encode(),
173 Key::Flow(key) => key.encode(),
174 Key::Column(key) => key.encode(),
175 Key::Columns(key) => key.encode(),
176 Key::TableColumnProperty(key) => key.encode(),
177 Key::Index(key) => key.encode(),
178 Key::IndexEntry(key) => key.encode(),
179 Key::FlowNodeState(key) => key.encode(),
180 Key::FlowNodeInternalState(key) => key.encode(),
181 Key::PrimaryKey(key) => key.encode(),
182 Key::Row(key) => key.encode(),
183 Key::RowSequence(key) => key.encode(),
184 Key::TableColumnSequence(key) => key.encode(),
185 Key::SystemSequence(key) => key.encode(),
186 Key::SystemVersion(key) => key.encode(),
187 Key::TransactionVersion(key) => key.encode(),
188 Key::View(key) => key.encode(),
189 Key::RingBuffer(key) => key.encode(),
190 Key::RingBufferMetadata(key) => key.encode(),
191 Key::NamespaceRingBuffer(key) => key.encode(),
192 Key::ShapeRetentionStrategy(key) => key.encode(),
193 Key::OperatorRetentionStrategy(key) => key.encode(),
194 Key::Dictionary(key) => key.encode(),
195 Key::DictionaryEntry(key) => key.encode(),
196 Key::DictionaryEntryIndex(key) => key.encode(),
197 Key::DictionarySequence(key) => key.encode(),
198 Key::NamespaceDictionary(key) => key.encode(),
199 Key::SumType(key) => key.encode(),
200 Key::NamespaceSumType(key) => key.encode(),
201 Key::Handler(key) => key.encode(),
202 Key::NamespaceHandler(key) => key.encode(),
203 Key::Series(key) => key.encode(),
204 Key::SeriesMetadata(key) => key.encode(),
205 Key::NamespaceSeries(key) => key.encode(),
206 Key::Identity(key) => key.encode(),
207 Key::Authentication(key) => key.encode(),
208 Key::Role(key) => key.encode(),
209 Key::GrantedRole(key) => key.encode(),
210 Key::Policy(key) => key.encode(),
211 Key::PolicyOp(key) => key.encode(),
212 Key::Token(key) => key.encode(),
213 Key::Source(key) => key.encode(),
214 Key::NamespaceSource(key) => key.encode(),
215 Key::Sink(key) => key.encode(),
216 Key::NamespaceSink(key) => key.encode(),
217 }
218 }
219}
220
221pub trait EncodableKey {
222 const KIND: KeyKind;
223
224 fn encode(&self) -> EncodedKey;
225
226 fn decode(key: &EncodedKey) -> Option<Self>
227 where
228 Self: Sized;
229}
230
231pub trait EncodableKeyRange {
232 const KIND: KeyKind;
233
234 fn start(&self) -> Option<EncodedKey>;
235
236 fn end(&self) -> Option<EncodedKey>;
237
238 fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
239 where
240 Self: Sized;
241}
242
243impl Key {
244 pub fn kind(key: impl AsRef<[u8]>) -> Option<KeyKind> {
245 let key = key.as_ref();
246 if key.len() < 2 {
247 return None;
248 }
249
250 keycode::deserialize(&key[1..2]).ok()
251 }
252
253 pub fn decode(key: &EncodedKey) -> Option<Self> {
254 if key.len() < 2 {
255 return None;
256 }
257
258 let kind: KeyKind = keycode::deserialize(&key[1..2]).ok()?;
259 match kind {
260 KeyKind::CdcConsumer => CdcConsumerKey::decode(key).map(Self::CdcConsumer),
261 KeyKind::Columns => ColumnsKey::decode(key).map(Self::Columns),
262 KeyKind::ColumnProperty => ColumnPropertyKey::decode(key).map(Self::TableColumnProperty),
263 KeyKind::Namespace => NamespaceKey::decode(key).map(Self::Namespace),
264 KeyKind::NamespaceTable => NamespaceTableKey::decode(key).map(Self::NamespaceTable),
265 KeyKind::NamespaceView => NamespaceViewKey::decode(key).map(Self::NamespaceView),
266 KeyKind::NamespaceFlow => NamespaceFlowKey::decode(key).map(Self::NamespaceFlow),
267 KeyKind::Table => TableKey::decode(key).map(Self::Table),
268 KeyKind::Flow => FlowKey::decode(key).map(Self::Flow),
269 KeyKind::Column => ColumnKey::decode(key).map(Self::Column),
270 KeyKind::Index => IndexKey::decode(key).map(Self::Index),
271 KeyKind::IndexEntry => IndexEntryKey::decode(key).map(Self::IndexEntry),
272 KeyKind::FlowNodeState => FlowNodeStateKey::decode(key).map(Self::FlowNodeState),
273 KeyKind::FlowNodeInternalState => {
274 FlowNodeInternalStateKey::decode(key).map(Self::FlowNodeInternalState)
275 }
276 KeyKind::Row => RowKey::decode(key).map(Self::Row),
277 KeyKind::RowSequence => RowSequenceKey::decode(key).map(Self::RowSequence),
278 KeyKind::ColumnSequence => ColumnSequenceKey::decode(key).map(Self::TableColumnSequence),
279 KeyKind::SystemSequence => SystemSequenceKey::decode(key).map(Self::SystemSequence),
280 KeyKind::SystemVersion => SystemVersionKey::decode(key).map(Self::SystemVersion),
281 KeyKind::TransactionVersion => TransactionVersionKey::decode(key).map(Self::TransactionVersion),
282 KeyKind::View => ViewKey::decode(key).map(Self::View),
283 KeyKind::PrimaryKey => PrimaryKeyKey::decode(key).map(Self::PrimaryKey),
284 KeyKind::RingBuffer => RingBufferKey::decode(key).map(Self::RingBuffer),
285 KeyKind::RingBufferMetadata => RingBufferMetadataKey::decode(key).map(Self::RingBufferMetadata),
286 KeyKind::NamespaceRingBuffer => {
287 NamespaceRingBufferKey::decode(key).map(Self::NamespaceRingBuffer)
288 }
289 KeyKind::ShapeRetentionStrategy => {
290 ShapeRetentionStrategyKey::decode(key).map(Self::ShapeRetentionStrategy)
291 }
292 KeyKind::OperatorRetentionStrategy => {
293 OperatorRetentionStrategyKey::decode(key).map(Self::OperatorRetentionStrategy)
294 }
295 KeyKind::FlowNode
296 | KeyKind::FlowNodeByFlow
297 | KeyKind::FlowEdge
298 | KeyKind::FlowEdgeByFlow
299 | KeyKind::FlowVersion => {
300 None
302 }
303 KeyKind::Dictionary => DictionaryKey::decode(key).map(Self::Dictionary),
304 KeyKind::DictionaryEntry => DictionaryEntryKey::decode(key).map(Self::DictionaryEntry),
305 KeyKind::DictionaryEntryIndex => {
306 DictionaryEntryIndexKey::decode(key).map(Self::DictionaryEntryIndex)
307 }
308 KeyKind::DictionarySequence => DictionarySequenceKey::decode(key).map(Self::DictionarySequence),
309 KeyKind::NamespaceDictionary => {
310 NamespaceDictionaryKey::decode(key).map(Self::NamespaceDictionary)
311 }
312 KeyKind::SumType => SumTypeKey::decode(key).map(Self::SumType),
313 KeyKind::NamespaceSumType => NamespaceSumTypeKey::decode(key).map(Self::NamespaceSumType),
314 KeyKind::Handler => HandlerKey::decode(key).map(Self::Handler),
315 KeyKind::NamespaceHandler => NamespaceHandlerKey::decode(key).map(Self::NamespaceHandler),
316 KeyKind::VariantHandler => {
317 None
319 }
320 KeyKind::Metric => {
321 None
323 }
324 KeyKind::Subscription | KeyKind::SubscriptionColumn | KeyKind::SubscriptionRow => {
325 None
327 }
328 KeyKind::Shape | KeyKind::RowShapeField => {
329 None
331 }
332 KeyKind::Series => SeriesKey::decode(key).map(Self::Series),
333 KeyKind::NamespaceSeries => NamespaceSeriesKey::decode(key).map(Self::NamespaceSeries),
334 KeyKind::SeriesMetadata => SeriesMetadataKey::decode(key).map(Self::SeriesMetadata),
335 KeyKind::Identity => IdentityKey::decode(key).map(Self::Identity),
336 KeyKind::Authentication => AuthenticationKey::decode(key).map(Self::Authentication),
337 KeyKind::Role => RoleKey::decode(key).map(Self::Role),
338 KeyKind::GrantedRole => GrantedRoleKey::decode(key).map(Self::GrantedRole),
339 KeyKind::Policy => PolicyKey::decode(key).map(Self::Policy),
340 KeyKind::PolicyOp => PolicyOpKey::decode(key).map(Self::PolicyOp),
341 KeyKind::Migration | KeyKind::MigrationEvent => {
342 None
344 }
345 KeyKind::Token => TokenKey::decode(key).map(Self::Token),
346 KeyKind::Config => {
347 None
349 }
350 KeyKind::Source
351 | KeyKind::NamespaceSource
352 | KeyKind::Sink
353 | KeyKind::NamespaceSink
354 | KeyKind::SourceCheckpoint => {
355 None
357 }
358 }
359 }
360}
361
362#[cfg(test)]
363pub mod tests {
364 use reifydb_type::value::{row_number::RowNumber, sumtype::SumTypeId};
365
366 use crate::{
367 interface::catalog::{
368 flow::FlowNodeId,
369 id::{ColumnId, ColumnPropertyId, IndexId, NamespaceId, SequenceId, TableId},
370 shape::ShapeId,
371 },
372 key::{
373 Key, column::ColumnKey, column_sequence::ColumnSequenceKey, columns::ColumnsKey,
374 flow_node_state::FlowNodeStateKey, index::IndexKey, namespace::NamespaceKey,
375 namespace_sumtype::NamespaceSumTypeKey, namespace_table::NamespaceTableKey,
376 property::ColumnPropertyKey, row::RowKey, row_sequence::RowSequenceKey, sumtype::SumTypeKey,
377 system_sequence::SystemSequenceKey, table::TableKey,
378 transaction_version::TransactionVersionKey,
379 },
380 };
381
382 #[test]
383 fn test_table_columns() {
384 let key = Key::Columns(ColumnsKey {
385 column: ColumnId(42),
386 });
387
388 let encoded = key.encode();
389 let decoded = Key::decode(&encoded).expect("Failed to decode key");
390
391 match decoded {
392 Key::Columns(decoded_inner) => {
393 assert_eq!(decoded_inner.column, 42);
394 }
395 _ => unreachable!(),
396 }
397 }
398
399 #[test]
400 fn test_column() {
401 let key = Key::Column(ColumnKey {
402 shape: ShapeId::table(1),
403 column: ColumnId(42),
404 });
405
406 let encoded = key.encode();
407 let decoded = Key::decode(&encoded).expect("Failed to decode key");
408
409 match decoded {
410 Key::Column(decoded_inner) => {
411 assert_eq!(decoded_inner.shape, ShapeId::table(1));
412 assert_eq!(decoded_inner.column, 42);
413 }
414 _ => unreachable!(),
415 }
416 }
417
418 #[test]
419 fn test_column_property() {
420 let key = Key::TableColumnProperty(ColumnPropertyKey {
421 column: ColumnId(42),
422 property: ColumnPropertyId(999_999),
423 });
424
425 let encoded = key.encode();
426 let decoded = Key::decode(&encoded).expect("Failed to decode key");
427
428 match decoded {
429 Key::TableColumnProperty(decoded_inner) => {
430 assert_eq!(decoded_inner.column, 42);
431 assert_eq!(decoded_inner.property, 999_999);
432 }
433 _ => unreachable!(),
434 }
435 }
436
437 #[test]
438 fn test_namespace() {
439 let key = Key::Namespace(NamespaceKey {
440 namespace: NamespaceId(42),
441 });
442
443 let encoded = key.encode();
444 let decoded = Key::decode(&encoded).expect("Failed to decode key");
445
446 match decoded {
447 Key::Namespace(decoded_inner) => {
448 assert_eq!(decoded_inner.namespace, 42);
449 }
450 _ => unreachable!(),
451 }
452 }
453
454 #[test]
455 fn test_namespace_table() {
456 let key = Key::NamespaceTable(NamespaceTableKey {
457 namespace: NamespaceId(42),
458 table: TableId(999_999),
459 });
460
461 let encoded = key.encode();
462 let decoded = Key::decode(&encoded).expect("Failed to decode key");
463
464 match decoded {
465 Key::NamespaceTable(decoded_inner) => {
466 assert_eq!(decoded_inner.namespace, 42);
467 assert_eq!(decoded_inner.table, 999_999);
468 }
469 _ => unreachable!(),
470 }
471 }
472
473 #[test]
474 fn test_system_sequence() {
475 let key = Key::SystemSequence(SystemSequenceKey {
476 sequence: SequenceId(42),
477 });
478
479 let encoded = key.encode();
480 let decoded = Key::decode(&encoded).expect("Failed to decode key");
481
482 match decoded {
483 Key::SystemSequence(decoded_inner) => {
484 assert_eq!(decoded_inner.sequence, 42);
485 }
486 _ => unreachable!(),
487 }
488 }
489
490 #[test]
491 fn test_table() {
492 let key = Key::Table(TableKey {
493 table: TableId(42),
494 });
495
496 let encoded = key.encode();
497 let decoded = Key::decode(&encoded).expect("Failed to decode key");
498
499 match decoded {
500 Key::Table(decoded_inner) => {
501 assert_eq!(decoded_inner.table, 42);
502 }
503 _ => unreachable!(),
504 }
505 }
506
507 #[test]
508 fn test_index() {
509 let key = Key::Index(IndexKey {
510 shape: ShapeId::table(42),
511 index: IndexId::primary(999_999),
512 });
513
514 let encoded = key.encode();
515 let decoded = Key::decode(&encoded).expect("Failed to decode key");
516
517 match decoded {
518 Key::Index(decoded_inner) => {
519 assert_eq!(decoded_inner.shape, ShapeId::table(42));
520 assert_eq!(decoded_inner.index, 999_999);
521 }
522 _ => unreachable!(),
523 }
524 }
525
526 #[test]
527 fn test_row() {
528 let key = Key::Row(RowKey {
529 shape: ShapeId::table(42),
530 row: RowNumber(999_999),
531 });
532
533 let encoded = key.encode();
534 let decoded = Key::decode(&encoded).expect("Failed to decode key");
535
536 match decoded {
537 Key::Row(decoded_inner) => {
538 assert_eq!(decoded_inner.shape, ShapeId::table(42));
539 assert_eq!(decoded_inner.row, 999_999);
540 }
541 _ => unreachable!(),
542 }
543 }
544
545 #[test]
546 fn test_row_sequence() {
547 let key = Key::RowSequence(RowSequenceKey {
548 shape: ShapeId::table(42),
549 });
550
551 let encoded = key.encode();
552 let decoded = Key::decode(&encoded).expect("Failed to decode key");
553
554 match decoded {
555 Key::RowSequence(decoded_inner) => {
556 assert_eq!(decoded_inner.shape, ShapeId::table(42));
557 }
558 _ => unreachable!(),
559 }
560 }
561
562 #[test]
563 fn test_column_sequence() {
564 let key = Key::TableColumnSequence(ColumnSequenceKey {
565 shape: ShapeId::table(42),
566 column: ColumnId(123),
567 });
568
569 let encoded = key.encode();
570 let decoded = Key::decode(&encoded).expect("Failed to decode key");
571
572 match decoded {
573 Key::TableColumnSequence(decoded_inner) => {
574 assert_eq!(decoded_inner.shape, ShapeId::table(42));
575 assert_eq!(decoded_inner.column, 123);
576 }
577 _ => unreachable!(),
578 }
579 }
580
581 #[test]
582 fn test_transaction_version() {
583 let key = Key::TransactionVersion(TransactionVersionKey {});
584 let encoded = key.encode();
585 Key::decode(&encoded).expect("Failed to decode key");
586 }
587
588 #[test]
589 fn test_operator_state() {
590 let key = Key::FlowNodeState(FlowNodeStateKey {
591 node: FlowNodeId(0xCAFEBABE),
592 key: vec![1, 2, 3],
593 });
594
595 let encoded = key.encode();
596 let decoded = Key::decode(&encoded).expect("Failed to decode key");
597
598 match decoded {
599 Key::FlowNodeState(decoded_inner) => {
600 assert_eq!(decoded_inner.node, 0xCAFEBABE);
601 assert_eq!(decoded_inner.key, vec![1, 2, 3]);
602 }
603 _ => unreachable!(),
604 }
605 }
606
607 #[test]
608 fn test_sumtype_key() {
609 let key = Key::SumType(SumTypeKey {
610 sumtype: SumTypeId(42),
611 });
612
613 let encoded = key.encode();
614 let decoded = Key::decode(&encoded).expect("Failed to decode key");
615
616 match decoded {
617 Key::SumType(decoded_inner) => {
618 assert_eq!(decoded_inner.sumtype, 42);
619 }
620 _ => unreachable!(),
621 }
622 }
623
624 #[test]
625 fn test_namespace_sumtype_key() {
626 let key = Key::NamespaceSumType(NamespaceSumTypeKey {
627 namespace: NamespaceId(42),
628 sumtype: SumTypeId(999_999),
629 });
630
631 let encoded = key.encode();
632 let decoded = Key::decode(&encoded).expect("Failed to decode key");
633
634 match decoded {
635 Key::NamespaceSumType(decoded_inner) => {
636 assert_eq!(decoded_inner.namespace, 42);
637 assert_eq!(decoded_inner.sumtype, 999_999);
638 }
639 _ => unreachable!(),
640 }
641 }
642}