1use super::{EncodableKey, KeyKind};
5use crate::{
6 encoded::key::{EncodedKey, EncodedKeyRange},
7 interface::catalog::id::SinkId,
8 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
9};
10
11#[derive(Debug, Clone, PartialEq)]
12pub struct SinkKey {
13 pub sink: SinkId,
14}
15
16impl EncodableKey for SinkKey {
17 const KIND: KeyKind = KeyKind::Sink;
18
19 fn encode(&self) -> EncodedKey {
20 let mut serializer = KeySerializer::with_capacity(9);
21 serializer.extend_u8(Self::KIND as u8).extend_u64(self.sink);
22 serializer.to_encoded_key()
23 }
24
25 fn decode(key: &EncodedKey) -> Option<Self> {
26 let mut de = KeyDeserializer::from_bytes(key.as_slice());
27
28 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
29 if kind != Self::KIND {
30 return None;
31 }
32
33 let sink = de.read_u64().ok()?;
34
35 Some(Self {
36 sink: SinkId(sink),
37 })
38 }
39}
40
41impl SinkKey {
42 pub fn encoded(sink: impl Into<SinkId>) -> EncodedKey {
43 Self {
44 sink: sink.into(),
45 }
46 .encode()
47 }
48
49 pub fn full_scan() -> EncodedKeyRange {
50 EncodedKeyRange::start_end(Some(Self::sink_start()), Some(Self::sink_end()))
51 }
52
53 fn sink_start() -> EncodedKey {
54 let mut serializer = KeySerializer::with_capacity(1);
55 serializer.extend_u8(Self::KIND as u8);
56 serializer.to_encoded_key()
57 }
58
59 fn sink_end() -> EncodedKey {
60 let mut serializer = KeySerializer::with_capacity(1);
61 serializer.extend_u8(Self::KIND as u8 - 1);
62 serializer.to_encoded_key()
63 }
64}
65
66#[cfg(test)]
67pub mod tests {
68 use super::{EncodableKey, SinkKey};
69 use crate::interface::catalog::id::SinkId;
70
71 #[test]
72 fn test_encode_decode() {
73 let key = SinkKey {
74 sink: SinkId(0x1234),
75 };
76 let encoded = key.encode();
77 let decoded = SinkKey::decode(&encoded).unwrap();
78 assert_eq!(decoded.sink, SinkId(0x1234));
79 assert_eq!(key, decoded);
80 }
81}