reifydb_core/key/
cdc_consumer.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use super::{EncodableKey, KeyKind};
5use crate::{
6	EncodedKey, EncodedKeyRange,
7	interface::CdcConsumerId,
8	util::encoding::keycode::{KeyDeserializer, KeySerializer},
9};
10
11/// Trait for types that can be converted to a consumer key
12pub trait ToConsumerKey {
13	fn to_consumer_key(&self) -> EncodedKey;
14}
15
16impl ToConsumerKey for EncodedKey {
17	fn to_consumer_key(&self) -> EncodedKey {
18		self.clone()
19	}
20}
21
22impl ToConsumerKey for CdcConsumerId {
23	fn to_consumer_key(&self) -> EncodedKey {
24		CdcConsumerKey {
25			consumer: self.clone(),
26		}
27		.encode()
28	}
29}
30
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
32pub struct CdcConsumerKey {
33	pub consumer: CdcConsumerId,
34}
35
36pub const VERSION_BYTE: u8 = 1;
37
38impl EncodableKey for CdcConsumerKey {
39	const KIND: KeyKind = KeyKind::CdcConsumer;
40
41	fn encode(&self) -> EncodedKey {
42		let mut serializer = KeySerializer::new();
43		serializer.extend_u8(VERSION_BYTE).extend_u8(Self::KIND as u8).extend_str(&self.consumer);
44		serializer.to_encoded_key()
45	}
46
47	fn decode(key: &EncodedKey) -> Option<Self>
48	where
49		Self: Sized,
50	{
51		let mut de = KeyDeserializer::from_bytes(key.as_slice());
52
53		let version = de.read_u8().ok()?;
54		if version != VERSION_BYTE {
55			return None;
56		}
57
58		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
59		if kind != Self::KIND {
60			return None;
61		}
62
63		let consumer_id = de.read_str().ok()?;
64
65		Some(Self {
66			consumer: CdcConsumerId(consumer_id),
67		})
68	}
69}
70
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct CdcConsumerKeyRange;
73
74impl CdcConsumerKeyRange {
75	/// Creates a key range that spans all CDC consumer checkpoint keys
76	///
77	/// Returns an `EncodedKeyRange` that can be used with transaction
78	/// range scan operations to iterate over all registered CDC consumers.
79	pub fn full_scan() -> EncodedKeyRange {
80		EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
81	}
82
83	fn start() -> EncodedKey {
84		let mut serializer = KeySerializer::with_capacity(2);
85		serializer.extend_u8(VERSION_BYTE).extend_u8(CdcConsumerKey::KIND as u8);
86		serializer.to_encoded_key()
87	}
88
89	fn end() -> EncodedKey {
90		let mut serializer = KeySerializer::with_capacity(2);
91		serializer.extend_u8(VERSION_BYTE).extend_u8((CdcConsumerKey::KIND as u8).wrapping_sub(1));
92		serializer.to_encoded_key()
93	}
94}
95
96#[cfg(test)]
97mod tests {
98	use std::ops::RangeBounds;
99
100	use super::{CdcConsumerKey, CdcConsumerKeyRange, EncodableKey};
101	use crate::interface::CdcConsumerId;
102
103	#[test]
104	fn test_encode_decode_cdc_consumer() {
105		let key = CdcConsumerKey {
106			consumer: CdcConsumerId::new("test-consumer"),
107		};
108
109		let encoded = key.encode();
110		let decoded = CdcConsumerKey::decode(&encoded).expect("Failed to decode key");
111
112		assert_eq!(decoded.consumer, CdcConsumerId::new("test-consumer"));
113	}
114
115	#[test]
116	fn test_cdc_consumer_keys_within_range() {
117		// Create several CDC consumer keys
118		let key1 = CdcConsumerKey {
119			consumer: CdcConsumerId::new("consumer-a"),
120		}
121		.encode();
122
123		let key2 = CdcConsumerKey {
124			consumer: CdcConsumerId::new("consumer-b"),
125		}
126		.encode();
127
128		let key3 = CdcConsumerKey {
129			consumer: CdcConsumerId::new("consumer-z"),
130		}
131		.encode();
132
133		// Get the range
134		let range = CdcConsumerKeyRange::full_scan();
135
136		// All CDC consumer keys should fall within the range
137		assert!(range.contains(&key1), "consumer-a key should be in range");
138		assert!(range.contains(&key2), "consumer-b key should be in range");
139		assert!(range.contains(&key3), "consumer-z key should be in range");
140	}
141}