1pub use cdc_consumer::{CdcConsumerKey, CdcConsumerKeyRange, ToConsumerKey};
5pub use column::ColumnKey;
6pub use column_policy::ColumnPolicyKey;
7pub use column_sequence::ColumnSequenceKey;
8pub use columns::ColumnsKey;
9pub use dictionary::{
10 DictionaryEntryIndexKey, DictionaryEntryIndexKeyRange, DictionaryEntryKey, DictionaryKey, DictionarySequenceKey,
11};
12pub use flow::FlowKey;
13pub use flow_edge::{FlowEdgeByFlowKey, FlowEdgeKey};
14pub use flow_node::{FlowNodeByFlowKey, FlowNodeKey};
15pub use flow_node_internal_state::{FlowNodeInternalStateKey, FlowNodeInternalStateKeyRange};
16pub use flow_node_state::{FlowNodeStateKey, FlowNodeStateKeyRange};
17pub use index::{IndexKey, SourceIndexKeyRange};
18pub use index_entry::IndexEntryKey;
19pub use kind::KeyKind;
20pub use namespace::NamespaceKey;
21pub use namespace_dictionary::NamespaceDictionaryKey;
22pub use namespace_flow::NamespaceFlowKey;
23pub use namespace_ring_buffer::NamespaceRingBufferKey;
24pub use namespace_table::NamespaceTableKey;
25pub use namespace_view::NamespaceViewKey;
26pub use primary_key::PrimaryKeyKey;
27pub use retention_policy::{
28 OperatorRetentionPolicyKey, OperatorRetentionPolicyKeyRange, SourceRetentionPolicyKey,
29 SourceRetentionPolicyKeyRange,
30};
31pub use ring_buffer::{RingBufferKey, RingBufferMetadataKey};
32pub use row::{RowKey, RowKeyRange};
33pub use row_sequence::RowSequenceKey;
34pub use system_sequence::SystemSequenceKey;
35pub use system_version::{SystemVersion, SystemVersionKey};
36pub use table::TableKey;
37pub use transaction_version::TransactionVersionKey;
38pub use view::ViewKey;
39
40use crate::{EncodedKey, EncodedKeyRange, util::encoding::keycode};
41
42mod cdc_consumer;
43mod column;
44mod column_policy;
45mod column_sequence;
46mod columns;
47mod dictionary;
48mod flow;
49mod flow_edge;
50mod flow_node;
51mod flow_node_internal_state;
52mod flow_node_state;
53mod index;
54mod index_entry;
55mod kind;
56mod namespace;
57mod namespace_dictionary;
58mod namespace_flow;
59mod namespace_ring_buffer;
60mod namespace_table;
61mod namespace_view;
62mod primary_key;
63mod retention_policy;
64mod ring_buffer;
65mod row;
66mod row_sequence;
67mod system_sequence;
68mod system_version;
69mod table;
70mod transaction_version;
71mod view;
72
/// A typed key of any of the kinds handled by [`Key::encode`] and [`Key::decode`].
///
/// Every variant wraps the concrete key struct for one kind of key. Two
/// variants are named differently from the struct they carry:
/// `TableColumnSequence` holds a [`ColumnSequenceKey`] and
/// `TableColumnPolicy` holds a [`ColumnPolicyKey`].
#[derive(Debug)]
pub enum Key {
    CdcConsumer(CdcConsumerKey),
    Namespace(NamespaceKey),
    NamespaceTable(NamespaceTableKey),
    NamespaceView(NamespaceViewKey),
    NamespaceFlow(NamespaceFlowKey),
    SystemSequence(SystemSequenceKey),
    Table(TableKey),
    Flow(FlowKey),
    Column(ColumnKey),
    Columns(ColumnsKey),
    Index(IndexKey),
    IndexEntry(IndexEntryKey),
    FlowNodeState(FlowNodeStateKey),
    FlowNodeInternalState(FlowNodeInternalStateKey),
    PrimaryKey(PrimaryKeyKey),
    Row(RowKey),
    RowSequence(RowSequenceKey),
    // Wraps `ColumnSequenceKey`; the variant name carries a `Table` prefix the struct does not.
    TableColumnSequence(ColumnSequenceKey),
    // Wraps `ColumnPolicyKey`; the variant name carries a `Table` prefix the struct does not.
    TableColumnPolicy(ColumnPolicyKey),
    SystemVersion(SystemVersionKey),
    TransactionVersion(TransactionVersionKey),
    View(ViewKey),
    RingBuffer(RingBufferKey),
    RingBufferMetadata(RingBufferMetadataKey),
    NamespaceRingBuffer(NamespaceRingBufferKey),
    SourceRetentionPolicy(SourceRetentionPolicyKey),
    OperatorRetentionPolicy(OperatorRetentionPolicyKey),
    Dictionary(DictionaryKey),
    DictionaryEntry(DictionaryEntryKey),
    DictionaryEntryIndex(DictionaryEntryIndexKey),
    DictionarySequence(DictionarySequenceKey),
    NamespaceDictionary(NamespaceDictionaryKey),
}
108
109impl Key {
110 pub fn encode(&self) -> EncodedKey {
111 match &self {
112 Key::CdcConsumer(key) => key.encode(),
113 Key::Namespace(key) => key.encode(),
114 Key::NamespaceTable(key) => key.encode(),
115 Key::NamespaceView(key) => key.encode(),
116 Key::NamespaceFlow(key) => key.encode(),
117 Key::Table(key) => key.encode(),
118 Key::Flow(key) => key.encode(),
119 Key::Column(key) => key.encode(),
120 Key::Columns(key) => key.encode(),
121 Key::TableColumnPolicy(key) => key.encode(),
122 Key::Index(key) => key.encode(),
123 Key::IndexEntry(key) => key.encode(),
124 Key::FlowNodeState(key) => key.encode(),
125 Key::FlowNodeInternalState(key) => key.encode(),
126 Key::PrimaryKey(key) => key.encode(),
127 Key::Row(key) => key.encode(),
128 Key::RowSequence(key) => key.encode(),
129 Key::TableColumnSequence(key) => key.encode(),
130 Key::SystemSequence(key) => key.encode(),
131 Key::SystemVersion(key) => key.encode(),
132 Key::TransactionVersion(key) => key.encode(),
133 Key::View(key) => key.encode(),
134 Key::RingBuffer(key) => key.encode(),
135 Key::RingBufferMetadata(key) => key.encode(),
136 Key::NamespaceRingBuffer(key) => key.encode(),
137 Key::SourceRetentionPolicy(key) => key.encode(),
138 Key::OperatorRetentionPolicy(key) => key.encode(),
139 Key::Dictionary(key) => key.encode(),
140 Key::DictionaryEntry(key) => key.encode(),
141 Key::DictionaryEntryIndex(key) => key.encode(),
142 Key::DictionarySequence(key) => key.encode(),
143 Key::NamespaceDictionary(key) => key.encode(),
144 }
145 }
146}
147
/// A key type that can be converted to and from an [`EncodedKey`].
pub trait EncodableKey {
    /// The [`KeyKind`] associated with this key type.
    const KIND: KeyKind;

    /// Produces the encoded form of this key.
    fn encode(&self) -> EncodedKey;

    /// Attempts to rebuild a key of this type from `key`.
    ///
    /// Returns `None` when no key could be produced. The exact failure
    /// conditions are defined by each implementation.
    fn decode(key: &EncodedKey) -> Option<Self>
    where
        Self: Sized;
}
157
/// A range over encoded keys, described by an optional start and end bound.
pub trait EncodableKeyRange {
    /// The [`KeyKind`] associated with this range type.
    const KIND: KeyKind;

    /// The encoded start bound of the range, if any.
    // NOTE(review): `None` presumably means "no bound on this side" — confirm against implementations.
    fn start(&self) -> Option<EncodedKey>;

    /// The encoded end bound of the range, if any.
    // NOTE(review): `None` presumably means "no bound on this side" — confirm against implementations.
    fn end(&self) -> Option<EncodedKey>;

    /// Attempts to decode `range` into a pair of optional values of this type.
    // NOTE(review): the two elements presumably correspond to the start and end
    // bounds of `range`, in that order — confirm against implementations.
    fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
    where
        Self: Sized;
}
169
170impl Key {
171 pub fn kind(key: &EncodedKey) -> Option<KeyKind> {
172 if key.len() < 2 {
173 return None;
174 }
175
176 keycode::deserialize(&key[1..2]).ok()
177 }
178
179 pub fn decode(key: &EncodedKey) -> Option<Self> {
180 if key.len() < 2 {
181 return None;
182 }
183
184 let kind: KeyKind = keycode::deserialize(&key[1..2]).ok()?;
185 match kind {
186 KeyKind::CdcConsumer => CdcConsumerKey::decode(&key).map(Self::CdcConsumer),
187 KeyKind::Columns => ColumnsKey::decode(&key).map(Self::Columns),
188 KeyKind::ColumnPolicy => ColumnPolicyKey::decode(&key).map(Self::TableColumnPolicy),
189 KeyKind::Namespace => NamespaceKey::decode(&key).map(Self::Namespace),
190 KeyKind::NamespaceTable => NamespaceTableKey::decode(&key).map(Self::NamespaceTable),
191 KeyKind::NamespaceView => NamespaceViewKey::decode(&key).map(Self::NamespaceView),
192 KeyKind::NamespaceFlow => NamespaceFlowKey::decode(&key).map(Self::NamespaceFlow),
193 KeyKind::Table => TableKey::decode(&key).map(Self::Table),
194 KeyKind::Flow => FlowKey::decode(&key).map(Self::Flow),
195 KeyKind::Column => ColumnKey::decode(&key).map(Self::Column),
196 KeyKind::Index => IndexKey::decode(&key).map(Self::Index),
197 KeyKind::IndexEntry => IndexEntryKey::decode(&key).map(Self::IndexEntry),
198 KeyKind::FlowNodeState => FlowNodeStateKey::decode(&key).map(Self::FlowNodeState),
199 KeyKind::FlowNodeInternalState => {
200 FlowNodeInternalStateKey::decode(&key).map(Self::FlowNodeInternalState)
201 }
202 KeyKind::Row => RowKey::decode(&key).map(Self::Row),
203 KeyKind::RowSequence => RowSequenceKey::decode(&key).map(Self::RowSequence),
204 KeyKind::ColumnSequence => ColumnSequenceKey::decode(&key).map(Self::TableColumnSequence),
205 KeyKind::SystemSequence => SystemSequenceKey::decode(&key).map(Self::SystemSequence),
206 KeyKind::SystemVersion => SystemVersionKey::decode(&key).map(Self::SystemVersion),
207 KeyKind::TransactionVersion => {
208 TransactionVersionKey::decode(&key).map(Self::TransactionVersion)
209 }
210 KeyKind::View => ViewKey::decode(&key).map(Self::View),
211 KeyKind::PrimaryKey => PrimaryKeyKey::decode(&key).map(Self::PrimaryKey),
212 KeyKind::RingBuffer => RingBufferKey::decode(&key).map(Self::RingBuffer),
213 KeyKind::RingBufferMetadata => {
214 RingBufferMetadataKey::decode(&key).map(Self::RingBufferMetadata)
215 }
216 KeyKind::NamespaceRingBuffer => {
217 NamespaceRingBufferKey::decode(&key).map(Self::NamespaceRingBuffer)
218 }
219 KeyKind::SourceRetentionPolicy => {
220 SourceRetentionPolicyKey::decode(&key).map(Self::SourceRetentionPolicy)
221 }
222 KeyKind::OperatorRetentionPolicy => {
223 OperatorRetentionPolicyKey::decode(&key).map(Self::OperatorRetentionPolicy)
224 }
225 KeyKind::FlowNode | KeyKind::FlowNodeByFlow | KeyKind::FlowEdge | KeyKind::FlowEdgeByFlow => {
226 None
228 }
229 KeyKind::Dictionary => DictionaryKey::decode(&key).map(Self::Dictionary),
230 KeyKind::DictionaryEntry => DictionaryEntryKey::decode(&key).map(Self::DictionaryEntry),
231 KeyKind::DictionaryEntryIndex => {
232 DictionaryEntryIndexKey::decode(&key).map(Self::DictionaryEntryIndex)
233 }
234 KeyKind::DictionarySequence => {
235 DictionarySequenceKey::decode(&key).map(Self::DictionarySequence)
236 }
237 KeyKind::NamespaceDictionary => {
238 NamespaceDictionaryKey::decode(&key).map(Self::NamespaceDictionary)
239 }
240 }
241 }
242}
243
#[cfg(test)]
mod tests {
    use reifydb_type::RowNumber;

    use super::{
        ColumnKey, ColumnPolicyKey, ColumnSequenceKey, ColumnsKey, FlowNodeStateKey, Key, NamespaceKey,
        NamespaceTableKey, SystemSequenceKey, TableKey,
    };
    use crate::{
        interface::{
            FlowNodeId, SourceId,
            catalog::{ColumnId, ColumnPolicyId, IndexId, NamespaceId, SequenceId, TableId},
        },
        key::{
            index::IndexKey, row::RowKey, row_sequence::RowSequenceKey,
            transaction_version::TransactionVersionKey,
        },
    };

    /// Encodes `key` and decodes the resulting bytes back into a [`Key`],
    /// panicking if decoding yields `None`.
    #[track_caller]
    fn round_trip(key: Key) -> Key {
        Key::decode(&key.encode()).expect("Failed to decode key")
    }

    #[test]
    fn test_table_columns() {
        let original = ColumnsKey {
            column: ColumnId(42),
        };

        match round_trip(Key::Columns(original)) {
            Key::Columns(inner) => assert_eq!(inner.column, 42),
            _ => unreachable!(),
        }
    }

    #[test]
    fn test_column() {
        let original = ColumnKey {
            source: SourceId::table(1),
            column: ColumnId(42),
        };

        match round_trip(Key::Column(original)) {
            Key::Column(inner) => {
                assert_eq!(inner.source, SourceId::table(1));
                assert_eq!(inner.column, 42);
            }
            _ => unreachable!(),
        }
    }

    #[test]
    fn test_column_policy() {
        let original = ColumnPolicyKey {
            column: ColumnId(42),
            policy: ColumnPolicyId(999_999),
        };

        match round_trip(Key::TableColumnPolicy(original)) {
            Key::TableColumnPolicy(inner) => {
                assert_eq!(inner.column, 42);
                assert_eq!(inner.policy, 999_999);
            }
            _ => unreachable!(),
        }
    }

    #[test]
    fn test_namespace() {
        let original = NamespaceKey {
            namespace: NamespaceId(42),
        };

        match round_trip(Key::Namespace(original)) {
            Key::Namespace(inner) => assert_eq!(inner.namespace, 42),
            _ => unreachable!(),
        }
    }

    #[test]
    fn test_namespace_table() {
        let original = NamespaceTableKey {
            namespace: NamespaceId(42),
            table: TableId(999_999),
        };

        match round_trip(Key::NamespaceTable(original)) {
            Key::NamespaceTable(inner) => {
                assert_eq!(inner.namespace, 42);
                assert_eq!(inner.table, 999_999);
            }
            _ => unreachable!(),
        }
    }

    #[test]
    fn test_system_sequence() {
        let original = SystemSequenceKey {
            sequence: SequenceId(42),
        };

        match round_trip(Key::SystemSequence(original)) {
            Key::SystemSequence(inner) => assert_eq!(inner.sequence, 42),
            _ => unreachable!(),
        }
    }

    #[test]
    fn test_table() {
        let original = TableKey {
            table: TableId(42),
        };

        match round_trip(Key::Table(original)) {
            Key::Table(inner) => assert_eq!(inner.table, 42),
            _ => unreachable!(),
        }
    }

    #[test]
    fn test_index() {
        let original = IndexKey {
            source: SourceId::table(42),
            index: IndexId::primary(999_999),
        };

        match round_trip(Key::Index(original)) {
            Key::Index(inner) => {
                assert_eq!(inner.source, SourceId::table(42));
                assert_eq!(inner.index, 999_999);
            }
            _ => unreachable!(),
        }
    }

    #[test]
    fn test_row() {
        let original = RowKey {
            source: SourceId::table(42),
            row: RowNumber(999_999),
        };

        match round_trip(Key::Row(original)) {
            Key::Row(inner) => {
                assert_eq!(inner.source, SourceId::table(42));
                assert_eq!(inner.row, 999_999);
            }
            _ => unreachable!(),
        }
    }

    #[test]
    fn test_row_sequence() {
        let original = RowSequenceKey {
            source: SourceId::table(42),
        };

        match round_trip(Key::RowSequence(original)) {
            Key::RowSequence(inner) => assert_eq!(inner.source, SourceId::table(42)),
            _ => unreachable!(),
        }
    }

    #[test]
    fn test_column_sequence() {
        let original = ColumnSequenceKey {
            source: SourceId::table(42),
            column: ColumnId(123),
        };

        match round_trip(Key::TableColumnSequence(original)) {
            Key::TableColumnSequence(inner) => {
                assert_eq!(inner.source, SourceId::table(42));
                assert_eq!(inner.column, 123);
            }
            _ => unreachable!(),
        }
    }

    #[test]
    fn test_transaction_version() {
        // Only checks that the encoded bytes decode to *some* key.
        let _decoded = round_trip(Key::TransactionVersion(TransactionVersionKey {}));
    }

    #[test]
    fn test_operator_state() {
        let original = FlowNodeStateKey {
            node: FlowNodeId(0xCAFEBABE),
            key: vec![1, 2, 3],
        };

        match round_trip(Key::FlowNodeState(original)) {
            Key::FlowNodeState(inner) => {
                assert_eq!(inner.node, 0xCAFEBABE);
                assert_eq!(inner.key, vec![1, 2, 3]);
            }
            _ => unreachable!(),
        }
    }
}