reifydb_core/key/
series_row.rs1use std::collections::Bound;
5
6use super::{EncodableKey, KeyKind};
7use crate::{
8 encoded::key::{EncodedKey, EncodedKeyRange},
9 interface::catalog::{id::SeriesId, shape::ShapeId},
10 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
11};
12
13#[derive(Debug, Clone, PartialEq)]
14pub struct SeriesRowKey {
15 pub series: SeriesId,
16 pub variant_tag: Option<u8>,
17 pub key: u64,
18 pub sequence: u64,
19}
20
21impl EncodableKey for SeriesRowKey {
22 const KIND: KeyKind = KeyKind::Row;
23
24 fn encode(&self) -> EncodedKey {
25 let object = ShapeId::Series(self.series);
26 let capacity = if self.variant_tag.is_some() {
27 28
28 } else {
29 27
30 };
31 let mut serializer = KeySerializer::with_capacity(capacity);
32 serializer.extend_u8(Self::KIND as u8).extend_shape_id(object);
33 if let Some(tag) = self.variant_tag {
34 serializer.extend_u8(tag);
35 }
36 serializer.extend_u64(self.key).extend_u64(self.sequence);
37 serializer.to_encoded_key()
38 }
39
40 fn decode(key: &EncodedKey) -> Option<Self> {
41 let mut de = KeyDeserializer::from_bytes(key.as_slice());
42
43 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
44 if kind != Self::KIND {
45 return None;
46 }
47
48 let object = de.read_shape_id().ok()?;
49 let series = match object {
50 ShapeId::Series(id) => id,
51 _ => return None,
52 };
53
54 let mut temp_de = KeyDeserializer::from_bytes(de.remaining_bytes());
55 let tag_present = if temp_de.read_u64().is_ok() {
56 if temp_de.read_u64().is_ok() {
57 !temp_de.is_empty()
58 } else {
59 true
60 }
61 } else {
62 true
63 };
64
65 let variant_tag = if tag_present {
66 Some(de.read_u8().ok()?)
67 } else {
68 None
69 };
70
71 let key = de.read_u64().ok()?;
72 let sequence = de.read_u64().ok()?;
73
74 Some(Self {
75 series,
76 variant_tag,
77 key,
78 sequence,
79 })
80 }
81}
82
83#[derive(Debug, Clone)]
84pub struct SeriesRowKeyRange {
85 pub series: SeriesId,
86 pub variant_tag: Option<u8>,
87 pub key_start: Option<u64>,
88 pub key_end: Option<u64>,
89}
90
91impl SeriesRowKeyRange {
92 pub fn full_scan(series: SeriesId, variant_tag: Option<u8>) -> EncodedKeyRange {
93 let range = SeriesRowKeyRange {
94 series,
95 variant_tag,
96 key_start: None,
97 key_end: None,
98 };
99 EncodedKeyRange::new(Bound::Included(range.start_key()), Bound::Included(range.end_key()))
100 }
101
102 pub fn scan_range(
103 series: SeriesId,
104 variant_tag: Option<u8>,
105 key_start: Option<u64>,
106 key_end: Option<u64>,
107 last_key: Option<&EncodedKey>,
108 ) -> EncodedKeyRange {
109 if matches!(key_end, Some(0)) {
110 let empty = EncodedKey::new(Vec::<u8>::new());
111 return EncodedKeyRange::new(Bound::Excluded(empty.clone()), Bound::Excluded(empty));
112 }
113
114 let range = SeriesRowKeyRange {
115 series,
116 variant_tag,
117 key_start,
118 key_end,
119 };
120
121 let start = if let Some(last_key) = last_key {
122 Bound::Excluded(last_key.clone())
123 } else {
124 Bound::Included(range.start_key())
125 };
126
127 EncodedKeyRange::new(start, Bound::Included(range.end_key()))
128 }
129
130 fn start_key(&self) -> EncodedKey {
131 let object = ShapeId::Series(self.series);
132 let mut serializer = KeySerializer::with_capacity(27);
133 serializer.extend_u8(KeyKind::Row as u8).extend_shape_id(object);
134 if let Some(tag) = self.variant_tag {
135 serializer.extend_u8(tag);
136 }
137
138 if let Some(key_val) = self.key_end {
139 serializer.extend_u64(key_val - 1);
140 }
141 serializer.to_encoded_key()
142 }
143
144 fn end_key(&self) -> EncodedKey {
145 if let Some(key_val) = self.key_start {
146 let object = ShapeId::Series(self.series);
147 let mut serializer = KeySerializer::with_capacity(27);
148 serializer.extend_u8(KeyKind::Row as u8).extend_shape_id(object);
149 if let Some(tag) = self.variant_tag {
150 serializer.extend_u8(tag);
151 }
152
153 serializer.extend_u64(key_val).extend_u64(0u64);
154 serializer.to_encoded_key()
155 } else {
156 let object = ShapeId::Series(self.series);
157 let mut serializer = KeySerializer::with_capacity(10);
158 serializer.extend_u8(KeyKind::Row as u8).extend_shape_id(object.prev());
159 serializer.to_encoded_key()
160 }
161 }
162}
163
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an untagged key in series 1 for the ordering tests.
    fn untagged(key: u64, sequence: u64) -> SeriesRowKey {
        SeriesRowKey {
            series: SeriesId(1),
            variant_tag: None,
            key,
            sequence,
        }
    }

    #[test]
    fn test_encode_decode_without_tag() {
        let original = SeriesRowKey {
            series: SeriesId(42),
            variant_tag: None,
            key: 1706745600000,
            sequence: 1,
        };

        // Every field must survive the round trip.
        let decoded = SeriesRowKey::decode(&original.encode());
        assert_eq!(decoded, Some(original));
    }

    #[test]
    fn test_encode_decode_with_tag() {
        let original = SeriesRowKey {
            series: SeriesId(42),
            variant_tag: Some(3),
            key: 1706745600000,
            sequence: 5,
        };

        // Every field, including the tag, must survive the round trip.
        let decoded = SeriesRowKey::decode(&original.encode());
        assert_eq!(decoded, Some(original));
    }

    #[test]
    fn test_ordering_by_key() {
        let smaller_key = untagged(100, 0).encode();
        let larger_key = untagged(200, 0).encode();

        // The larger key must encode to the smaller byte string.
        assert!(smaller_key > larger_key, "key descending ordering not preserved");
    }

    #[test]
    fn test_ordering_by_sequence() {
        let earlier = untagged(100, 1).encode();
        let later = untagged(100, 2).encode();

        // Within one key, the larger sequence must encode to the smaller byte string.
        assert!(earlier > later, "sequence descending ordering not preserved");
    }
}