reifydb_core/key/
cdc_consumer.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use super::{EncodableKey, KeyKind};
5use crate::{
6	EncodedKey, EncodedKeyRange,
7	interface::CdcConsumerId,
8	util::encoding::keycode::{KeyDeserializer, KeySerializer},
9};
10
11/// Trait for types that can be converted to a consumer key
12pub trait ToConsumerKey {
13	fn to_consumer_key(&self) -> EncodedKey;
14}
15
16impl ToConsumerKey for EncodedKey {
17	fn to_consumer_key(&self) -> EncodedKey {
18		self.clone()
19	}
20}
21
22impl ToConsumerKey for CdcConsumerId {
23	fn to_consumer_key(&self) -> EncodedKey {
24		CdcConsumerKey {
25			consumer: self.clone(),
26		}
27		.encode()
28	}
29}
30
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
32pub struct CdcConsumerKey {
33	pub consumer: CdcConsumerId,
34}
35
36impl CdcConsumerKey {
37	pub fn encoded(consumer: impl Into<CdcConsumerId>) -> EncodedKey {
38		Self {
39			consumer: consumer.into(),
40		}
41		.encode()
42	}
43}
44
45pub const VERSION_BYTE: u8 = 1;
46
47impl EncodableKey for CdcConsumerKey {
48	const KIND: KeyKind = KeyKind::CdcConsumer;
49
50	fn encode(&self) -> EncodedKey {
51		let mut serializer = KeySerializer::new();
52		serializer.extend_u8(VERSION_BYTE).extend_u8(Self::KIND as u8).extend_str(&self.consumer);
53		serializer.to_encoded_key()
54	}
55
56	fn decode(key: &EncodedKey) -> Option<Self>
57	where
58		Self: Sized,
59	{
60		let mut de = KeyDeserializer::from_bytes(key.as_slice());
61
62		let version = de.read_u8().ok()?;
63		if version != VERSION_BYTE {
64			return None;
65		}
66
67		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
68		if kind != Self::KIND {
69			return None;
70		}
71
72		let consumer_id = de.read_str().ok()?;
73
74		Some(Self {
75			consumer: CdcConsumerId(consumer_id),
76		})
77	}
78}
79
80#[derive(Debug, Clone, PartialEq, Eq)]
81pub struct CdcConsumerKeyRange;
82
83impl CdcConsumerKeyRange {
84	/// Creates a key range that spans all CDC consumer checkpoint keys
85	///
86	/// Returns an `EncodedKeyRange` that can be used with transaction
87	/// range scan operations to iterate over all registered CDC consumers.
88	pub fn full_scan() -> EncodedKeyRange {
89		EncodedKeyRange::start_end(Some(Self::start()), Some(Self::end()))
90	}
91
92	fn start() -> EncodedKey {
93		let mut serializer = KeySerializer::with_capacity(2);
94		serializer.extend_u8(VERSION_BYTE).extend_u8(CdcConsumerKey::KIND as u8);
95		serializer.to_encoded_key()
96	}
97
98	fn end() -> EncodedKey {
99		let mut serializer = KeySerializer::with_capacity(2);
100		serializer.extend_u8(VERSION_BYTE).extend_u8((CdcConsumerKey::KIND as u8).wrapping_sub(1));
101		serializer.to_encoded_key()
102	}
103}
104
105#[cfg(test)]
106mod tests {
107	use std::ops::RangeBounds;
108
109	use super::{CdcConsumerKey, CdcConsumerKeyRange, EncodableKey};
110	use crate::interface::CdcConsumerId;
111
112	#[test]
113	fn test_encode_decode_cdc_consumer() {
114		let key = CdcConsumerKey {
115			consumer: CdcConsumerId::new("test-consumer"),
116		};
117
118		let encoded = key.encode();
119		let decoded = CdcConsumerKey::decode(&encoded).expect("Failed to decode key");
120
121		assert_eq!(decoded.consumer, CdcConsumerId::new("test-consumer"));
122	}
123
124	#[test]
125	fn test_cdc_consumer_keys_within_range() {
126		// Create several CDC consumer keys
127		let key1 = CdcConsumerKey {
128			consumer: CdcConsumerId::new("consumer-a"),
129		}
130		.encode();
131
132		let key2 = CdcConsumerKey {
133			consumer: CdcConsumerId::new("consumer-b"),
134		}
135		.encode();
136
137		let key3 = CdcConsumerKey {
138			consumer: CdcConsumerId::new("consumer-z"),
139		}
140		.encode();
141
142		// Get the range
143		let range = CdcConsumerKeyRange::full_scan();
144
145		// All CDC consumer keys should fall within the range
146		assert!(range.contains(&key1), "consumer-a key should be in range");
147		assert!(range.contains(&key2), "consumer-b key should be in range");
148		assert!(range.contains(&key3), "consumer-z key should be in range");
149	}
150}