reifydb_core/key/
cdc_consumer.rs1use super::{EncodableKey, KeyKind};
5use crate::{
6 encoded::key::{EncodedKey, EncodedKeyRange},
7 interface::{catalog::flow::FlowId, cdc::CdcConsumerId},
8 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
9};
10
11pub trait ToConsumerKey {
13 fn to_consumer_key(&self) -> EncodedKey;
14}
15
16impl ToConsumerKey for EncodedKey {
17 fn to_consumer_key(&self) -> EncodedKey {
18 self.clone()
19 }
20}
21
22impl ToConsumerKey for CdcConsumerId {
23 fn to_consumer_key(&self) -> EncodedKey {
24 CdcConsumerKey {
25 consumer: self.clone(),
26 }
27 .encode()
28 }
29}
30
31impl ToConsumerKey for FlowId {
32 fn to_consumer_key(&self) -> EncodedKey {
33 CdcConsumerKey::encoded(CdcConsumerId::new(format!("flow:{}", self.0)))
34 }
35}
36
37#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
38pub struct CdcConsumerKey {
39 pub consumer: CdcConsumerId,
40}
41
42impl CdcConsumerKey {
43 pub fn encoded(consumer: impl Into<CdcConsumerId>) -> EncodedKey {
44 Self {
45 consumer: consumer.into(),
46 }
47 .encode()
48 }
49}
50
/// Version tag serialized as the first component of every key and range
/// bound built in this file; `CdcConsumerKey::decode` rejects keys whose
/// leading component differs from it.
pub const VERSION_BYTE: u8 = 1;
52
53impl EncodableKey for CdcConsumerKey {
54 const KIND: KeyKind = KeyKind::CdcConsumer;
55
56 fn encode(&self) -> EncodedKey {
57 let mut serializer = KeySerializer::new();
58 serializer.extend_u8(VERSION_BYTE).extend_u8(Self::KIND as u8).extend_str(&self.consumer);
59 serializer.to_encoded_key()
60 }
61
62 fn decode(key: &EncodedKey) -> Option<Self>
63 where
64 Self: Sized,
65 {
66 let mut de = KeyDeserializer::from_bytes(key.as_slice());
67
68 let version = de.read_u8().ok()?;
69 if version != VERSION_BYTE {
70 return None;
71 }
72
73 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
74 if kind != Self::KIND {
75 return None;
76 }
77
78 let consumer_id = de.read_str().ok()?;
79
80 Some(Self {
81 consumer: CdcConsumerId(consumer_id),
82 })
83 }
84}
85
/// Namespace type for building key ranges over [`CdcConsumerKey`] entries.
///
/// It carries no data; ranges are produced by its associated functions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CdcConsumerKeyRange;
88
89impl CdcConsumerKeyRange {
90 pub fn full_scan() -> EncodedKeyRange {
95 EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
96 }
97
98 fn start() -> EncodedKey {
99 let mut serializer = KeySerializer::with_capacity(2);
100 serializer.extend_u8(VERSION_BYTE).extend_u8(CdcConsumerKey::KIND as u8);
101 serializer.to_encoded_key()
102 }
103
104 fn end() -> EncodedKey {
105 let mut serializer = KeySerializer::with_capacity(2);
106 serializer.extend_u8(VERSION_BYTE).extend_u8((CdcConsumerKey::KIND as u8).wrapping_sub(1));
107 serializer.to_encoded_key()
108 }
109}
110
#[cfg(test)]
pub mod tests {
    use std::ops::RangeBounds;

    use super::{CdcConsumerKey, CdcConsumerKeyRange, EncodableKey, ToConsumerKey};
    use crate::interface::{catalog::flow::FlowId, cdc::CdcConsumerId};

    /// A key survives an encode/decode round trip with its consumer id intact.
    #[test]
    fn test_encode_decode_cdc_consumer() {
        let original = CdcConsumerKey {
            consumer: CdcConsumerId::new("test-consumer"),
        };

        let bytes = original.encode();
        let round_tripped = CdcConsumerKey::decode(&bytes).expect("Failed to decode key");

        assert_eq!(round_tripped.consumer, CdcConsumerId::new("test-consumer"));
    }

    /// Keys for differently named consumers all fall inside the full-scan range.
    #[test]
    fn test_cdc_consumer_keys_within_range() {
        let encode = |name: &str| {
            CdcConsumerKey {
                consumer: CdcConsumerId::new(name),
            }
            .encode()
        };

        // Each encoded key is paired with the message reported if it is out of range.
        let cases = [
            (encode("consumer-a"), "consumer-a key should be in range"),
            (encode("consumer-b"), "consumer-b key should be in range"),
            (encode("consumer-z"), "consumer-z key should be in range"),
        ];

        let range = CdcConsumerKeyRange::full_scan();

        for (key, message) in &cases {
            assert!(range.contains(key), "{}", message);
        }
    }

    /// A flow id encodes to a consumer key whose id is `flow:<n>`.
    #[test]
    fn test_flow_id_to_consumer_key() {
        let encoded = FlowId(42).to_consumer_key();

        let decoded = CdcConsumerKey::decode(&encoded).expect("Failed to decode key");
        let expected = CdcConsumerId::new("flow:42");

        assert_eq!(decoded.consumer, expected);
    }

    /// Keys derived from flow ids fall inside the full-scan range as well.
    #[test]
    fn test_flow_id_keys_within_range() {
        let cases = [
            (FlowId(1).to_consumer_key(), "flow:1 key should be in range"),
            (FlowId(100).to_consumer_key(), "flow:100 key should be in range"),
            (FlowId(999).to_consumer_key(), "flow:999 key should be in range"),
        ];

        let range = CdcConsumerKeyRange::full_scan();

        for (key, message) in &cases {
            assert!(range.contains(key), "{}", message);
        }
    }
}