1use cdc_consumer::CdcConsumerKey;
5use column::ColumnKey;
6use column_sequence::ColumnSequenceKey;
7use columns::ColumnsKey;
8use dictionary::{DictionaryEntryIndexKey, DictionaryEntryKey, DictionaryKey, DictionarySequenceKey};
9use flow::FlowKey;
10use flow_node_internal_state::FlowNodeInternalStateKey;
11use flow_node_state::FlowNodeStateKey;
12use handler::HandlerKey;
13use index::IndexKey;
14use index_entry::IndexEntryKey;
15use kind::KeyKind;
16use namespace::NamespaceKey;
17use namespace_dictionary::NamespaceDictionaryKey;
18use namespace_flow::NamespaceFlowKey;
19use namespace_handler::NamespaceHandlerKey;
20use namespace_ringbuffer::NamespaceRingBufferKey;
21use namespace_series::NamespaceSeriesKey;
22use namespace_sumtype::NamespaceSumTypeKey;
23use namespace_table::NamespaceTableKey;
24use namespace_view::NamespaceViewKey;
25use policy::PolicyKey;
26use policy_op::PolicyOpKey;
27use primary_key::PrimaryKeyKey;
28use property::ColumnPropertyKey;
29use retention_policy::{OperatorRetentionPolicyKey, PrimitiveRetentionPolicyKey};
30use ringbuffer::{RingBufferKey, RingBufferMetadataKey};
31use role::RoleKey;
32use row::RowKey;
33use row_sequence::RowSequenceKey;
34use series::{SeriesKey, SeriesMetadataKey};
35use subscription::SubscriptionKey;
36use subscription_column::SubscriptionColumnKey;
37use subscription_row::SubscriptionRowKey;
38use sumtype::SumTypeKey;
39use system_sequence::SystemSequenceKey;
40use system_version::SystemVersionKey;
41use table::TableKey;
42use transaction_version::TransactionVersionKey;
43use user::UserKey;
44use user_authentication::UserAuthenticationKey;
45use user_role::UserRoleKey;
46use view::ViewKey;
47
48use crate::{
49 encoded::key::{EncodedKey, EncodedKeyRange},
50 util::encoding::keycode,
51};
52
53pub mod cdc_consumer;
54pub mod cdc_exclude;
55pub mod column;
56pub mod column_sequence;
57pub mod columns;
58pub mod config;
59pub mod dictionary;
60pub mod flow;
61pub mod flow_edge;
62pub mod flow_node;
63pub mod flow_node_internal_state;
64pub mod flow_node_state;
65pub mod flow_version;
66pub mod handler;
67pub mod index;
68pub mod index_entry;
69pub mod kind;
70pub mod migration;
71pub mod migration_event;
72pub mod namespace;
73pub mod namespace_dictionary;
74pub mod namespace_flow;
75pub mod namespace_handler;
76pub mod namespace_ringbuffer;
77pub mod namespace_series;
78pub mod namespace_sumtype;
79pub mod namespace_table;
80pub mod namespace_view;
81pub mod policy;
82pub mod policy_op;
83pub mod primary_key;
84pub mod property;
85pub mod retention_policy;
86pub mod ringbuffer;
87pub mod role;
88pub mod row;
89pub mod row_sequence;
90pub mod schema;
91pub mod series;
92pub mod series_row;
93pub mod subscription;
94pub mod subscription_column;
95pub mod subscription_row;
96pub mod sumtype;
97pub mod system_sequence;
98pub mod system_version;
99pub mod table;
100pub mod transaction_version;
101pub mod user;
102pub mod user_authentication;
103pub mod user_role;
104pub mod variant_handler;
105pub mod view;
106#[derive(Debug)]
107pub enum Key {
108 CdcConsumer(CdcConsumerKey),
109 Namespace(NamespaceKey),
110 NamespaceTable(NamespaceTableKey),
111 NamespaceView(NamespaceViewKey),
112 NamespaceFlow(NamespaceFlowKey),
113 SystemSequence(SystemSequenceKey),
114 Table(TableKey),
115 Flow(FlowKey),
116 Column(ColumnKey),
117 Columns(ColumnsKey),
118 Index(IndexKey),
119 IndexEntry(IndexEntryKey),
120 FlowNodeState(FlowNodeStateKey),
121 FlowNodeInternalState(FlowNodeInternalStateKey),
122 PrimaryKey(PrimaryKeyKey),
123 Row(RowKey),
124 RowSequence(RowSequenceKey),
125 TableColumnSequence(ColumnSequenceKey),
126 TableColumnProperty(ColumnPropertyKey),
127 SystemVersion(SystemVersionKey),
128 TransactionVersion(TransactionVersionKey),
129 View(ViewKey),
130 RingBuffer(RingBufferKey),
131 RingBufferMetadata(RingBufferMetadataKey),
132 NamespaceRingBuffer(NamespaceRingBufferKey),
133 PrimitiveRetentionPolicy(PrimitiveRetentionPolicyKey),
134 OperatorRetentionPolicy(OperatorRetentionPolicyKey),
135 Dictionary(DictionaryKey),
136 DictionaryEntry(DictionaryEntryKey),
137 DictionaryEntryIndex(DictionaryEntryIndexKey),
138 DictionarySequence(DictionarySequenceKey),
139 NamespaceDictionary(NamespaceDictionaryKey),
140 SumType(SumTypeKey),
141 NamespaceSumType(NamespaceSumTypeKey),
142 Handler(HandlerKey),
143 NamespaceHandler(NamespaceHandlerKey),
144 Subscription(SubscriptionKey),
145 SubscriptionColumn(SubscriptionColumnKey),
146 SubscriptionRow(SubscriptionRowKey),
147 Series(SeriesKey),
148 SeriesMetadata(SeriesMetadataKey),
149 NamespaceSeries(NamespaceSeriesKey),
150 User(UserKey),
151 UserAuthentication(UserAuthenticationKey),
152 Role(RoleKey),
153 UserRole(UserRoleKey),
154 Policy(PolicyKey),
155 PolicyOp(PolicyOpKey),
156}
157
158impl Key {
159 pub fn encode(&self) -> EncodedKey {
160 match &self {
161 Key::CdcConsumer(key) => key.encode(),
162 Key::Namespace(key) => key.encode(),
163 Key::NamespaceTable(key) => key.encode(),
164 Key::NamespaceView(key) => key.encode(),
165 Key::NamespaceFlow(key) => key.encode(),
166 Key::Table(key) => key.encode(),
167 Key::Flow(key) => key.encode(),
168 Key::Column(key) => key.encode(),
169 Key::Columns(key) => key.encode(),
170 Key::TableColumnProperty(key) => key.encode(),
171 Key::Index(key) => key.encode(),
172 Key::IndexEntry(key) => key.encode(),
173 Key::FlowNodeState(key) => key.encode(),
174 Key::FlowNodeInternalState(key) => key.encode(),
175 Key::PrimaryKey(key) => key.encode(),
176 Key::Row(key) => key.encode(),
177 Key::RowSequence(key) => key.encode(),
178 Key::TableColumnSequence(key) => key.encode(),
179 Key::SystemSequence(key) => key.encode(),
180 Key::SystemVersion(key) => key.encode(),
181 Key::TransactionVersion(key) => key.encode(),
182 Key::View(key) => key.encode(),
183 Key::RingBuffer(key) => key.encode(),
184 Key::RingBufferMetadata(key) => key.encode(),
185 Key::NamespaceRingBuffer(key) => key.encode(),
186 Key::PrimitiveRetentionPolicy(key) => key.encode(),
187 Key::OperatorRetentionPolicy(key) => key.encode(),
188 Key::Dictionary(key) => key.encode(),
189 Key::DictionaryEntry(key) => key.encode(),
190 Key::DictionaryEntryIndex(key) => key.encode(),
191 Key::DictionarySequence(key) => key.encode(),
192 Key::NamespaceDictionary(key) => key.encode(),
193 Key::SumType(key) => key.encode(),
194 Key::NamespaceSumType(key) => key.encode(),
195 Key::Handler(key) => key.encode(),
196 Key::NamespaceHandler(key) => key.encode(),
197 Key::Subscription(key) => key.encode(),
198 Key::SubscriptionColumn(key) => key.encode(),
199 Key::SubscriptionRow(key) => key.encode(),
200 Key::Series(key) => key.encode(),
201 Key::SeriesMetadata(key) => key.encode(),
202 Key::NamespaceSeries(key) => key.encode(),
203 Key::User(key) => key.encode(),
204 Key::UserAuthentication(key) => key.encode(),
205 Key::Role(key) => key.encode(),
206 Key::UserRole(key) => key.encode(),
207 Key::Policy(key) => key.encode(),
208 Key::PolicyOp(key) => key.encode(),
209 }
210 }
211}
212
213pub trait EncodableKey {
214 const KIND: KeyKind;
215
216 fn encode(&self) -> EncodedKey;
217
218 fn decode(key: &EncodedKey) -> Option<Self>
219 where
220 Self: Sized;
221}
222
223pub trait EncodableKeyRange {
224 const KIND: KeyKind;
225
226 fn start(&self) -> Option<EncodedKey>;
227
228 fn end(&self) -> Option<EncodedKey>;
229
230 fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
231 where
232 Self: Sized;
233}
234
235impl Key {
236 pub fn kind(key: impl AsRef<[u8]>) -> Option<KeyKind> {
237 let key = key.as_ref();
238 if key.len() < 2 {
239 return None;
240 }
241
242 keycode::deserialize(&key[1..2]).ok()
243 }
244
245 pub fn decode(key: &EncodedKey) -> Option<Self> {
246 if key.len() < 2 {
247 return None;
248 }
249
250 let kind: KeyKind = keycode::deserialize(&key[1..2]).ok()?;
251 match kind {
252 KeyKind::CdcConsumer => CdcConsumerKey::decode(&key).map(Self::CdcConsumer),
253 KeyKind::Columns => ColumnsKey::decode(&key).map(Self::Columns),
254 KeyKind::ColumnProperty => ColumnPropertyKey::decode(&key).map(Self::TableColumnProperty),
255 KeyKind::Namespace => NamespaceKey::decode(&key).map(Self::Namespace),
256 KeyKind::NamespaceTable => NamespaceTableKey::decode(&key).map(Self::NamespaceTable),
257 KeyKind::NamespaceView => NamespaceViewKey::decode(&key).map(Self::NamespaceView),
258 KeyKind::NamespaceFlow => NamespaceFlowKey::decode(&key).map(Self::NamespaceFlow),
259 KeyKind::Table => TableKey::decode(&key).map(Self::Table),
260 KeyKind::Flow => FlowKey::decode(&key).map(Self::Flow),
261 KeyKind::Column => ColumnKey::decode(&key).map(Self::Column),
262 KeyKind::Index => IndexKey::decode(&key).map(Self::Index),
263 KeyKind::IndexEntry => IndexEntryKey::decode(&key).map(Self::IndexEntry),
264 KeyKind::FlowNodeState => FlowNodeStateKey::decode(&key).map(Self::FlowNodeState),
265 KeyKind::FlowNodeInternalState => {
266 FlowNodeInternalStateKey::decode(&key).map(Self::FlowNodeInternalState)
267 }
268 KeyKind::Row => RowKey::decode(&key).map(Self::Row),
269 KeyKind::RowSequence => RowSequenceKey::decode(&key).map(Self::RowSequence),
270 KeyKind::ColumnSequence => ColumnSequenceKey::decode(&key).map(Self::TableColumnSequence),
271 KeyKind::SystemSequence => SystemSequenceKey::decode(&key).map(Self::SystemSequence),
272 KeyKind::SystemVersion => SystemVersionKey::decode(&key).map(Self::SystemVersion),
273 KeyKind::TransactionVersion => {
274 TransactionVersionKey::decode(&key).map(Self::TransactionVersion)
275 }
276 KeyKind::View => ViewKey::decode(&key).map(Self::View),
277 KeyKind::PrimaryKey => PrimaryKeyKey::decode(&key).map(Self::PrimaryKey),
278 KeyKind::RingBuffer => RingBufferKey::decode(&key).map(Self::RingBuffer),
279 KeyKind::RingBufferMetadata => {
280 RingBufferMetadataKey::decode(&key).map(Self::RingBufferMetadata)
281 }
282 KeyKind::NamespaceRingBuffer => {
283 NamespaceRingBufferKey::decode(&key).map(Self::NamespaceRingBuffer)
284 }
285 KeyKind::PrimitiveRetentionPolicy => {
286 PrimitiveRetentionPolicyKey::decode(&key).map(Self::PrimitiveRetentionPolicy)
287 }
288 KeyKind::OperatorRetentionPolicy => {
289 OperatorRetentionPolicyKey::decode(&key).map(Self::OperatorRetentionPolicy)
290 }
291 KeyKind::FlowNode
292 | KeyKind::FlowNodeByFlow
293 | KeyKind::FlowEdge
294 | KeyKind::FlowEdgeByFlow
295 | KeyKind::FlowVersion => {
296 None
298 }
299 KeyKind::Dictionary => DictionaryKey::decode(&key).map(Self::Dictionary),
300 KeyKind::DictionaryEntry => DictionaryEntryKey::decode(&key).map(Self::DictionaryEntry),
301 KeyKind::DictionaryEntryIndex => {
302 DictionaryEntryIndexKey::decode(&key).map(Self::DictionaryEntryIndex)
303 }
304 KeyKind::DictionarySequence => {
305 DictionarySequenceKey::decode(&key).map(Self::DictionarySequence)
306 }
307 KeyKind::NamespaceDictionary => {
308 NamespaceDictionaryKey::decode(&key).map(Self::NamespaceDictionary)
309 }
310 KeyKind::SumType => SumTypeKey::decode(&key).map(Self::SumType),
311 KeyKind::NamespaceSumType => NamespaceSumTypeKey::decode(&key).map(Self::NamespaceSumType),
312 KeyKind::Handler => HandlerKey::decode(&key).map(Self::Handler),
313 KeyKind::NamespaceHandler => NamespaceHandlerKey::decode(&key).map(Self::NamespaceHandler),
314 KeyKind::VariantHandler => {
315 None
317 }
318 KeyKind::Metric => {
319 None
321 }
322 KeyKind::Subscription => SubscriptionKey::decode(&key).map(Self::Subscription),
323 KeyKind::SubscriptionColumn => {
324 SubscriptionColumnKey::decode(&key).map(Self::SubscriptionColumn)
325 }
326 KeyKind::SubscriptionRow => SubscriptionRowKey::decode(&key).map(Self::SubscriptionRow),
327 KeyKind::Schema | KeyKind::SchemaField => {
328 None
330 }
331 KeyKind::Series => SeriesKey::decode(&key).map(Self::Series),
332 KeyKind::NamespaceSeries => NamespaceSeriesKey::decode(&key).map(Self::NamespaceSeries),
333 KeyKind::SeriesMetadata => SeriesMetadataKey::decode(&key).map(Self::SeriesMetadata),
334 KeyKind::User => UserKey::decode(&key).map(Self::User),
335 KeyKind::UserAuthentication => {
336 UserAuthenticationKey::decode(&key).map(Self::UserAuthentication)
337 }
338 KeyKind::Role => RoleKey::decode(&key).map(Self::Role),
339 KeyKind::UserRole => UserRoleKey::decode(&key).map(Self::UserRole),
340 KeyKind::Policy => PolicyKey::decode(&key).map(Self::Policy),
341 KeyKind::PolicyOp => PolicyOpKey::decode(&key).map(Self::PolicyOp),
342 KeyKind::Migration | KeyKind::MigrationEvent => {
343 None
345 }
346 KeyKind::Config => {
347 None
349 }
350 }
351 }
352}
353
354#[cfg(test)]
355pub mod tests {
356 use reifydb_type::value::{row_number::RowNumber, sumtype::SumTypeId};
357
358 use crate::{
359 interface::catalog::{
360 flow::FlowNodeId,
361 id::{ColumnId, ColumnPropertyId, IndexId, NamespaceId, SequenceId, TableId},
362 primitive::PrimitiveId,
363 },
364 key::{
365 Key, column::ColumnKey, column_sequence::ColumnSequenceKey, columns::ColumnsKey,
366 flow_node_state::FlowNodeStateKey, index::IndexKey, namespace::NamespaceKey,
367 namespace_sumtype::NamespaceSumTypeKey, namespace_table::NamespaceTableKey,
368 property::ColumnPropertyKey, row::RowKey, row_sequence::RowSequenceKey, sumtype::SumTypeKey,
369 system_sequence::SystemSequenceKey, table::TableKey,
370 transaction_version::TransactionVersionKey,
371 },
372 };
373
374 #[test]
375 fn test_table_columns() {
376 let key = Key::Columns(ColumnsKey {
377 column: ColumnId(42),
378 });
379
380 let encoded = key.encode();
381 let decoded = Key::decode(&encoded).expect("Failed to decode key");
382
383 match decoded {
384 Key::Columns(decoded_inner) => {
385 assert_eq!(decoded_inner.column, 42);
386 }
387 _ => unreachable!(),
388 }
389 }
390
391 #[test]
392 fn test_column() {
393 let key = Key::Column(ColumnKey {
394 primitive: PrimitiveId::table(1),
395 column: ColumnId(42),
396 });
397
398 let encoded = key.encode();
399 let decoded = Key::decode(&encoded).expect("Failed to decode key");
400
401 match decoded {
402 Key::Column(decoded_inner) => {
403 assert_eq!(decoded_inner.primitive, PrimitiveId::table(1));
404 assert_eq!(decoded_inner.column, 42);
405 }
406 _ => unreachable!(),
407 }
408 }
409
410 #[test]
411 fn test_column_property() {
412 let key = Key::TableColumnProperty(ColumnPropertyKey {
413 column: ColumnId(42),
414 property: ColumnPropertyId(999_999),
415 });
416
417 let encoded = key.encode();
418 let decoded = Key::decode(&encoded).expect("Failed to decode key");
419
420 match decoded {
421 Key::TableColumnProperty(decoded_inner) => {
422 assert_eq!(decoded_inner.column, 42);
423 assert_eq!(decoded_inner.property, 999_999);
424 }
425 _ => unreachable!(),
426 }
427 }
428
429 #[test]
430 fn test_namespace() {
431 let key = Key::Namespace(NamespaceKey {
432 namespace: NamespaceId(42),
433 });
434
435 let encoded = key.encode();
436 let decoded = Key::decode(&encoded).expect("Failed to decode key");
437
438 match decoded {
439 Key::Namespace(decoded_inner) => {
440 assert_eq!(decoded_inner.namespace, 42);
441 }
442 _ => unreachable!(),
443 }
444 }
445
446 #[test]
447 fn test_namespace_table() {
448 let key = Key::NamespaceTable(NamespaceTableKey {
449 namespace: NamespaceId(42),
450 table: TableId(999_999),
451 });
452
453 let encoded = key.encode();
454 let decoded = Key::decode(&encoded).expect("Failed to decode key");
455
456 match decoded {
457 Key::NamespaceTable(decoded_inner) => {
458 assert_eq!(decoded_inner.namespace, 42);
459 assert_eq!(decoded_inner.table, 999_999);
460 }
461 _ => unreachable!(),
462 }
463 }
464
465 #[test]
466 fn test_system_sequence() {
467 let key = Key::SystemSequence(SystemSequenceKey {
468 sequence: SequenceId(42),
469 });
470
471 let encoded = key.encode();
472 let decoded = Key::decode(&encoded).expect("Failed to decode key");
473
474 match decoded {
475 Key::SystemSequence(decoded_inner) => {
476 assert_eq!(decoded_inner.sequence, 42);
477 }
478 _ => unreachable!(),
479 }
480 }
481
482 #[test]
483 fn test_table() {
484 let key = Key::Table(TableKey {
485 table: TableId(42),
486 });
487
488 let encoded = key.encode();
489 let decoded = Key::decode(&encoded).expect("Failed to decode key");
490
491 match decoded {
492 Key::Table(decoded_inner) => {
493 assert_eq!(decoded_inner.table, 42);
494 }
495 _ => unreachable!(),
496 }
497 }
498
499 #[test]
500 fn test_index() {
501 let key = Key::Index(IndexKey {
502 primitive: PrimitiveId::table(42),
503 index: IndexId::primary(999_999),
504 });
505
506 let encoded = key.encode();
507 let decoded = Key::decode(&encoded).expect("Failed to decode key");
508
509 match decoded {
510 Key::Index(decoded_inner) => {
511 assert_eq!(decoded_inner.primitive, PrimitiveId::table(42));
512 assert_eq!(decoded_inner.index, 999_999);
513 }
514 _ => unreachable!(),
515 }
516 }
517
518 #[test]
519 fn test_row() {
520 let key = Key::Row(RowKey {
521 primitive: PrimitiveId::table(42),
522 row: RowNumber(999_999),
523 });
524
525 let encoded = key.encode();
526 let decoded = Key::decode(&encoded).expect("Failed to decode key");
527
528 match decoded {
529 Key::Row(decoded_inner) => {
530 assert_eq!(decoded_inner.primitive, PrimitiveId::table(42));
531 assert_eq!(decoded_inner.row, 999_999);
532 }
533 _ => unreachable!(),
534 }
535 }
536
537 #[test]
538 fn test_row_sequence() {
539 let key = Key::RowSequence(RowSequenceKey {
540 primitive: PrimitiveId::table(42),
541 });
542
543 let encoded = key.encode();
544 let decoded = Key::decode(&encoded).expect("Failed to decode key");
545
546 match decoded {
547 Key::RowSequence(decoded_inner) => {
548 assert_eq!(decoded_inner.primitive, PrimitiveId::table(42));
549 }
550 _ => unreachable!(),
551 }
552 }
553
554 #[test]
555 fn test_column_sequence() {
556 let key = Key::TableColumnSequence(ColumnSequenceKey {
557 primitive: PrimitiveId::table(42),
558 column: ColumnId(123),
559 });
560
561 let encoded = key.encode();
562 let decoded = Key::decode(&encoded).expect("Failed to decode key");
563
564 match decoded {
565 Key::TableColumnSequence(decoded_inner) => {
566 assert_eq!(decoded_inner.primitive, PrimitiveId::table(42));
567 assert_eq!(decoded_inner.column, 123);
568 }
569 _ => unreachable!(),
570 }
571 }
572
573 #[test]
574 fn test_transaction_version() {
575 let key = Key::TransactionVersion(TransactionVersionKey {});
576 let encoded = key.encode();
577 Key::decode(&encoded).expect("Failed to decode key");
578 }
579
580 #[test]
581 fn test_operator_state() {
582 let key = Key::FlowNodeState(FlowNodeStateKey {
583 node: FlowNodeId(0xCAFEBABE),
584 key: vec![1, 2, 3],
585 });
586
587 let encoded = key.encode();
588 let decoded = Key::decode(&encoded).expect("Failed to decode key");
589
590 match decoded {
591 Key::FlowNodeState(decoded_inner) => {
592 assert_eq!(decoded_inner.node, 0xCAFEBABE);
593 assert_eq!(decoded_inner.key, vec![1, 2, 3]);
594 }
595 _ => unreachable!(),
596 }
597 }
598
599 #[test]
600 fn test_sumtype_key() {
601 let key = Key::SumType(SumTypeKey {
602 sumtype: SumTypeId(42),
603 });
604
605 let encoded = key.encode();
606 let decoded = Key::decode(&encoded).expect("Failed to decode key");
607
608 match decoded {
609 Key::SumType(decoded_inner) => {
610 assert_eq!(decoded_inner.sumtype, 42);
611 }
612 _ => unreachable!(),
613 }
614 }
615
616 #[test]
617 fn test_namespace_sumtype_key() {
618 let key = Key::NamespaceSumType(NamespaceSumTypeKey {
619 namespace: NamespaceId(42),
620 sumtype: SumTypeId(999_999),
621 });
622
623 let encoded = key.encode();
624 let decoded = Key::decode(&encoded).expect("Failed to decode key");
625
626 match decoded {
627 Key::NamespaceSumType(decoded_inner) => {
628 assert_eq!(decoded_inner.namespace, 42);
629 assert_eq!(decoded_inner.sumtype, 999_999);
630 }
631 _ => unreachable!(),
632 }
633 }
634}