reifydb_core/key/
namespace_sink.rs1use super::{EncodableKey, KeyKind};
5use crate::{
6 encoded::key::{EncodedKey, EncodedKeyRange},
7 interface::catalog::id::{NamespaceId, SinkId},
8 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
9};
10
11#[derive(Debug, Clone, PartialEq)]
12pub struct NamespaceSinkKey {
13 pub namespace: NamespaceId,
14 pub sink: SinkId,
15}
16
/// Layout version written as the first byte of every key in this module.
/// `decode` rejects keys whose first byte differs from this value.
const VERSION: u8 = 1;
18
19impl EncodableKey for NamespaceSinkKey {
20 const KIND: KeyKind = KeyKind::NamespaceSink;
21
22 fn encode(&self) -> EncodedKey {
23 let mut serializer = KeySerializer::with_capacity(18);
24 serializer
25 .extend_u8(VERSION)
26 .extend_u8(Self::KIND as u8)
27 .extend_u64(self.namespace)
28 .extend_u64(self.sink);
29 serializer.to_encoded_key()
30 }
31
32 fn decode(key: &EncodedKey) -> Option<Self> {
33 let mut de = KeyDeserializer::from_bytes(key.as_slice());
34
35 let version = de.read_u8().ok()?;
36 if version != VERSION {
37 return None;
38 }
39
40 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
41 if kind != Self::KIND {
42 return None;
43 }
44
45 let namespace = de.read_u64().ok()?;
46 let sink = de.read_u64().ok()?;
47
48 Some(Self {
49 namespace: NamespaceId(namespace),
50 sink: SinkId(sink),
51 })
52 }
53}
54
55impl NamespaceSinkKey {
56 pub fn encoded(namespace: impl Into<NamespaceId>, sink: impl Into<SinkId>) -> EncodedKey {
57 Self {
58 namespace: namespace.into(),
59 sink: sink.into(),
60 }
61 .encode()
62 }
63
64 pub fn full_scan(namespace_id: NamespaceId) -> EncodedKeyRange {
65 EncodedKeyRange::start_end(Some(Self::link_start(namespace_id)), Some(Self::link_end(namespace_id)))
66 }
67
68 fn link_start(namespace_id: NamespaceId) -> EncodedKey {
69 let mut serializer = KeySerializer::with_capacity(10);
70 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(namespace_id);
71 serializer.to_encoded_key()
72 }
73
74 fn link_end(namespace_id: NamespaceId) -> EncodedKey {
75 let mut serializer = KeySerializer::with_capacity(10);
76 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(*namespace_id - 1);
77 serializer.to_encoded_key()
78 }
79}
80
#[cfg(test)]
pub mod tests {
    use super::{EncodableKey, NamespaceSinkKey};
    use crate::interface::catalog::id::{NamespaceId, SinkId};

    /// A key survives an encode/decode round trip with both fields intact.
    #[test]
    fn test_encode_decode() {
        let original = NamespaceSinkKey {
            namespace: NamespaceId(0xABCD),
            sink: SinkId(0x123456789ABCDEF0),
        };

        let bytes = original.encode();
        let round_tripped = NamespaceSinkKey::decode(&bytes).unwrap();

        assert_eq!(round_tripped.namespace, NamespaceId(0xABCD));
        assert_eq!(round_tripped.sink, SinkId(0x123456789ABCDEF0));
        assert_eq!(original, round_tripped);
    }

    /// Pins the relative order of encoded keys: the DEFAULT-namespace key
    /// sorts before the SYSTEM-namespace keys, and within SYSTEM the larger
    /// sink id sorts before the smaller one.
    #[test]
    fn test_order_preserving() {
        let encode = |namespace: NamespaceId, sink: u64| {
            NamespaceSinkKey {
                namespace,
                sink: SinkId(sink),
            }
            .encode()
        };

        let system_100 = encode(NamespaceId::SYSTEM, 100);
        let system_200 = encode(NamespaceId::SYSTEM, 200);
        let default_0 = encode(NamespaceId::DEFAULT, 0);

        assert!(default_0 < system_200, "ordering not preserved");
        assert!(system_200 < system_100, "ordering not preserved");
    }
}