reifydb_core/key/
ringbuffer.rs1use reifydb_type::value::Value;
5
6use super::{EncodableKey, KeyKind};
7use crate::{
8 encoded::key::{EncodedKey, EncodedKeyRange},
9 interface::catalog::id::RingBufferId,
10 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
11};
12
/// Format version tag for the keys in this file: it is the first byte every
/// `encode` writes and the first byte every `decode` checks before accepting a key.
const VERSION: u8 = 1;
14
15#[derive(Debug, Clone, PartialEq)]
16pub struct RingBufferKey {
17 pub ringbuffer: RingBufferId,
18}
19
20impl RingBufferKey {
21 pub fn new(ringbuffer: RingBufferId) -> Self {
22 Self {
23 ringbuffer,
24 }
25 }
26
27 pub fn encoded(ringbuffer: impl Into<RingBufferId>) -> EncodedKey {
28 Self::new(ringbuffer.into()).encode()
29 }
30
31 pub fn full_scan() -> EncodedKeyRange {
32 EncodedKeyRange::start_end(Some(Self::ringbuffer_start()), Some(Self::ringbuffer_end()))
33 }
34
35 fn ringbuffer_start() -> EncodedKey {
36 let mut serializer = KeySerializer::with_capacity(2);
37 serializer.extend_u8(VERSION);
38 serializer.extend_u8(Self::KIND as u8);
39 serializer.to_encoded_key()
40 }
41
42 fn ringbuffer_end() -> EncodedKey {
43 let mut serializer = KeySerializer::with_capacity(2);
44 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8 - 1);
45 serializer.to_encoded_key()
46 }
47}
48
49impl EncodableKey for RingBufferKey {
50 const KIND: KeyKind = KeyKind::RingBuffer;
51
52 fn encode(&self) -> EncodedKey {
53 let mut serializer = KeySerializer::with_capacity(10);
54 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.ringbuffer);
55 serializer.to_encoded_key()
56 }
57
58 fn decode(key: &EncodedKey) -> Option<Self> {
59 let mut de = KeyDeserializer::from_bytes(key.as_slice());
60
61 let version = de.read_u8().ok()?;
62 if version != VERSION {
63 return None;
64 }
65
66 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
67 if kind != Self::KIND {
68 return None;
69 }
70
71 let ringbuffer = de.read_u64().ok()?;
72
73 Some(Self {
74 ringbuffer: RingBufferId(ringbuffer),
75 })
76 }
77}
78
79#[derive(Debug, Clone, PartialEq)]
80pub struct RingBufferMetadataKey {
81 pub ringbuffer: RingBufferId,
82 pub partition_values: Vec<Value>,
83}
84
85impl RingBufferMetadataKey {
86 pub fn new(ringbuffer: RingBufferId) -> Self {
87 Self {
88 ringbuffer,
89 partition_values: vec![],
90 }
91 }
92
93 pub fn encoded(ringbuffer: impl Into<RingBufferId>) -> EncodedKey {
94 Self::new(ringbuffer.into()).encode()
95 }
96
97 pub fn encoded_partition(ringbuffer: impl Into<RingBufferId>, partition_values: Vec<Value>) -> EncodedKey {
98 Self {
99 ringbuffer: ringbuffer.into(),
100 partition_values,
101 }
102 .encode()
103 }
104
105 pub fn full_scan_for_ringbuffer(ringbuffer: RingBufferId) -> EncodedKeyRange {
107 let mut start = KeySerializer::with_capacity(10);
108 start.extend_u8(VERSION);
109 start.extend_u8(Self::KIND as u8);
110 start.extend_u64(ringbuffer);
111 let start_key = start.to_encoded_key();
112
113 let mut end = KeySerializer::with_capacity(10);
114 end.extend_u8(VERSION);
115 end.extend_u8(Self::KIND as u8);
116 end.extend_u64(RingBufferId(ringbuffer.0 - 1));
117 let end_key = end.to_encoded_key();
118
119 EncodedKeyRange::start_end(Some(start_key), Some(end_key))
120 }
121}
122
123impl EncodableKey for RingBufferMetadataKey {
124 const KIND: KeyKind = KeyKind::RingBufferMetadata;
125
126 fn encode(&self) -> EncodedKey {
127 let mut serializer = KeySerializer::with_capacity(32);
128 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_u64(self.ringbuffer);
129 for value in &self.partition_values {
130 serializer.extend_value(value);
131 }
132 serializer.to_encoded_key()
133 }
134
135 fn decode(key: &EncodedKey) -> Option<Self> {
136 let mut de = KeyDeserializer::from_bytes(key.as_slice());
137
138 let version = de.read_u8().ok()?;
139 if version != VERSION {
140 return None;
141 }
142
143 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
144 if kind != Self::KIND {
145 return None;
146 }
147
148 let ringbuffer = de.read_u64().ok()?;
149
150 Some(Self {
151 ringbuffer: RingBufferId(ringbuffer),
152 partition_values: vec![],
153 })
154 }
155}
156
#[cfg(test)]
mod tests {
    use super::*;

    /// Skips the version, kind and id header of `key`, then reads `count` values.
    fn read_partition_values(key: &EncodedKey, count: usize) -> Vec<Value> {
        let mut de = KeyDeserializer::from_bytes(key.as_slice());
        // Header results are deliberately ignored; only the trailing values matter here.
        let _ = de.read_u8();
        let _ = de.read_u8();
        let _ = de.read_u64();
        (0..count).map(|_| de.read_value().unwrap()).collect()
    }

    #[test]
    fn test_metadata_key_encode_decode_roundtrip() {
        let expected = vec![Value::Utf8(String::from("east"))];
        let key = RingBufferMetadataKey::encoded_partition(RingBufferId(42), expected.clone());

        assert_eq!(read_partition_values(&key, 1), expected);
    }

    #[test]
    fn test_metadata_key_encode_decode_multiple() {
        let expected = vec![Value::Utf8(String::from("us")), Value::Uint8(42)];
        let key = RingBufferMetadataKey::encoded_partition(RingBufferId(7), expected.clone());

        assert_eq!(read_partition_values(&key, 2), expected);
    }
}