reifydb_core/key/
series_row.rs1use std::collections::Bound;
5
6use super::{EncodableKey, KeyKind};
7use crate::{
8 encoded::key::{EncodedKey, EncodedKeyRange},
9 interface::catalog::{id::SeriesId, shape::ShapeId},
10 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
11};
12
/// Format version of the series-row key encoding.
///
/// Written as the first byte of every key built in this file and checked by
/// `SeriesRowKey::decode`, which returns `None` for any other value.
const VERSION: u8 = 1;
14
/// Storage key for a row belonging to a series.
///
/// `encode` writes, in order: the version byte, the key-kind byte, the shape
/// id of the series, the variant tag (only when it is `Some`), then `key` and
/// `sequence` as `u64` values.
#[derive(Debug, Clone, PartialEq)]
pub struct SeriesRowKey {
    /// Series the row belongs to; encoded as `ShapeId::Series`.
    pub series: SeriesId,
    /// Optional one-byte tag written between the shape id and `key`.
    /// Nothing is written for `None`, so tagged keys are one byte longer.
    pub variant_tag: Option<u8>,
    /// Ordering value of the row, written before `sequence`.
    /// NOTE(review): the tests use a millisecond-looking timestamp here —
    /// confirm the intended unit against callers.
    pub key: u64,
    /// Written after `key`; presumably distinguishes rows that share the same
    /// `key` — confirm against callers.
    pub sequence: u64,
}
27
28impl EncodableKey for SeriesRowKey {
29 const KIND: KeyKind = KeyKind::Row;
30
31 fn encode(&self) -> EncodedKey {
32 let object = ShapeId::Series(self.series);
33 let capacity = if self.variant_tag.is_some() {
34 28
35 } else {
36 27
37 };
38 let mut serializer = KeySerializer::with_capacity(capacity);
39 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_shape_id(object);
40 if let Some(tag) = self.variant_tag {
41 serializer.extend_u8(tag);
42 }
43 serializer.extend_u64(self.key).extend_u64(self.sequence);
44 serializer.to_encoded_key()
45 }
46
47 fn decode(key: &EncodedKey) -> Option<Self> {
48 let mut de = KeyDeserializer::from_bytes(key.as_slice());
49
50 let version = de.read_u8().ok()?;
51 if version != VERSION {
52 return None;
53 }
54
55 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
56 if kind != Self::KIND {
57 return None;
58 }
59
60 let object = de.read_shape_id().ok()?;
61 let series = match object {
62 ShapeId::Series(id) => id,
63 _ => return None,
64 };
65
66 let remaining = de.remaining();
70 let variant_tag = if remaining > 16 {
71 Some(de.read_u8().ok()?)
72 } else {
73 None
74 };
75
76 let key = de.read_u64().ok()?;
77 let sequence = de.read_u64().ok()?;
78
79 Some(Self {
80 series,
81 variant_tag,
82 key,
83 sequence,
84 })
85 }
86}
87
/// Describes a scan over the rows of one series, optionally narrowed by a
/// variant tag and by key bounds. `full_scan` and `scan_range` turn it into an
/// `EncodedKeyRange`.
///
/// The ordering tests in this file assert that larger values encode to
/// smaller bytes, which is why `key_end` feeds the encoded range start and
/// `key_start` feeds the encoded range end.
#[derive(Debug, Clone)]
pub struct SeriesRowKeyRange {
    /// Series whose rows are scanned.
    pub series: SeriesId,
    /// Optional variant tag appended to the key prefix when building bounds.
    pub variant_tag: Option<u8>,
    /// Lower key bound. When set, the encoded range end is built from this
    /// key with sequence `0`; when `None`, the range end is built from the
    /// `prev()` of the series' shape id instead.
    pub key_start: Option<u64>,
    /// Upper key bound, treated as exclusive: the encoded range start is built
    /// from `key_end - 1`. When `None`, the range start is just the key prefix.
    pub key_end: Option<u64>,
}
96
97impl SeriesRowKeyRange {
98 pub fn full_scan(series: SeriesId, variant_tag: Option<u8>) -> EncodedKeyRange {
100 let range = SeriesRowKeyRange {
101 series,
102 variant_tag,
103 key_start: None,
104 key_end: None,
105 };
106 EncodedKeyRange::new(Bound::Included(range.start_key()), Bound::Included(range.end_key()))
107 }
108
109 pub fn scan_range(
114 series: SeriesId,
115 variant_tag: Option<u8>,
116 key_start: Option<u64>,
117 key_end: Option<u64>,
118 last_key: Option<&EncodedKey>,
119 ) -> EncodedKeyRange {
120 if matches!(key_end, Some(0)) {
123 let empty = EncodedKey::new(Vec::<u8>::new());
124 return EncodedKeyRange::new(Bound::Excluded(empty.clone()), Bound::Excluded(empty));
125 }
126
127 let range = SeriesRowKeyRange {
128 series,
129 variant_tag,
130 key_start,
131 key_end,
132 };
133
134 let start = if let Some(last_key) = last_key {
135 Bound::Excluded(last_key.clone())
136 } else {
137 Bound::Included(range.start_key())
138 };
139
140 EncodedKeyRange::new(start, Bound::Included(range.end_key()))
141 }
142
143 fn start_key(&self) -> EncodedKey {
144 let object = ShapeId::Series(self.series);
145 let mut serializer = KeySerializer::with_capacity(28);
146 serializer.extend_u8(VERSION).extend_u8(KeyKind::Row as u8).extend_shape_id(object);
147 if let Some(tag) = self.variant_tag {
148 serializer.extend_u8(tag);
149 }
150 if let Some(key_val) = self.key_end {
157 serializer.extend_u64(key_val - 1);
158 }
159 serializer.to_encoded_key()
160 }
161
162 fn end_key(&self) -> EncodedKey {
163 if let Some(key_val) = self.key_start {
167 let object = ShapeId::Series(self.series);
168 let mut serializer = KeySerializer::with_capacity(28);
169 serializer.extend_u8(VERSION).extend_u8(KeyKind::Row as u8).extend_shape_id(object);
170 if let Some(tag) = self.variant_tag {
171 serializer.extend_u8(tag);
172 }
173 serializer.extend_u64(key_val).extend_u64(0u64);
176 serializer.to_encoded_key()
177 } else {
178 let object = ShapeId::Series(self.series);
180 let mut serializer = KeySerializer::with_capacity(11);
181 serializer.extend_u8(VERSION).extend_u8(KeyKind::Row as u8).extend_shape_id(object.prev());
182 serializer.to_encoded_key()
183 }
184 }
185}
186
#[cfg(test)]
mod tests {
    use super::*;

    /// Untagged key in series 1, used by the ordering tests.
    fn untagged(key: u64, sequence: u64) -> SeriesRowKey {
        SeriesRowKey {
            series: SeriesId(1),
            variant_tag: None,
            key,
            sequence,
        }
    }

    // Round trip: decoding an encoded key yields every original field.
    #[test]
    fn test_encode_decode_without_tag() {
        let original = SeriesRowKey {
            series: SeriesId(42),
            variant_tag: None,
            key: 1706745600000,
            sequence: 1,
        };
        let roundtripped = SeriesRowKey::decode(&original.encode()).unwrap();
        assert_eq!(roundtripped, original);
    }

    // Same round trip, with a variant tag present in the encoding.
    #[test]
    fn test_encode_decode_with_tag() {
        let original = SeriesRowKey {
            series: SeriesId(42),
            variant_tag: Some(3),
            key: 1706745600000,
            sequence: 5,
        };
        let roundtripped = SeriesRowKey::decode(&original.encode()).unwrap();
        assert_eq!(roundtripped, original);
    }

    // A larger `key` must encode to smaller bytes.
    #[test]
    fn test_ordering_by_key() {
        let smaller_key = untagged(100, 0).encode();
        let larger_key = untagged(200, 0).encode();
        assert!(smaller_key > larger_key, "key descending ordering not preserved");
    }

    // With equal `key`, a larger `sequence` must encode to smaller bytes.
    #[test]
    fn test_ordering_by_sequence() {
        let earlier = untagged(100, 1).encode();
        let later = untagged(100, 2).encode();
        assert!(earlier > later, "sequence descending ordering not preserved");
    }
}