reifydb_core/key/
series_row.rs1use std::collections::Bound;
5
6use super::{EncodableKey, KeyKind};
7use crate::{
8 encoded::key::{EncodedKey, EncodedKeyRange},
9 interface::catalog::{id::SeriesId, schema::SchemaId},
10 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
11};
12
13const VERSION: u8 = 1;
14
15#[derive(Debug, Clone, PartialEq)]
21pub struct SeriesRowKey {
22 pub series: SeriesId,
23 pub variant_tag: Option<u8>,
24 pub key: u64,
25 pub sequence: u64,
26}
27
28impl EncodableKey for SeriesRowKey {
29 const KIND: KeyKind = KeyKind::Row;
30
31 fn encode(&self) -> EncodedKey {
32 let object = SchemaId::Series(self.series);
33 let capacity = if self.variant_tag.is_some() {
34 28
35 } else {
36 27
37 };
38 let mut serializer = KeySerializer::with_capacity(capacity);
39 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_schema_id(object);
40 if let Some(tag) = self.variant_tag {
41 serializer.extend_u8(tag);
42 }
43 serializer.extend_u64(self.key).extend_u64(self.sequence);
44 serializer.to_encoded_key()
45 }
46
47 fn decode(key: &EncodedKey) -> Option<Self> {
48 let mut de = KeyDeserializer::from_bytes(key.as_slice());
49
50 let version = de.read_u8().ok()?;
51 if version != VERSION {
52 return None;
53 }
54
55 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
56 if kind != Self::KIND {
57 return None;
58 }
59
60 let object = de.read_schema_id().ok()?;
61 let series = match object {
62 SchemaId::Series(id) => id,
63 _ => return None,
64 };
65
66 let remaining = de.remaining();
70 let variant_tag = if remaining > 16 {
71 Some(de.read_u8().ok()?)
72 } else {
73 None
74 };
75
76 let key = de.read_u64().ok()?;
77 let sequence = de.read_u64().ok()?;
78
79 Some(Self {
80 series,
81 variant_tag,
82 key,
83 sequence,
84 })
85 }
86}
87
88#[derive(Debug, Clone)]
90pub struct SeriesRowKeyRange {
91 pub series: SeriesId,
92 pub variant_tag: Option<u8>,
93 pub key_start: Option<u64>,
94 pub key_end: Option<u64>,
95}
96
97impl SeriesRowKeyRange {
98 pub fn full_scan(series: SeriesId, variant_tag: Option<u8>) -> EncodedKeyRange {
100 let range = SeriesRowKeyRange {
101 series,
102 variant_tag,
103 key_start: None,
104 key_end: None,
105 };
106 EncodedKeyRange::new(Bound::Included(range.start_key()), Bound::Included(range.end_key()))
107 }
108
109 pub fn scan_range(
111 series: SeriesId,
112 variant_tag: Option<u8>,
113 key_start: Option<u64>,
114 key_end: Option<u64>,
115 last_key: Option<&EncodedKey>,
116 ) -> EncodedKeyRange {
117 let range = SeriesRowKeyRange {
118 series,
119 variant_tag,
120 key_start,
121 key_end,
122 };
123
124 let start = if let Some(last_key) = last_key {
125 Bound::Excluded(last_key.clone())
126 } else {
127 Bound::Included(range.start_key())
128 };
129
130 EncodedKeyRange::new(start, Bound::Included(range.end_key()))
131 }
132
133 fn start_key(&self) -> EncodedKey {
134 let object = SchemaId::Series(self.series);
135 let mut serializer = KeySerializer::with_capacity(28);
136 serializer.extend_u8(VERSION).extend_u8(KeyKind::Row as u8).extend_schema_id(object);
137 if let Some(tag) = self.variant_tag {
138 serializer.extend_u8(tag);
139 }
140 if let Some(key_val) = self.key_end {
144 serializer.extend_u64(key_val);
145 }
146 serializer.to_encoded_key()
147 }
148
149 fn end_key(&self) -> EncodedKey {
150 if let Some(key_val) = self.key_start {
154 let object = SchemaId::Series(self.series);
155 let mut serializer = KeySerializer::with_capacity(28);
156 serializer.extend_u8(VERSION).extend_u8(KeyKind::Row as u8).extend_schema_id(object);
157 if let Some(tag) = self.variant_tag {
158 serializer.extend_u8(tag);
159 }
160 serializer.extend_u64(key_val).extend_u64(0u64);
163 serializer.to_encoded_key()
164 } else {
165 let object = SchemaId::Series(self.series);
167 let mut serializer = KeySerializer::with_capacity(11);
168 serializer.extend_u8(VERSION).extend_u8(KeyKind::Row as u8).extend_schema_id(object.prev());
169 serializer.to_encoded_key()
170 }
171 }
172}
173
174#[cfg(test)]
175mod tests {
176 use super::*;
177
178 #[test]
179 fn test_encode_decode_without_tag() {
180 let key = SeriesRowKey {
181 series: SeriesId(42),
182 variant_tag: None,
183 key: 1706745600000,
184 sequence: 1,
185 };
186 let encoded = key.encode();
187 let decoded = SeriesRowKey::decode(&encoded).unwrap();
188 assert_eq!(decoded.series, SeriesId(42));
189 assert_eq!(decoded.variant_tag, None);
190 assert_eq!(decoded.key, 1706745600000);
191 assert_eq!(decoded.sequence, 1);
192 }
193
194 #[test]
195 fn test_encode_decode_with_tag() {
196 let key = SeriesRowKey {
197 series: SeriesId(42),
198 variant_tag: Some(3),
199 key: 1706745600000,
200 sequence: 5,
201 };
202 let encoded = key.encode();
203 let decoded = SeriesRowKey::decode(&encoded).unwrap();
204 assert_eq!(decoded.series, SeriesId(42));
205 assert_eq!(decoded.variant_tag, Some(3));
206 assert_eq!(decoded.key, 1706745600000);
207 assert_eq!(decoded.sequence, 5);
208 }
209
210 #[test]
211 fn test_ordering_by_key() {
212 let key1 = SeriesRowKey {
213 series: SeriesId(1),
214 variant_tag: None,
215 key: 100,
216 sequence: 0,
217 };
218 let key2 = SeriesRowKey {
219 series: SeriesId(1),
220 variant_tag: None,
221 key: 200,
222 sequence: 0,
223 };
224 let e1 = key1.encode();
225 let e2 = key2.encode();
226 assert!(e1 > e2, "key descending ordering not preserved");
229 }
230
231 #[test]
232 fn test_ordering_by_sequence() {
233 let key1 = SeriesRowKey {
234 series: SeriesId(1),
235 variant_tag: None,
236 key: 100,
237 sequence: 1,
238 };
239 let key2 = SeriesRowKey {
240 series: SeriesId(1),
241 variant_tag: None,
242 key: 100,
243 sequence: 2,
244 };
245 let e1 = key1.encode();
246 let e2 = key2.encode();
247 assert!(e1 > e2, "sequence descending ordering not preserved");
249 }
250}