1use super::{EncodableKey, KeyKind};
5use crate::{
6 encoded::key::{EncodedKey, EncodedKeyRange},
7 interface::catalog::id::SinkId,
8 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
9};
10
11#[derive(Debug, Clone, PartialEq)]
12pub struct SinkKey {
13 pub sink: SinkId,
14}
15
/// Version byte written first in every encoded `SinkKey`; `decode` rejects
/// any key whose first byte differs from this value.
const VERSION: u8 = 1;
17
18impl EncodableKey for SinkKey {
19 const KIND: KeyKind = KeyKind::Sink;
20
21 fn encode(&self) -> EncodedKey {
22 let mut serializer = KeySerializer::with_capacity(10);
23 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.sink);
24 serializer.to_encoded_key()
25 }
26
27 fn decode(key: &EncodedKey) -> Option<Self> {
28 let mut de = KeyDeserializer::from_bytes(key.as_slice());
29
30 let version = de.read_u8().ok()?;
31 if version != VERSION {
32 return None;
33 }
34
35 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
36 if kind != Self::KIND {
37 return None;
38 }
39
40 let sink = de.read_u64().ok()?;
41
42 Some(Self {
43 sink: SinkId(sink),
44 })
45 }
46}
47
48impl SinkKey {
49 pub fn encoded(sink: impl Into<SinkId>) -> EncodedKey {
50 Self {
51 sink: sink.into(),
52 }
53 .encode()
54 }
55
56 pub fn full_scan() -> EncodedKeyRange {
57 EncodedKeyRange::start_end(Some(Self::sink_start()), Some(Self::sink_end()))
58 }
59
60 fn sink_start() -> EncodedKey {
61 let mut serializer = KeySerializer::with_capacity(2);
62 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8);
63 serializer.to_encoded_key()
64 }
65
66 fn sink_end() -> EncodedKey {
67 let mut serializer = KeySerializer::with_capacity(2);
68 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8 - 1);
69 serializer.to_encoded_key()
70 }
71}
72
#[cfg(test)]
pub mod tests {
    use super::{EncodableKey, SinkKey};
    use crate::interface::catalog::id::SinkId;

    /// Encoding a key and decoding the result must yield the same key.
    #[test]
    fn test_encode_decode() {
        let original = SinkKey { sink: SinkId(0x1234) };

        let round_tripped = SinkKey::decode(&original.encode()).unwrap();

        assert_eq!(round_tripped.sink, SinkId(0x1234));
        assert_eq!(original, round_tripped);
    }
}