use super::{EncodableKey, KeyKind};
use crate::{
encoded::key::{EncodedKey, EncodedKeyRange},
interface::{catalog::flow::FlowId, cdc::CdcConsumerId},
util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
};
/// Implemented by values that can produce the [`EncodedKey`] of a CDC consumer.
///
/// Implementations in this file exist for [`EncodedKey`] itself,
/// [`CdcConsumerId`] and [`FlowId`].
pub trait ToConsumerKey {
/// Returns the encoded consumer key derived from `self`.
fn to_consumer_key(&self) -> EncodedKey;
}
/// An already-encoded key is its own consumer key.
impl ToConsumerKey for EncodedKey {
    /// Returns a copy of `self`; no re-encoding is performed.
    fn to_consumer_key(&self) -> EncodedKey {
        Clone::clone(self)
    }
}
impl ToConsumerKey for CdcConsumerId {
fn to_consumer_key(&self) -> EncodedKey {
CdcConsumerKey {
consumer: self.clone(),
}
.encode()
}
}
impl ToConsumerKey for FlowId {
fn to_consumer_key(&self) -> EncodedKey {
CdcConsumerKey::encoded(CdcConsumerId::new(format!("flow:{}", self.0)))
}
}
/// Key made of a single [`CdcConsumerId`].
///
/// Its byte representation is produced and parsed by the `EncodableKey`
/// implementation for this type.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct CdcConsumerKey {
/// Identifier of the consumer this key refers to.
pub consumer: CdcConsumerId,
}
impl CdcConsumerKey {
    /// Convenience constructor: converts `consumer` into a [`CdcConsumerId`],
    /// wraps it in a key and returns that key's encoded form.
    pub fn encoded(consumer: impl Into<CdcConsumerId>) -> EncodedKey {
        let consumer: CdcConsumerId = consumer.into();
        let key = Self { consumer };
        key.encode()
    }
}
/// Version marker written as the first serialized byte of a `CdcConsumerKey`
/// and of the range bounds in this file; `decode` rejects any other value.
pub const VERSION_BYTE: u8 = 1;
/// Serialized layout, in write order: version byte, key-kind byte, consumer id
/// string.
impl EncodableKey for CdcConsumerKey {
    const KIND: KeyKind = KeyKind::CdcConsumer;

    /// Serializes the version byte, the kind byte and the consumer id, in that
    /// order, and returns the resulting key.
    fn encode(&self) -> EncodedKey {
        let mut out = KeySerializer::new();
        out.extend_u8(VERSION_BYTE);
        out.extend_u8(Self::KIND as u8);
        out.extend_str(&self.consumer);
        out.to_encoded_key()
    }

    /// Parses a key previously produced by [`Self::encode`].
    ///
    /// Returns `None` when any field cannot be read, when the version byte is
    /// not [`VERSION_BYTE`], or when the kind byte does not convert to
    /// [`Self::KIND`].
    fn decode(key: &EncodedKey) -> Option<Self>
    where
        Self: Sized,
    {
        let mut reader = KeyDeserializer::from_bytes(key.as_slice());

        // Fields are validated in the same order they were written.
        if reader.read_u8().ok()? != VERSION_BYTE {
            return None;
        }

        let kind: KeyKind = reader.read_u8().ok()?.try_into().ok()?;
        if kind != Self::KIND {
            return None;
        }

        reader.read_str().ok().map(|id| Self {
            consumer: CdcConsumerId(id),
        })
    }
}
/// Unit type whose associated functions build key ranges covering
/// `CdcConsumerKey` entries.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CdcConsumerKeyRange;
impl CdcConsumerKeyRange {
    /// Returns the range spanning from [`Self::start`] to [`Self::end`].
    pub fn full_scan() -> EncodedKeyRange {
        let lower = Self::start();
        let upper = Self::end();
        EncodedKeyRange::start_end(Some(lower), Some(upper))
    }

    /// Start bound: version byte followed by the consumer key-kind byte.
    fn start() -> EncodedKey {
        Self::bound(CdcConsumerKey::KIND as u8)
    }

    /// End bound: version byte followed by the kind byte minus one (wrapping).
    ///
    /// NOTE(review): using `kind - 1` as the end bound presumably relies on the
    /// byte ordering produced by `KeySerializer` — confirm against its
    /// implementation.
    fn end() -> EncodedKey {
        let kind_byte = CdcConsumerKey::KIND as u8;
        Self::bound(kind_byte.wrapping_sub(1))
    }

    /// Serializes the two-byte prefix `VERSION_BYTE`, `kind_byte`.
    fn bound(kind_byte: u8) -> EncodedKey {
        let mut out = KeySerializer::with_capacity(2);
        out.extend_u8(VERSION_BYTE);
        out.extend_u8(kind_byte);
        out.to_encoded_key()
    }
}
#[cfg(test)]
pub mod tests {
    use std::ops::RangeBounds;

    use super::{CdcConsumerKey, CdcConsumerKeyRange, EncodableKey, ToConsumerKey};
    use crate::interface::{catalog::flow::FlowId, cdc::CdcConsumerId};

    /// A key survives an encode / decode round trip.
    #[test]
    fn test_encode_decode_cdc_consumer() {
        let original = CdcConsumerKey {
            consumer: CdcConsumerId::new("test-consumer"),
        };

        let bytes = original.encode();
        let round_tripped = CdcConsumerKey::decode(&bytes).expect("Failed to decode key");

        assert_eq!(round_tripped.consumer, CdcConsumerId::new("test-consumer"));
    }

    /// Keys for arbitrary consumer names fall inside the full-scan range.
    #[test]
    fn test_cdc_consumer_keys_within_range() {
        let first = CdcConsumerKey::encoded(CdcConsumerId::new("consumer-a"));
        let second = CdcConsumerKey::encoded(CdcConsumerId::new("consumer-b"));
        let last = CdcConsumerKey::encoded(CdcConsumerId::new("consumer-z"));

        let scan = CdcConsumerKeyRange::full_scan();

        assert!(scan.contains(&first), "consumer-a key should be in range");
        assert!(scan.contains(&second), "consumer-b key should be in range");
        assert!(scan.contains(&last), "consumer-z key should be in range");
    }

    /// A flow id encodes as the consumer named `flow:<id>`.
    #[test]
    fn test_flow_id_to_consumer_key() {
        let bytes = FlowId(42).to_consumer_key();

        let parsed = CdcConsumerKey::decode(&bytes).expect("Failed to decode key");

        assert_eq!(parsed.consumer, CdcConsumerId::new("flow:42"));
    }

    /// Flow-derived consumer keys fall inside the full-scan range.
    #[test]
    fn test_flow_id_keys_within_range() {
        let small = FlowId(1).to_consumer_key();
        let medium = FlowId(100).to_consumer_key();
        let large = FlowId(999).to_consumer_key();

        let scan = CdcConsumerKeyRange::full_scan();

        assert!(scan.contains(&small), "flow:1 key should be in range");
        assert!(scan.contains(&medium), "flow:100 key should be in range");
        assert!(scan.contains(&large), "flow:999 key should be in range");
    }
}