1pub use cdc_consumer::{CdcConsumerKey, CdcConsumerKeyRange, ToConsumerKey};
5pub use column::ColumnKey;
6pub use column_policy::ColumnPolicyKey;
7pub use column_sequence::ColumnSequenceKey;
8pub use columns::ColumnsKey;
9pub use flow::FlowKey;
10pub use flow_node_state::{FlowNodeStateKey, FlowNodeStateKeyRange};
11pub use index::{IndexKey, SourceIndexKeyRange};
12pub use index_entry::IndexEntryKey;
13pub use kind::KeyKind;
14pub use namespace::NamespaceKey;
15pub use namespace_flow::NamespaceFlowKey;
16pub use namespace_ring_buffer::NamespaceRingBufferKey;
17pub use namespace_table::NamespaceTableKey;
18pub use namespace_view::NamespaceViewKey;
19pub use primary_key::PrimaryKeyKey;
20pub use retention_policy::{
21 OperatorRetentionPolicyKey, OperatorRetentionPolicyKeyRange, SourceRetentionPolicyKey,
22 SourceRetentionPolicyKeyRange,
23};
24pub use ring_buffer::{RingBufferKey, RingBufferMetadataKey};
25pub use row::{RowKey, RowKeyRange};
26pub use row_sequence::RowSequenceKey;
27pub use system_sequence::SystemSequenceKey;
28pub use system_version::{SystemVersion, SystemVersionKey};
29pub use table::TableKey;
30pub use transaction_version::TransactionVersionKey;
31pub use view::ViewKey;
32
33use crate::{EncodedKey, EncodedKeyRange, util::encoding::keycode};
34
35mod cdc_consumer;
36mod column;
37mod column_policy;
38mod column_sequence;
39mod columns;
40mod flow;
41mod flow_node_state;
42mod index;
43mod index_entry;
44mod kind;
45mod namespace;
46mod namespace_flow;
47mod namespace_ring_buffer;
48mod namespace_table;
49mod namespace_view;
50mod primary_key;
51mod retention_policy;
52mod ring_buffer;
53mod row;
54mod row_sequence;
55mod system_sequence;
56mod system_version;
57mod table;
58mod transaction_version;
59mod view;
60
61#[derive(Debug)]
62pub enum Key {
63 CdcConsumer(CdcConsumerKey),
64 Namespace(NamespaceKey),
65 NamespaceTable(NamespaceTableKey),
66 NamespaceView(NamespaceViewKey),
67 NamespaceFlow(NamespaceFlowKey),
68 SystemSequence(SystemSequenceKey),
69 Table(TableKey),
70 Flow(FlowKey),
71 Column(ColumnKey),
72 Columns(ColumnsKey),
73 Index(IndexKey),
74 IndexEntry(IndexEntryKey),
75 FlowNodeState(FlowNodeStateKey),
76 PrimaryKey(PrimaryKeyKey),
77 Row(RowKey),
78 RowSequence(RowSequenceKey),
79 TableColumnSequence(ColumnSequenceKey),
80 TableColumnPolicy(ColumnPolicyKey),
81 SystemVersion(SystemVersionKey),
82 TransactionVersion(TransactionVersionKey),
83 View(ViewKey),
84 RingBuffer(RingBufferKey),
85 RingBufferMetadata(RingBufferMetadataKey),
86 NamespaceRingBuffer(NamespaceRingBufferKey),
87 SourceRetentionPolicy(SourceRetentionPolicyKey),
88 OperatorRetentionPolicy(OperatorRetentionPolicyKey),
89}
90
91impl Key {
92 pub fn encode(&self) -> EncodedKey {
93 match &self {
94 Key::CdcConsumer(key) => key.encode(),
95 Key::Namespace(key) => key.encode(),
96 Key::NamespaceTable(key) => key.encode(),
97 Key::NamespaceView(key) => key.encode(),
98 Key::NamespaceFlow(key) => key.encode(),
99 Key::Table(key) => key.encode(),
100 Key::Flow(key) => key.encode(),
101 Key::Column(key) => key.encode(),
102 Key::Columns(key) => key.encode(),
103 Key::TableColumnPolicy(key) => key.encode(),
104 Key::Index(key) => key.encode(),
105 Key::IndexEntry(key) => key.encode(),
106 Key::FlowNodeState(key) => key.encode(),
107 Key::PrimaryKey(key) => key.encode(),
108 Key::Row(key) => key.encode(),
109 Key::RowSequence(key) => key.encode(),
110 Key::TableColumnSequence(key) => key.encode(),
111 Key::SystemSequence(key) => key.encode(),
112 Key::SystemVersion(key) => key.encode(),
113 Key::TransactionVersion(key) => key.encode(),
114 Key::View(key) => key.encode(),
115 Key::RingBuffer(key) => key.encode(),
116 Key::RingBufferMetadata(key) => key.encode(),
117 Key::NamespaceRingBuffer(key) => key.encode(),
118 Key::SourceRetentionPolicy(key) => key.encode(),
119 Key::OperatorRetentionPolicy(key) => key.encode(),
120 }
121 }
122}
123
124pub trait EncodableKey {
125 const KIND: KeyKind;
126
127 fn encode(&self) -> EncodedKey;
128
129 fn decode(key: &EncodedKey) -> Option<Self>
130 where
131 Self: Sized;
132}
133
134pub trait EncodableKeyRange {
135 const KIND: KeyKind;
136
137 fn start(&self) -> Option<EncodedKey>;
138
139 fn end(&self) -> Option<EncodedKey>;
140
141 fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
142 where
143 Self: Sized;
144}
145
146impl Key {
147 pub fn kind(key: &EncodedKey) -> Option<KeyKind> {
148 if key.len() < 2 {
149 return None;
150 }
151
152 keycode::deserialize(&key[1..2]).ok()
153 }
154
155 pub fn decode(key: &EncodedKey) -> Option<Self> {
156 if key.len() < 2 {
157 return None;
158 }
159
160 let kind: KeyKind = keycode::deserialize(&key[1..2]).ok()?;
161 match kind {
162 KeyKind::CdcConsumer => CdcConsumerKey::decode(&key).map(Self::CdcConsumer),
163 KeyKind::Columns => ColumnsKey::decode(&key).map(Self::Columns),
164 KeyKind::ColumnPolicy => ColumnPolicyKey::decode(&key).map(Self::TableColumnPolicy),
165 KeyKind::Namespace => NamespaceKey::decode(&key).map(Self::Namespace),
166 KeyKind::NamespaceTable => NamespaceTableKey::decode(&key).map(Self::NamespaceTable),
167 KeyKind::NamespaceView => NamespaceViewKey::decode(&key).map(Self::NamespaceView),
168 KeyKind::NamespaceFlow => NamespaceFlowKey::decode(&key).map(Self::NamespaceFlow),
169 KeyKind::Table => TableKey::decode(&key).map(Self::Table),
170 KeyKind::Flow => FlowKey::decode(&key).map(Self::Flow),
171 KeyKind::Column => ColumnKey::decode(&key).map(Self::Column),
172 KeyKind::Index => IndexKey::decode(&key).map(Self::Index),
173 KeyKind::IndexEntry => IndexEntryKey::decode(&key).map(Self::IndexEntry),
174 KeyKind::FlowNodeState => FlowNodeStateKey::decode(&key).map(Self::FlowNodeState),
175 KeyKind::Row => RowKey::decode(&key).map(Self::Row),
176 KeyKind::RowSequence => RowSequenceKey::decode(&key).map(Self::RowSequence),
177 KeyKind::ColumnSequence => ColumnSequenceKey::decode(&key).map(Self::TableColumnSequence),
178 KeyKind::SystemSequence => SystemSequenceKey::decode(&key).map(Self::SystemSequence),
179 KeyKind::SystemVersion => SystemVersionKey::decode(&key).map(Self::SystemVersion),
180 KeyKind::TransactionVersion => {
181 TransactionVersionKey::decode(&key).map(Self::TransactionVersion)
182 }
183 KeyKind::View => ViewKey::decode(&key).map(Self::View),
184 KeyKind::PrimaryKey => PrimaryKeyKey::decode(&key).map(Self::PrimaryKey),
185 KeyKind::RingBuffer => RingBufferKey::decode(&key).map(Self::RingBuffer),
186 KeyKind::RingBufferMetadata => {
187 RingBufferMetadataKey::decode(&key).map(Self::RingBufferMetadata)
188 }
189 KeyKind::NamespaceRingBuffer => {
190 NamespaceRingBufferKey::decode(&key).map(Self::NamespaceRingBuffer)
191 }
192 KeyKind::SourceRetentionPolicy => {
193 SourceRetentionPolicyKey::decode(&key).map(Self::SourceRetentionPolicy)
194 }
195 KeyKind::OperatorRetentionPolicy => {
196 OperatorRetentionPolicyKey::decode(&key).map(Self::OperatorRetentionPolicy)
197 }
198 }
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use reifydb_type::RowNumber;
205
206 use super::{
207 ColumnKey, ColumnPolicyKey, ColumnSequenceKey, ColumnsKey, FlowNodeStateKey, Key, NamespaceKey,
208 NamespaceTableKey, SystemSequenceKey, TableKey,
209 };
210 use crate::{
211 interface::{
212 FlowNodeId, SourceId,
213 catalog::{ColumnId, ColumnPolicyId, IndexId, NamespaceId, SequenceId, TableId},
214 },
215 key::{
216 index::IndexKey, row::RowKey, row_sequence::RowSequenceKey,
217 transaction_version::TransactionVersionKey,
218 },
219 };
220
221 #[test]
222 fn test_table_columns() {
223 let key = Key::Columns(ColumnsKey {
224 column: ColumnId(42),
225 });
226
227 let encoded = key.encode();
228 let decoded = Key::decode(&encoded).expect("Failed to decode key");
229
230 match decoded {
231 Key::Columns(decoded_inner) => {
232 assert_eq!(decoded_inner.column, 42);
233 }
234 _ => unreachable!(),
235 }
236 }
237
238 #[test]
239 fn test_column() {
240 let key = Key::Column(ColumnKey {
241 source: SourceId::table(1),
242 column: ColumnId(42),
243 });
244
245 let encoded = key.encode();
246 let decoded = Key::decode(&encoded).expect("Failed to decode key");
247
248 match decoded {
249 Key::Column(decoded_inner) => {
250 assert_eq!(decoded_inner.source, SourceId::table(1));
251 assert_eq!(decoded_inner.column, 42);
252 }
253 _ => unreachable!(),
254 }
255 }
256
257 #[test]
258 fn test_column_policy() {
259 let key = Key::TableColumnPolicy(ColumnPolicyKey {
260 column: ColumnId(42),
261 policy: ColumnPolicyId(999_999),
262 });
263
264 let encoded = key.encode();
265 let decoded = Key::decode(&encoded).expect("Failed to decode key");
266
267 match decoded {
268 Key::TableColumnPolicy(decoded_inner) => {
269 assert_eq!(decoded_inner.column, 42);
270 assert_eq!(decoded_inner.policy, 999_999);
271 }
272 _ => unreachable!(),
273 }
274 }
275
276 #[test]
277 fn test_namespace() {
278 let key = Key::Namespace(NamespaceKey {
279 namespace: NamespaceId(42),
280 });
281
282 let encoded = key.encode();
283 let decoded = Key::decode(&encoded).expect("Failed to decode key");
284
285 match decoded {
286 Key::Namespace(decoded_inner) => {
287 assert_eq!(decoded_inner.namespace, 42);
288 }
289 _ => unreachable!(),
290 }
291 }
292
293 #[test]
294 fn test_namespace_table() {
295 let key = Key::NamespaceTable(NamespaceTableKey {
296 namespace: NamespaceId(42),
297 table: TableId(999_999),
298 });
299
300 let encoded = key.encode();
301 let decoded = Key::decode(&encoded).expect("Failed to decode key");
302
303 match decoded {
304 Key::NamespaceTable(decoded_inner) => {
305 assert_eq!(decoded_inner.namespace, 42);
306 assert_eq!(decoded_inner.table, 999_999);
307 }
308 _ => unreachable!(),
309 }
310 }
311
312 #[test]
313 fn test_system_sequence() {
314 let key = Key::SystemSequence(SystemSequenceKey {
315 sequence: SequenceId(42),
316 });
317
318 let encoded = key.encode();
319 let decoded = Key::decode(&encoded).expect("Failed to decode key");
320
321 match decoded {
322 Key::SystemSequence(decoded_inner) => {
323 assert_eq!(decoded_inner.sequence, 42);
324 }
325 _ => unreachable!(),
326 }
327 }
328
329 #[test]
330 fn test_table() {
331 let key = Key::Table(TableKey {
332 table: TableId(42),
333 });
334
335 let encoded = key.encode();
336 let decoded = Key::decode(&encoded).expect("Failed to decode key");
337
338 match decoded {
339 Key::Table(decoded_inner) => {
340 assert_eq!(decoded_inner.table, 42);
341 }
342 _ => unreachable!(),
343 }
344 }
345
346 #[test]
347 fn test_index() {
348 let key = Key::Index(IndexKey {
349 source: SourceId::table(42),
350 index: IndexId::primary(999_999),
351 });
352
353 let encoded = key.encode();
354 let decoded = Key::decode(&encoded).expect("Failed to decode key");
355
356 match decoded {
357 Key::Index(decoded_inner) => {
358 assert_eq!(decoded_inner.source, SourceId::table(42));
359 assert_eq!(decoded_inner.index, 999_999);
360 }
361 _ => unreachable!(),
362 }
363 }
364
365 #[test]
366 fn test_row() {
367 let key = Key::Row(RowKey {
368 source: SourceId::table(42),
369 row: RowNumber(999_999),
370 });
371
372 let encoded = key.encode();
373 let decoded = Key::decode(&encoded).expect("Failed to decode key");
374
375 match decoded {
376 Key::Row(decoded_inner) => {
377 assert_eq!(decoded_inner.source, SourceId::table(42));
378 assert_eq!(decoded_inner.row, 999_999);
379 }
380 _ => unreachable!(),
381 }
382 }
383
384 #[test]
385 fn test_row_sequence() {
386 let key = Key::RowSequence(RowSequenceKey {
387 source: SourceId::table(42),
388 });
389
390 let encoded = key.encode();
391 let decoded = Key::decode(&encoded).expect("Failed to decode key");
392
393 match decoded {
394 Key::RowSequence(decoded_inner) => {
395 assert_eq!(decoded_inner.source, SourceId::table(42));
396 }
397 _ => unreachable!(),
398 }
399 }
400
401 #[test]
402 fn test_column_sequence() {
403 let key = Key::TableColumnSequence(ColumnSequenceKey {
404 source: SourceId::table(42),
405 column: ColumnId(123),
406 });
407
408 let encoded = key.encode();
409 let decoded = Key::decode(&encoded).expect("Failed to decode key");
410
411 match decoded {
412 Key::TableColumnSequence(decoded_inner) => {
413 assert_eq!(decoded_inner.source, SourceId::table(42));
414 assert_eq!(decoded_inner.column, 123);
415 }
416 _ => unreachable!(),
417 }
418 }
419
420 #[test]
421 fn test_transaction_version() {
422 let key = Key::TransactionVersion(TransactionVersionKey {});
423 let encoded = key.encode();
424 Key::decode(&encoded).expect("Failed to decode key");
425 }
426
427 #[test]
428 fn test_operator_state() {
429 let key = Key::FlowNodeState(FlowNodeStateKey {
430 node: FlowNodeId(0xCAFEBABE),
431 key: vec![1, 2, 3],
432 });
433
434 let encoded = key.encode();
435 let decoded = Key::decode(&encoded).expect("Failed to decode key");
436
437 match decoded {
438 Key::FlowNodeState(decoded_inner) => {
439 assert_eq!(decoded_inner.node, 0xCAFEBABE);
440 assert_eq!(decoded_inner.key, vec![1, 2, 3]);
441 }
442 _ => unreachable!(),
443 }
444 }
445}