reifydb_core/key/
cdc_consumer.rs1use super::{EncodableKey, KeyKind};
5use crate::{
6 EncodedKey, EncodedKeyRange,
7 interface::CdcConsumerId,
8 util::encoding::keycode::{KeyDeserializer, KeySerializer},
9};
10
11pub trait ToConsumerKey {
13 fn to_consumer_key(&self) -> EncodedKey;
14}
15
16impl ToConsumerKey for EncodedKey {
17 fn to_consumer_key(&self) -> EncodedKey {
18 self.clone()
19 }
20}
21
22impl ToConsumerKey for CdcConsumerId {
23 fn to_consumer_key(&self) -> EncodedKey {
24 CdcConsumerKey {
25 consumer: self.clone(),
26 }
27 .encode()
28 }
29}
30
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
32pub struct CdcConsumerKey {
33 pub consumer: CdcConsumerId,
34}
35
36impl CdcConsumerKey {
37 pub fn encoded(consumer: impl Into<CdcConsumerId>) -> EncodedKey {
38 Self {
39 consumer: consumer.into(),
40 }
41 .encode()
42 }
43}
44
/// Version tag written as the first byte of every key encoded in this file.
/// `CdcConsumerKey::decode` rejects keys whose first byte differs from it.
pub const VERSION_BYTE: u8 = 1;
46
47impl EncodableKey for CdcConsumerKey {
48 const KIND: KeyKind = KeyKind::CdcConsumer;
49
50 fn encode(&self) -> EncodedKey {
51 let mut serializer = KeySerializer::new();
52 serializer.extend_u8(VERSION_BYTE).extend_u8(Self::KIND as u8).extend_str(&self.consumer);
53 serializer.to_encoded_key()
54 }
55
56 fn decode(key: &EncodedKey) -> Option<Self>
57 where
58 Self: Sized,
59 {
60 let mut de = KeyDeserializer::from_bytes(key.as_slice());
61
62 let version = de.read_u8().ok()?;
63 if version != VERSION_BYTE {
64 return None;
65 }
66
67 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
68 if kind != Self::KIND {
69 return None;
70 }
71
72 let consumer_id = de.read_str().ok()?;
73
74 Some(Self {
75 consumer: CdcConsumerId(consumer_id),
76 })
77 }
78}
79
/// Field-less type whose associated functions build key ranges over
/// `CdcConsumerKey` entries.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CdcConsumerKeyRange;
82
83impl CdcConsumerKeyRange {
84 pub fn full_scan() -> EncodedKeyRange {
89 EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
90 }
91
92 fn start() -> EncodedKey {
93 let mut serializer = KeySerializer::with_capacity(2);
94 serializer.extend_u8(VERSION_BYTE).extend_u8(CdcConsumerKey::KIND as u8);
95 serializer.to_encoded_key()
96 }
97
98 fn end() -> EncodedKey {
99 let mut serializer = KeySerializer::with_capacity(2);
100 serializer.extend_u8(VERSION_BYTE).extend_u8((CdcConsumerKey::KIND as u8).wrapping_sub(1));
101 serializer.to_encoded_key()
102 }
103}
104
#[cfg(test)]
mod tests {
    use std::ops::RangeBounds;

    use super::{CdcConsumerKey, CdcConsumerKeyRange, EncodableKey};
    use crate::interface::CdcConsumerId;

    /// Builds an unencoded consumer key for the given consumer name.
    fn key_for(name: &'static str) -> CdcConsumerKey {
        CdcConsumerKey {
            consumer: CdcConsumerId::new(name),
        }
    }

    /// Encoding a key and decoding it again yields the same consumer id.
    #[test]
    fn test_encode_decode_cdc_consumer() {
        let encoded = key_for("test-consumer").encode();

        let decoded = CdcConsumerKey::decode(&encoded).expect("Failed to decode key");

        assert_eq!(decoded.consumer, CdcConsumerId::new("test-consumer"));
    }

    /// Every encoded consumer key must fall inside the full-scan range.
    #[test]
    fn test_cdc_consumer_keys_within_range() {
        let range = CdcConsumerKeyRange::full_scan();

        let cases = [
            ("consumer-a", "consumer-a key should be in range"),
            ("consumer-b", "consumer-b key should be in range"),
            ("consumer-z", "consumer-z key should be in range"),
        ];

        for (name, message) in cases {
            let key = key_for(name).encode();
            assert!(range.contains(&key), "{}", message);
        }
    }
}