Skip to main content

reifydb_core/key/
cdc_consumer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use super::{EncodableKey, KeyKind};
5use crate::{
6	encoded::key::{EncodedKey, EncodedKeyRange},
7	interface::{catalog::flow::FlowId, cdc::CdcConsumerId},
8	util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
9};
10
/// Conversion of a value into the [`EncodedKey`] used to address a CDC consumer.
///
/// Implemented in this file for already-encoded keys, for `CdcConsumerId`
/// and for `FlowId`.
pub trait ToConsumerKey {
	/// Returns the encoded consumer key for `self`.
	fn to_consumer_key(&self) -> EncodedKey;
}
14
15impl ToConsumerKey for EncodedKey {
16	fn to_consumer_key(&self) -> EncodedKey {
17		self.clone()
18	}
19}
20
21impl ToConsumerKey for CdcConsumerId {
22	fn to_consumer_key(&self) -> EncodedKey {
23		CdcConsumerKey {
24			consumer: self.clone(),
25		}
26		.encode()
27	}
28}
29
30impl ToConsumerKey for FlowId {
31	fn to_consumer_key(&self) -> EncodedKey {
32		CdcConsumerKey::encoded(CdcConsumerId::new(format!("flow:{}", self.0)))
33	}
34}
35
/// Key addressing a single CDC consumer, identified by its [`CdcConsumerId`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct CdcConsumerKey {
	/// Identifier of the consumer this key refers to.
	pub consumer: CdcConsumerId,
}
40
41impl CdcConsumerKey {
42	pub fn encoded(consumer: impl Into<CdcConsumerId>) -> EncodedKey {
43		Self {
44			consumer: consumer.into(),
45		}
46		.encode()
47	}
48}
49
50impl EncodableKey for CdcConsumerKey {
51	const KIND: KeyKind = KeyKind::CdcConsumer;
52
53	fn encode(&self) -> EncodedKey {
54		let mut serializer = KeySerializer::new();
55		serializer.extend_u8(Self::KIND as u8).extend_str(&self.consumer);
56		serializer.to_encoded_key()
57	}
58
59	fn decode(key: &EncodedKey) -> Option<Self>
60	where
61		Self: Sized,
62	{
63		let mut de = KeyDeserializer::from_bytes(key.as_slice());
64
65		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
66		if kind != Self::KIND {
67			return None;
68		}
69
70		let consumer_id = de.read_str().ok()?;
71
72		Some(Self {
73			consumer: CdcConsumerId(consumer_id),
74		})
75	}
76}
77
/// Data-less marker type whose associated functions build key ranges over
/// CDC consumer keys.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CdcConsumerKeyRange;
80
81impl CdcConsumerKeyRange {
82	pub fn full_scan() -> EncodedKeyRange {
83		EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
84	}
85
86	fn start() -> EncodedKey {
87		let mut serializer = KeySerializer::with_capacity(1);
88		serializer.extend_u8(CdcConsumerKey::KIND as u8);
89		serializer.to_encoded_key()
90	}
91
92	fn end() -> EncodedKey {
93		let mut serializer = KeySerializer::with_capacity(1);
94		serializer.extend_u8((CdcConsumerKey::KIND as u8).wrapping_sub(1));
95		serializer.to_encoded_key()
96	}
97}
98
#[cfg(test)]
pub mod tests {
	use std::ops::RangeBounds;

	use super::{CdcConsumerKey, CdcConsumerKeyRange, EncodableKey, ToConsumerKey};
	use crate::interface::{catalog::flow::FlowId, cdc::CdcConsumerId};

	/// A key survives an encode/decode round trip with its consumer id intact.
	#[test]
	fn test_encode_decode_cdc_consumer() {
		let expected = CdcConsumerId::new("test-consumer");
		let original = CdcConsumerKey {
			consumer: expected.clone(),
		};

		let bytes = original.encode();
		let roundtrip = CdcConsumerKey::decode(&bytes).expect("Failed to decode key");

		assert_eq!(roundtrip.consumer, expected);
	}

	/// Keys for differently named consumers all fall inside the full-scan range.
	#[test]
	fn test_cdc_consumer_keys_within_range() {
		let first = CdcConsumerKey::encoded(CdcConsumerId::new("consumer-a"));
		let second = CdcConsumerKey::encoded(CdcConsumerId::new("consumer-b"));
		let last = CdcConsumerKey::encoded(CdcConsumerId::new("consumer-z"));

		let scan = CdcConsumerKeyRange::full_scan();

		assert!(scan.contains(&first), "consumer-a key should be in range");
		assert!(scan.contains(&second), "consumer-b key should be in range");
		assert!(scan.contains(&last), "consumer-z key should be in range");
	}

	/// A flow id encodes to the consumer named `flow:<id>`.
	#[test]
	fn test_flow_id_to_consumer_key() {
		let bytes = FlowId(42).to_consumer_key();

		let roundtrip = CdcConsumerKey::decode(&bytes).expect("Failed to decode key");
		assert_eq!(roundtrip.consumer, CdcConsumerId::new("flow:42"));
	}

	/// Flow-derived consumer keys are covered by the full-scan range too.
	#[test]
	fn test_flow_id_keys_within_range() {
		let scan = CdcConsumerKeyRange::full_scan();

		let small = FlowId(1).to_consumer_key();
		let medium = FlowId(100).to_consumer_key();
		let large = FlowId(999).to_consumer_key();

		assert!(scan.contains(&small), "flow:1 key should be in range");
		assert!(scan.contains(&medium), "flow:100 key should be in range");
		assert!(scan.contains(&large), "flow:999 key should be in range");
	}
}
163}