Skip to main content

reifydb_core/key/
cdc_consumer.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use super::{EncodableKey, KeyKind};
5use crate::{
6	encoded::key::{EncodedKey, EncodedKeyRange},
7	interface::{catalog::flow::FlowId, cdc::CdcConsumerId},
8	util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
9};
10
11/// Trait for types that can be converted to a consumer key
12pub trait ToConsumerKey {
13	fn to_consumer_key(&self) -> EncodedKey;
14}
15
16impl ToConsumerKey for EncodedKey {
17	fn to_consumer_key(&self) -> EncodedKey {
18		self.clone()
19	}
20}
21
22impl ToConsumerKey for CdcConsumerId {
23	fn to_consumer_key(&self) -> EncodedKey {
24		CdcConsumerKey {
25			consumer: self.clone(),
26		}
27		.encode()
28	}
29}
30
31impl ToConsumerKey for FlowId {
32	fn to_consumer_key(&self) -> EncodedKey {
33		CdcConsumerKey::encoded(CdcConsumerId::new(format!("flow:{}", self.0)))
34	}
35}
36
37#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
38pub struct CdcConsumerKey {
39	pub consumer: CdcConsumerId,
40}
41
42impl CdcConsumerKey {
43	pub fn encoded(consumer: impl Into<CdcConsumerId>) -> EncodedKey {
44		Self {
45			consumer: consumer.into(),
46		}
47		.encode()
48	}
49}
50
51pub const VERSION_BYTE: u8 = 1;
52
53impl EncodableKey for CdcConsumerKey {
54	const KIND: KeyKind = KeyKind::CdcConsumer;
55
56	fn encode(&self) -> EncodedKey {
57		let mut serializer = KeySerializer::new();
58		serializer.extend_u8(VERSION_BYTE).extend_u8(Self::KIND as u8).extend_str(&self.consumer);
59		serializer.to_encoded_key()
60	}
61
62	fn decode(key: &EncodedKey) -> Option<Self>
63	where
64		Self: Sized,
65	{
66		let mut de = KeyDeserializer::from_bytes(key.as_slice());
67
68		let version = de.read_u8().ok()?;
69		if version != VERSION_BYTE {
70			return None;
71		}
72
73		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
74		if kind != Self::KIND {
75			return None;
76		}
77
78		let consumer_id = de.read_str().ok()?;
79
80		Some(Self {
81			consumer: CdcConsumerId(consumer_id),
82		})
83	}
84}
85
86#[derive(Debug, Clone, PartialEq, Eq)]
87pub struct CdcConsumerKeyRange;
88
89impl CdcConsumerKeyRange {
90	/// Creates a key range that spans all CDC consumer checkpoint keys
91	///
92	/// Returns an `EncodedKeyRange` that can be used with transaction
93	/// range scan operations to iterate over all registered CDC consumers.
94	pub fn full_scan() -> EncodedKeyRange {
95		EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
96	}
97
98	fn start() -> EncodedKey {
99		let mut serializer = KeySerializer::with_capacity(2);
100		serializer.extend_u8(VERSION_BYTE).extend_u8(CdcConsumerKey::KIND as u8);
101		serializer.to_encoded_key()
102	}
103
104	fn end() -> EncodedKey {
105		let mut serializer = KeySerializer::with_capacity(2);
106		serializer.extend_u8(VERSION_BYTE).extend_u8((CdcConsumerKey::KIND as u8).wrapping_sub(1));
107		serializer.to_encoded_key()
108	}
109}
110
111#[cfg(test)]
112pub mod tests {
113	use std::ops::RangeBounds;
114
115	use super::{CdcConsumerKey, CdcConsumerKeyRange, EncodableKey, ToConsumerKey};
116	use crate::interface::{catalog::flow::FlowId, cdc::CdcConsumerId};
117
118	#[test]
119	fn test_encode_decode_cdc_consumer() {
120		let key = CdcConsumerKey {
121			consumer: CdcConsumerId::new("test-consumer"),
122		};
123
124		let encoded = key.encode();
125		let decoded = CdcConsumerKey::decode(&encoded).expect("Failed to decode key");
126
127		assert_eq!(decoded.consumer, CdcConsumerId::new("test-consumer"));
128	}
129
130	#[test]
131	fn test_cdc_consumer_keys_within_range() {
132		// Create several CDC consumer keys
133		let key1 = CdcConsumerKey {
134			consumer: CdcConsumerId::new("consumer-a"),
135		}
136		.encode();
137
138		let key2 = CdcConsumerKey {
139			consumer: CdcConsumerId::new("consumer-b"),
140		}
141		.encode();
142
143		let key3 = CdcConsumerKey {
144			consumer: CdcConsumerId::new("consumer-z"),
145		}
146		.encode();
147
148		// Get the range
149		let range = CdcConsumerKeyRange::full_scan();
150
151		// All CDC consumer keys should fall within the range
152		assert!(range.contains(&key1), "consumer-a key should be in range");
153		assert!(range.contains(&key2), "consumer-b key should be in range");
154		assert!(range.contains(&key3), "consumer-z key should be in range");
155	}
156
157	#[test]
158	fn test_flow_id_to_consumer_key() {
159		let flow_id = FlowId(42);
160		let encoded = flow_id.to_consumer_key();
161
162		// Decode and verify it creates the expected consumer key format
163		let decoded = CdcConsumerKey::decode(&encoded).expect("Failed to decode key");
164		assert_eq!(decoded.consumer, CdcConsumerId::new("flow:42"));
165	}
166
167	#[test]
168	fn test_flow_id_keys_within_range() {
169		let flow1 = FlowId(1).to_consumer_key();
170		let flow2 = FlowId(100).to_consumer_key();
171		let flow3 = FlowId(999).to_consumer_key();
172
173		let range = CdcConsumerKeyRange::full_scan();
174
175		assert!(range.contains(&flow1), "flow:1 key should be in range");
176		assert!(range.contains(&flow2), "flow:100 key should be in range");
177		assert!(range.contains(&flow3), "flow:999 key should be in range");
178	}
179}