reifydb_core/key/
cdc_consumer.rs1use super::{EncodableKey, KeyKind};
5use crate::{
6 EncodedKey, EncodedKeyRange,
7 interface::CdcConsumerId,
8 util::encoding::keycode::{KeyDeserializer, KeySerializer},
9};
10
11pub trait ToConsumerKey {
13 fn to_consumer_key(&self) -> EncodedKey;
14}
15
16impl ToConsumerKey for EncodedKey {
17 fn to_consumer_key(&self) -> EncodedKey {
18 self.clone()
19 }
20}
21
22impl ToConsumerKey for CdcConsumerId {
23 fn to_consumer_key(&self) -> EncodedKey {
24 CdcConsumerKey {
25 consumer: self.clone(),
26 }
27 .encode()
28 }
29}
30
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
32pub struct CdcConsumerKey {
33 pub consumer: CdcConsumerId,
34}
35
/// Version tag written as the first byte of every key encoded in this module
/// and checked first when decoding.
pub const VERSION_BYTE: u8 = 0x01;
37
38impl EncodableKey for CdcConsumerKey {
39 const KIND: KeyKind = KeyKind::CdcConsumer;
40
41 fn encode(&self) -> EncodedKey {
42 let mut serializer = KeySerializer::new();
43 serializer.extend_u8(VERSION_BYTE).extend_u8(Self::KIND as u8).extend_str(&self.consumer);
44 serializer.to_encoded_key()
45 }
46
47 fn decode(key: &EncodedKey) -> Option<Self>
48 where
49 Self: Sized,
50 {
51 let mut de = KeyDeserializer::from_bytes(key.as_slice());
52
53 let version = de.read_u8().ok()?;
54 if version != VERSION_BYTE {
55 return None;
56 }
57
58 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
59 if kind != Self::KIND {
60 return None;
61 }
62
63 let consumer_id = de.read_str().ok()?;
64
65 Some(Self {
66 consumer: CdcConsumerId(consumer_id),
67 })
68 }
69}
70
/// Field-less type whose associated functions build key ranges over
/// [`CdcConsumerKey`] entries.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CdcConsumerKeyRange;
73
74impl CdcConsumerKeyRange {
75 pub fn full_scan() -> EncodedKeyRange {
80 EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
81 }
82
83 fn start() -> EncodedKey {
84 let mut serializer = KeySerializer::with_capacity(2);
85 serializer.extend_u8(VERSION_BYTE).extend_u8(CdcConsumerKey::KIND as u8);
86 serializer.to_encoded_key()
87 }
88
89 fn end() -> EncodedKey {
90 let mut serializer = KeySerializer::with_capacity(2);
91 serializer.extend_u8(VERSION_BYTE).extend_u8((CdcConsumerKey::KIND as u8).wrapping_sub(1));
92 serializer.to_encoded_key()
93 }
94}
95
#[cfg(test)]
mod tests {
    use std::ops::RangeBounds;

    use super::{CdcConsumerKey, CdcConsumerKeyRange, EncodableKey};
    use crate::interface::CdcConsumerId;

    /// Encoding a key and decoding the result yields the same consumer id.
    #[test]
    fn test_encode_decode_cdc_consumer() {
        let original = CdcConsumerKey {
            consumer: CdcConsumerId::new("test-consumer"),
        };

        let round_tripped =
            CdcConsumerKey::decode(&original.encode()).expect("Failed to decode key");

        assert_eq!(round_tripped.consumer, CdcConsumerId::new("test-consumer"));
    }

    /// Keys for several consumer ids all fall inside the full-scan range.
    #[test]
    fn test_cdc_consumer_keys_within_range() {
        // Each consumer is paired with the message reported if its key is out of range.
        let consumers = [
            (CdcConsumerId::new("consumer-a"), "consumer-a key should be in range"),
            (CdcConsumerId::new("consumer-b"), "consumer-b key should be in range"),
            (CdcConsumerId::new("consumer-z"), "consumer-z key should be in range"),
        ];

        let range = CdcConsumerKeyRange::full_scan();

        for (consumer, message) in consumers {
            let encoded = CdcConsumerKey { consumer }.encode();
            assert!(range.contains(&encoded), "{}", message);
        }
    }
}