reifydb_core/key/
cdc_consumer.rs1use super::{EncodableKey, KeyKind};
5use crate::{
6 encoded::key::{EncodedKey, EncodedKeyRange},
7 interface::{catalog::flow::FlowId, cdc::CdcConsumerId},
8 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
9};
10
11pub trait ToConsumerKey {
12 fn to_consumer_key(&self) -> EncodedKey;
13}
14
15impl ToConsumerKey for EncodedKey {
16 fn to_consumer_key(&self) -> EncodedKey {
17 self.clone()
18 }
19}
20
21impl ToConsumerKey for CdcConsumerId {
22 fn to_consumer_key(&self) -> EncodedKey {
23 CdcConsumerKey {
24 consumer: self.clone(),
25 }
26 .encode()
27 }
28}
29
30impl ToConsumerKey for FlowId {
31 fn to_consumer_key(&self) -> EncodedKey {
32 CdcConsumerKey::encoded(CdcConsumerId::new(format!("flow:{}", self.0)))
33 }
34}
35
36#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
37pub struct CdcConsumerKey {
38 pub consumer: CdcConsumerId,
39}
40
41impl CdcConsumerKey {
42 pub fn encoded(consumer: impl Into<CdcConsumerId>) -> EncodedKey {
43 Self {
44 consumer: consumer.into(),
45 }
46 .encode()
47 }
48}
49
50impl EncodableKey for CdcConsumerKey {
51 const KIND: KeyKind = KeyKind::CdcConsumer;
52
53 fn encode(&self) -> EncodedKey {
54 let mut serializer = KeySerializer::new();
55 serializer.extend_u8(Self::KIND as u8).extend_str(&self.consumer);
56 serializer.to_encoded_key()
57 }
58
59 fn decode(key: &EncodedKey) -> Option<Self>
60 where
61 Self: Sized,
62 {
63 let mut de = KeyDeserializer::from_bytes(key.as_slice());
64
65 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
66 if kind != Self::KIND {
67 return None;
68 }
69
70 let consumer_id = de.read_str().ok()?;
71
72 Some(Self {
73 consumer: CdcConsumerId(consumer_id),
74 })
75 }
76}
77
/// Namespace type for building key ranges over CDC consumer keys.
///
/// It carries no data; use its associated functions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CdcConsumerKeyRange;
80
81impl CdcConsumerKeyRange {
82 pub fn full_scan() -> EncodedKeyRange {
83 EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
84 }
85
86 fn start() -> EncodedKey {
87 let mut serializer = KeySerializer::with_capacity(1);
88 serializer.extend_u8(CdcConsumerKey::KIND as u8);
89 serializer.to_encoded_key()
90 }
91
92 fn end() -> EncodedKey {
93 let mut serializer = KeySerializer::with_capacity(1);
94 serializer.extend_u8((CdcConsumerKey::KIND as u8).wrapping_sub(1));
95 serializer.to_encoded_key()
96 }
97}
98
#[cfg(test)]
pub mod tests {
    use std::ops::RangeBounds;

    use super::{CdcConsumerKey, CdcConsumerKeyRange, EncodableKey, ToConsumerKey};
    use crate::interface::{catalog::flow::FlowId, cdc::CdcConsumerId};

    /// Encoding a key and decoding the result yields the same consumer id.
    #[test]
    fn test_encode_decode_cdc_consumer() {
        let original = CdcConsumerKey {
            consumer: CdcConsumerId::new("test-consumer"),
        };

        let roundtripped = CdcConsumerKey::decode(&original.encode()).expect("Failed to decode key");

        assert_eq!(roundtripped.consumer, CdcConsumerId::new("test-consumer"));
    }

    /// Keys for ordinary consumer ids fall inside the full-scan range.
    #[test]
    fn test_cdc_consumer_keys_within_range() {
        let cases = [
            (CdcConsumerKey::encoded(CdcConsumerId::new("consumer-a")), "consumer-a key should be in range"),
            (CdcConsumerKey::encoded(CdcConsumerId::new("consumer-b")), "consumer-b key should be in range"),
            (CdcConsumerKey::encoded(CdcConsumerId::new("consumer-z")), "consumer-z key should be in range"),
        ];

        let range = CdcConsumerKeyRange::full_scan();

        for (key, message) in cases {
            assert!(range.contains(&key), "{}", message);
        }
    }

    /// A flow id is encoded under the consumer id `flow:<id>`.
    #[test]
    fn test_flow_id_to_consumer_key() {
        let encoded = FlowId(42).to_consumer_key();

        let decoded = CdcConsumerKey::decode(&encoded).expect("Failed to decode key");

        assert_eq!(decoded.consumer, CdcConsumerId::new("flow:42"));
    }

    /// Keys derived from flow ids fall inside the full-scan range.
    #[test]
    fn test_flow_id_keys_within_range() {
        let cases = [
            (FlowId(1).to_consumer_key(), "flow:1 key should be in range"),
            (FlowId(100).to_consumer_key(), "flow:100 key should be in range"),
            (FlowId(999).to_consumer_key(), "flow:999 key should be in range"),
        ];

        let range = CdcConsumerKeyRange::full_scan();

        for (key, message) in cases {
            assert!(range.contains(&key), "{}", message);
        }
    }
}