Skip to main content

reifydb_core/key/
sink.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use super::{EncodableKey, KeyKind};
5use crate::{
6	encoded::key::{EncodedKey, EncodedKeyRange},
7	interface::catalog::id::SinkId,
8	util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
9};
10
11#[derive(Debug, Clone, PartialEq)]
12pub struct SinkKey {
13	pub sink: SinkId,
14}
15
16const VERSION: u8 = 1;
17
18impl EncodableKey for SinkKey {
19	const KIND: KeyKind = KeyKind::Sink;
20
21	fn encode(&self) -> EncodedKey {
22		let mut serializer = KeySerializer::with_capacity(10);
23		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.sink);
24		serializer.to_encoded_key()
25	}
26
27	fn decode(key: &EncodedKey) -> Option<Self> {
28		let mut de = KeyDeserializer::from_bytes(key.as_slice());
29
30		let version = de.read_u8().ok()?;
31		if version != VERSION {
32			return None;
33		}
34
35		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
36		if kind != Self::KIND {
37			return None;
38		}
39
40		let sink = de.read_u64().ok()?;
41
42		Some(Self {
43			sink: SinkId(sink),
44		})
45	}
46}
47
48impl SinkKey {
49	pub fn encoded(sink: impl Into<SinkId>) -> EncodedKey {
50		Self {
51			sink: sink.into(),
52		}
53		.encode()
54	}
55
56	pub fn full_scan() -> EncodedKeyRange {
57		EncodedKeyRange::start_end(Some(Self::sink_start()), Some(Self::sink_end()))
58	}
59
60	fn sink_start() -> EncodedKey {
61		let mut serializer = KeySerializer::with_capacity(2);
62		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8);
63		serializer.to_encoded_key()
64	}
65
66	fn sink_end() -> EncodedKey {
67		let mut serializer = KeySerializer::with_capacity(2);
68		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8 - 1);
69		serializer.to_encoded_key()
70	}
71}
72
73#[cfg(test)]
74pub mod tests {
75	use super::{EncodableKey, SinkKey};
76	use crate::interface::catalog::id::SinkId;
77
78	#[test]
79	fn test_encode_decode() {
80		let key = SinkKey {
81			sink: SinkId(0x1234),
82		};
83		let encoded = key.encode();
84		let decoded = SinkKey::decode(&encoded).unwrap();
85		assert_eq!(decoded.sink, SinkId(0x1234));
86		assert_eq!(key, decoded);
87	}
88}