1pub use cdc_consumer::{CdcConsumerKey, CdcConsumerKeyRange, ToConsumerKey};
5pub use column::ColumnKey;
6pub use column_policy::ColumnPolicyKey;
7pub use column_sequence::ColumnSequenceKey;
8pub use columns::ColumnsKey;
9pub use dictionary::{
10 DictionaryEntryIndexKey, DictionaryEntryIndexKeyRange, DictionaryEntryKey, DictionaryKey, DictionarySequenceKey,
11};
12pub use flow::FlowKey;
13pub use flow_edge::{FlowEdgeByFlowKey, FlowEdgeKey};
14pub use flow_node::{FlowNodeByFlowKey, FlowNodeKey};
15pub use flow_node_internal_state::{FlowNodeInternalStateKey, FlowNodeInternalStateKeyRange};
16pub use flow_node_state::{FlowNodeStateKey, FlowNodeStateKeyRange};
17pub use index::{IndexKey, SourceIndexKeyRange};
18pub use index_entry::IndexEntryKey;
19pub use kind::KeyKind;
20pub use namespace::NamespaceKey;
21pub use namespace_dictionary::NamespaceDictionaryKey;
22pub use namespace_flow::NamespaceFlowKey;
23pub use namespace_ringbuffer::NamespaceRingBufferKey;
24pub use namespace_table::NamespaceTableKey;
25pub use namespace_view::NamespaceViewKey;
26pub use primary_key::PrimaryKeyKey;
27pub use retention_policy::{
28 OperatorRetentionPolicyKey, OperatorRetentionPolicyKeyRange, SourceRetentionPolicyKey,
29 SourceRetentionPolicyKeyRange,
30};
31pub use ringbuffer::{RingBufferKey, RingBufferMetadataKey};
32pub use row::{RowKey, RowKeyRange};
33pub use row_sequence::RowSequenceKey;
34pub use system_sequence::SystemSequenceKey;
35pub use system_version::{SystemVersion, SystemVersionKey};
36pub use table::TableKey;
37pub use transaction_version::TransactionVersionKey;
38pub use view::ViewKey;
39
40use crate::{EncodedKey, EncodedKeyRange, util::encoding::keycode};
41
42mod cdc_consumer;
43mod column;
44mod column_policy;
45mod column_sequence;
46mod columns;
47mod dictionary;
48mod flow;
49mod flow_edge;
50mod flow_node;
51mod flow_node_internal_state;
52mod flow_node_state;
53mod index;
54mod index_entry;
55mod kind;
56mod namespace;
57mod namespace_dictionary;
58mod namespace_flow;
59mod namespace_ringbuffer;
60mod namespace_table;
61mod namespace_view;
62mod primary_key;
63mod retention_policy;
64mod ringbuffer;
65mod row;
66mod row_sequence;
67mod system_sequence;
68mod system_version;
69mod table;
70mod transaction_version;
71mod view;
72
73#[derive(Debug)]
74pub enum Key {
75 CdcConsumer(CdcConsumerKey),
76 Namespace(NamespaceKey),
77 NamespaceTable(NamespaceTableKey),
78 NamespaceView(NamespaceViewKey),
79 NamespaceFlow(NamespaceFlowKey),
80 SystemSequence(SystemSequenceKey),
81 Table(TableKey),
82 Flow(FlowKey),
83 Column(ColumnKey),
84 Columns(ColumnsKey),
85 Index(IndexKey),
86 IndexEntry(IndexEntryKey),
87 FlowNodeState(FlowNodeStateKey),
88 FlowNodeInternalState(FlowNodeInternalStateKey),
89 PrimaryKey(PrimaryKeyKey),
90 Row(RowKey),
91 RowSequence(RowSequenceKey),
92 TableColumnSequence(ColumnSequenceKey),
93 TableColumnPolicy(ColumnPolicyKey),
94 SystemVersion(SystemVersionKey),
95 TransactionVersion(TransactionVersionKey),
96 View(ViewKey),
97 RingBuffer(RingBufferKey),
98 RingBufferMetadata(RingBufferMetadataKey),
99 NamespaceRingBuffer(NamespaceRingBufferKey),
100 SourceRetentionPolicy(SourceRetentionPolicyKey),
101 OperatorRetentionPolicy(OperatorRetentionPolicyKey),
102 Dictionary(DictionaryKey),
103 DictionaryEntry(DictionaryEntryKey),
104 DictionaryEntryIndex(DictionaryEntryIndexKey),
105 DictionarySequence(DictionarySequenceKey),
106 NamespaceDictionary(NamespaceDictionaryKey),
107}
108
109impl Key {
110 pub fn encode(&self) -> EncodedKey {
111 match &self {
112 Key::CdcConsumer(key) => key.encode(),
113 Key::Namespace(key) => key.encode(),
114 Key::NamespaceTable(key) => key.encode(),
115 Key::NamespaceView(key) => key.encode(),
116 Key::NamespaceFlow(key) => key.encode(),
117 Key::Table(key) => key.encode(),
118 Key::Flow(key) => key.encode(),
119 Key::Column(key) => key.encode(),
120 Key::Columns(key) => key.encode(),
121 Key::TableColumnPolicy(key) => key.encode(),
122 Key::Index(key) => key.encode(),
123 Key::IndexEntry(key) => key.encode(),
124 Key::FlowNodeState(key) => key.encode(),
125 Key::FlowNodeInternalState(key) => key.encode(),
126 Key::PrimaryKey(key) => key.encode(),
127 Key::Row(key) => key.encode(),
128 Key::RowSequence(key) => key.encode(),
129 Key::TableColumnSequence(key) => key.encode(),
130 Key::SystemSequence(key) => key.encode(),
131 Key::SystemVersion(key) => key.encode(),
132 Key::TransactionVersion(key) => key.encode(),
133 Key::View(key) => key.encode(),
134 Key::RingBuffer(key) => key.encode(),
135 Key::RingBufferMetadata(key) => key.encode(),
136 Key::NamespaceRingBuffer(key) => key.encode(),
137 Key::SourceRetentionPolicy(key) => key.encode(),
138 Key::OperatorRetentionPolicy(key) => key.encode(),
139 Key::Dictionary(key) => key.encode(),
140 Key::DictionaryEntry(key) => key.encode(),
141 Key::DictionaryEntryIndex(key) => key.encode(),
142 Key::DictionarySequence(key) => key.encode(),
143 Key::NamespaceDictionary(key) => key.encode(),
144 }
145 }
146}
147
148pub trait EncodableKey {
149 const KIND: KeyKind;
150
151 fn encode(&self) -> EncodedKey;
152
153 fn decode(key: &EncodedKey) -> Option<Self>
154 where
155 Self: Sized;
156}
157
158pub trait EncodableKeyRange {
159 const KIND: KeyKind;
160
161 fn start(&self) -> Option<EncodedKey>;
162
163 fn end(&self) -> Option<EncodedKey>;
164
165 fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
166 where
167 Self: Sized;
168}
169
170impl Key {
171 pub fn kind(key: impl AsRef<[u8]>) -> Option<KeyKind> {
172 let key = key.as_ref();
173 if key.len() < 2 {
174 return None;
175 }
176
177 keycode::deserialize(&key[1..2]).ok()
178 }
179
180 pub fn decode(key: &EncodedKey) -> Option<Self> {
181 if key.len() < 2 {
182 return None;
183 }
184
185 let kind: KeyKind = keycode::deserialize(&key[1..2]).ok()?;
186 match kind {
187 KeyKind::CdcConsumer => CdcConsumerKey::decode(&key).map(Self::CdcConsumer),
188 KeyKind::Columns => ColumnsKey::decode(&key).map(Self::Columns),
189 KeyKind::ColumnPolicy => ColumnPolicyKey::decode(&key).map(Self::TableColumnPolicy),
190 KeyKind::Namespace => NamespaceKey::decode(&key).map(Self::Namespace),
191 KeyKind::NamespaceTable => NamespaceTableKey::decode(&key).map(Self::NamespaceTable),
192 KeyKind::NamespaceView => NamespaceViewKey::decode(&key).map(Self::NamespaceView),
193 KeyKind::NamespaceFlow => NamespaceFlowKey::decode(&key).map(Self::NamespaceFlow),
194 KeyKind::Table => TableKey::decode(&key).map(Self::Table),
195 KeyKind::Flow => FlowKey::decode(&key).map(Self::Flow),
196 KeyKind::Column => ColumnKey::decode(&key).map(Self::Column),
197 KeyKind::Index => IndexKey::decode(&key).map(Self::Index),
198 KeyKind::IndexEntry => IndexEntryKey::decode(&key).map(Self::IndexEntry),
199 KeyKind::FlowNodeState => FlowNodeStateKey::decode(&key).map(Self::FlowNodeState),
200 KeyKind::FlowNodeInternalState => {
201 FlowNodeInternalStateKey::decode(&key).map(Self::FlowNodeInternalState)
202 }
203 KeyKind::Row => RowKey::decode(&key).map(Self::Row),
204 KeyKind::RowSequence => RowSequenceKey::decode(&key).map(Self::RowSequence),
205 KeyKind::ColumnSequence => ColumnSequenceKey::decode(&key).map(Self::TableColumnSequence),
206 KeyKind::SystemSequence => SystemSequenceKey::decode(&key).map(Self::SystemSequence),
207 KeyKind::SystemVersion => SystemVersionKey::decode(&key).map(Self::SystemVersion),
208 KeyKind::TransactionVersion => {
209 TransactionVersionKey::decode(&key).map(Self::TransactionVersion)
210 }
211 KeyKind::View => ViewKey::decode(&key).map(Self::View),
212 KeyKind::PrimaryKey => PrimaryKeyKey::decode(&key).map(Self::PrimaryKey),
213 KeyKind::RingBuffer => RingBufferKey::decode(&key).map(Self::RingBuffer),
214 KeyKind::RingBufferMetadata => {
215 RingBufferMetadataKey::decode(&key).map(Self::RingBufferMetadata)
216 }
217 KeyKind::NamespaceRingBuffer => {
218 NamespaceRingBufferKey::decode(&key).map(Self::NamespaceRingBuffer)
219 }
220 KeyKind::SourceRetentionPolicy => {
221 SourceRetentionPolicyKey::decode(&key).map(Self::SourceRetentionPolicy)
222 }
223 KeyKind::OperatorRetentionPolicy => {
224 OperatorRetentionPolicyKey::decode(&key).map(Self::OperatorRetentionPolicy)
225 }
226 KeyKind::FlowNode | KeyKind::FlowNodeByFlow | KeyKind::FlowEdge | KeyKind::FlowEdgeByFlow => {
227 None
229 }
230 KeyKind::Dictionary => DictionaryKey::decode(&key).map(Self::Dictionary),
231 KeyKind::DictionaryEntry => DictionaryEntryKey::decode(&key).map(Self::DictionaryEntry),
232 KeyKind::DictionaryEntryIndex => {
233 DictionaryEntryIndexKey::decode(&key).map(Self::DictionaryEntryIndex)
234 }
235 KeyKind::DictionarySequence => {
236 DictionarySequenceKey::decode(&key).map(Self::DictionarySequence)
237 }
238 KeyKind::NamespaceDictionary => {
239 NamespaceDictionaryKey::decode(&key).map(Self::NamespaceDictionary)
240 }
241 KeyKind::StorageTracker => {
242 None
244 }
245 }
246 }
247}
248
249#[cfg(test)]
250mod tests {
251 use reifydb_type::RowNumber;
252
253 use super::{
254 ColumnKey, ColumnPolicyKey, ColumnSequenceKey, ColumnsKey, FlowNodeStateKey, Key, NamespaceKey,
255 NamespaceTableKey, SystemSequenceKey, TableKey,
256 };
257 use crate::{
258 interface::{
259 FlowNodeId, SourceId,
260 catalog::{ColumnId, ColumnPolicyId, IndexId, NamespaceId, SequenceId, TableId},
261 },
262 key::{
263 index::IndexKey, row::RowKey, row_sequence::RowSequenceKey,
264 transaction_version::TransactionVersionKey,
265 },
266 };
267
268 #[test]
269 fn test_table_columns() {
270 let key = Key::Columns(ColumnsKey {
271 column: ColumnId(42),
272 });
273
274 let encoded = key.encode();
275 let decoded = Key::decode(&encoded).expect("Failed to decode key");
276
277 match decoded {
278 Key::Columns(decoded_inner) => {
279 assert_eq!(decoded_inner.column, 42);
280 }
281 _ => unreachable!(),
282 }
283 }
284
285 #[test]
286 fn test_column() {
287 let key = Key::Column(ColumnKey {
288 source: SourceId::table(1),
289 column: ColumnId(42),
290 });
291
292 let encoded = key.encode();
293 let decoded = Key::decode(&encoded).expect("Failed to decode key");
294
295 match decoded {
296 Key::Column(decoded_inner) => {
297 assert_eq!(decoded_inner.source, SourceId::table(1));
298 assert_eq!(decoded_inner.column, 42);
299 }
300 _ => unreachable!(),
301 }
302 }
303
304 #[test]
305 fn test_column_policy() {
306 let key = Key::TableColumnPolicy(ColumnPolicyKey {
307 column: ColumnId(42),
308 policy: ColumnPolicyId(999_999),
309 });
310
311 let encoded = key.encode();
312 let decoded = Key::decode(&encoded).expect("Failed to decode key");
313
314 match decoded {
315 Key::TableColumnPolicy(decoded_inner) => {
316 assert_eq!(decoded_inner.column, 42);
317 assert_eq!(decoded_inner.policy, 999_999);
318 }
319 _ => unreachable!(),
320 }
321 }
322
323 #[test]
324 fn test_namespace() {
325 let key = Key::Namespace(NamespaceKey {
326 namespace: NamespaceId(42),
327 });
328
329 let encoded = key.encode();
330 let decoded = Key::decode(&encoded).expect("Failed to decode key");
331
332 match decoded {
333 Key::Namespace(decoded_inner) => {
334 assert_eq!(decoded_inner.namespace, 42);
335 }
336 _ => unreachable!(),
337 }
338 }
339
340 #[test]
341 fn test_namespace_table() {
342 let key = Key::NamespaceTable(NamespaceTableKey {
343 namespace: NamespaceId(42),
344 table: TableId(999_999),
345 });
346
347 let encoded = key.encode();
348 let decoded = Key::decode(&encoded).expect("Failed to decode key");
349
350 match decoded {
351 Key::NamespaceTable(decoded_inner) => {
352 assert_eq!(decoded_inner.namespace, 42);
353 assert_eq!(decoded_inner.table, 999_999);
354 }
355 _ => unreachable!(),
356 }
357 }
358
359 #[test]
360 fn test_system_sequence() {
361 let key = Key::SystemSequence(SystemSequenceKey {
362 sequence: SequenceId(42),
363 });
364
365 let encoded = key.encode();
366 let decoded = Key::decode(&encoded).expect("Failed to decode key");
367
368 match decoded {
369 Key::SystemSequence(decoded_inner) => {
370 assert_eq!(decoded_inner.sequence, 42);
371 }
372 _ => unreachable!(),
373 }
374 }
375
376 #[test]
377 fn test_table() {
378 let key = Key::Table(TableKey {
379 table: TableId(42),
380 });
381
382 let encoded = key.encode();
383 let decoded = Key::decode(&encoded).expect("Failed to decode key");
384
385 match decoded {
386 Key::Table(decoded_inner) => {
387 assert_eq!(decoded_inner.table, 42);
388 }
389 _ => unreachable!(),
390 }
391 }
392
393 #[test]
394 fn test_index() {
395 let key = Key::Index(IndexKey {
396 source: SourceId::table(42),
397 index: IndexId::primary(999_999),
398 });
399
400 let encoded = key.encode();
401 let decoded = Key::decode(&encoded).expect("Failed to decode key");
402
403 match decoded {
404 Key::Index(decoded_inner) => {
405 assert_eq!(decoded_inner.source, SourceId::table(42));
406 assert_eq!(decoded_inner.index, 999_999);
407 }
408 _ => unreachable!(),
409 }
410 }
411
412 #[test]
413 fn test_row() {
414 let key = Key::Row(RowKey {
415 source: SourceId::table(42),
416 row: RowNumber(999_999),
417 });
418
419 let encoded = key.encode();
420 let decoded = Key::decode(&encoded).expect("Failed to decode key");
421
422 match decoded {
423 Key::Row(decoded_inner) => {
424 assert_eq!(decoded_inner.source, SourceId::table(42));
425 assert_eq!(decoded_inner.row, 999_999);
426 }
427 _ => unreachable!(),
428 }
429 }
430
431 #[test]
432 fn test_row_sequence() {
433 let key = Key::RowSequence(RowSequenceKey {
434 source: SourceId::table(42),
435 });
436
437 let encoded = key.encode();
438 let decoded = Key::decode(&encoded).expect("Failed to decode key");
439
440 match decoded {
441 Key::RowSequence(decoded_inner) => {
442 assert_eq!(decoded_inner.source, SourceId::table(42));
443 }
444 _ => unreachable!(),
445 }
446 }
447
448 #[test]
449 fn test_column_sequence() {
450 let key = Key::TableColumnSequence(ColumnSequenceKey {
451 source: SourceId::table(42),
452 column: ColumnId(123),
453 });
454
455 let encoded = key.encode();
456 let decoded = Key::decode(&encoded).expect("Failed to decode key");
457
458 match decoded {
459 Key::TableColumnSequence(decoded_inner) => {
460 assert_eq!(decoded_inner.source, SourceId::table(42));
461 assert_eq!(decoded_inner.column, 123);
462 }
463 _ => unreachable!(),
464 }
465 }
466
467 #[test]
468 fn test_transaction_version() {
469 let key = Key::TransactionVersion(TransactionVersionKey {});
470 let encoded = key.encode();
471 Key::decode(&encoded).expect("Failed to decode key");
472 }
473
474 #[test]
475 fn test_operator_state() {
476 let key = Key::FlowNodeState(FlowNodeStateKey {
477 node: FlowNodeId(0xCAFEBABE),
478 key: vec![1, 2, 3],
479 });
480
481 let encoded = key.encode();
482 let decoded = Key::decode(&encoded).expect("Failed to decode key");
483
484 match decoded {
485 Key::FlowNodeState(decoded_inner) => {
486 assert_eq!(decoded_inner.node, 0xCAFEBABE);
487 assert_eq!(decoded_inner.key, vec![1, 2, 3]);
488 }
489 _ => unreachable!(),
490 }
491 }
492}