reifydb_core/key/
series_row.rs1use std::collections::Bound;
5
6use super::{EncodableKey, KeyKind};
7use crate::{
8 encoded::key::{EncodedKey, EncodedKeyRange},
9 interface::catalog::{id::SeriesId, primitive::PrimitiveId},
10 util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
11};
12
// Version byte written first in every key built in this file; `SeriesRowKey::decode`
// rejects keys whose first byte differs from it.
const VERSION: u8 = 1;
14
/// Key identifying a single row of a series.
///
/// `encode` writes the components in this order: version byte, `KeyKind::Row`
/// byte, the series wrapped as `PrimitiveId::Series`, the variant tag byte
/// (only when present), the timestamp, and finally the sequence.
#[derive(Debug, Clone, PartialEq)]
pub struct SeriesRowKey {
    /// Series the row belongs to.
    pub series: SeriesId,
    /// Optional one-byte tag placed between the series id and the timestamp;
    /// nothing is written for it when `None`.
    pub variant_tag: Option<u8>,
    /// Timestamp of the row (unit is not stated in this file — TODO confirm).
    pub timestamp: i64,
    /// Sequence number written after the timestamp.
    pub sequence: u64,
}
27
28impl EncodableKey for SeriesRowKey {
29 const KIND: KeyKind = KeyKind::Row;
30
31 fn encode(&self) -> EncodedKey {
32 let primitive = PrimitiveId::Series(self.series);
33 let capacity = if self.variant_tag.is_some() {
34 28
35 } else {
36 27
37 };
38 let mut serializer = KeySerializer::with_capacity(capacity);
39 serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_primitive_id(primitive);
40 if let Some(tag) = self.variant_tag {
41 serializer.extend_u8(tag);
42 }
43 serializer.extend_i64(self.timestamp).extend_u64(self.sequence);
44 serializer.to_encoded_key()
45 }
46
47 fn decode(key: &EncodedKey) -> Option<Self> {
48 let mut de = KeyDeserializer::from_bytes(key.as_slice());
49
50 let version = de.read_u8().ok()?;
51 if version != VERSION {
52 return None;
53 }
54
55 let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
56 if kind != Self::KIND {
57 return None;
58 }
59
60 let primitive = de.read_primitive_id().ok()?;
61 let series = match primitive {
62 PrimitiveId::Series(id) => id,
63 _ => return None,
64 };
65
66 let remaining = de.remaining();
70 let variant_tag = if remaining > 16 {
71 Some(de.read_u8().ok()?)
72 } else {
73 None
74 };
75
76 let timestamp = de.read_i64().ok()?;
77 let sequence = de.read_u64().ok()?;
78
79 Some(Self {
80 series,
81 variant_tag,
82 timestamp,
83 sequence,
84 })
85 }
86}
87
/// Parameters from which the encoded bounds of a series row scan are built.
#[derive(Debug, Clone)]
pub struct SeriesRowKeyRange {
    /// Series whose rows are scanned.
    pub series: SeriesId,
    /// Optional variant tag appended to the key prefix when bounds are built.
    pub variant_tag: Option<u8>,
    /// Optional timestamp limit; when set it is written into the range's END key.
    pub time_start: Option<i64>,
    /// Optional timestamp limit; when set it is written into the range's START key.
    pub time_end: Option<i64>,
}
96
97impl SeriesRowKeyRange {
98 pub fn full_scan(series: SeriesId, variant_tag: Option<u8>) -> EncodedKeyRange {
100 let range = SeriesRowKeyRange {
101 series,
102 variant_tag,
103 time_start: None,
104 time_end: None,
105 };
106 EncodedKeyRange::new(Bound::Included(range.start_key()), Bound::Included(range.end_key()))
107 }
108
109 pub fn scan_range(
111 series: SeriesId,
112 variant_tag: Option<u8>,
113 time_start: Option<i64>,
114 time_end: Option<i64>,
115 last_key: Option<&EncodedKey>,
116 ) -> EncodedKeyRange {
117 let range = SeriesRowKeyRange {
118 series,
119 variant_tag,
120 time_start,
121 time_end,
122 };
123
124 let start = if let Some(last_key) = last_key {
125 Bound::Excluded(last_key.clone())
126 } else {
127 Bound::Included(range.start_key())
128 };
129
130 EncodedKeyRange::new(start, Bound::Included(range.end_key()))
131 }
132
133 fn start_key(&self) -> EncodedKey {
134 let primitive = PrimitiveId::Series(self.series);
135 let mut serializer = KeySerializer::with_capacity(28);
136 serializer.extend_u8(VERSION).extend_u8(KeyKind::Row as u8).extend_primitive_id(primitive);
137 if let Some(tag) = self.variant_tag {
138 serializer.extend_u8(tag);
139 }
140 if let Some(ts) = self.time_end {
144 serializer.extend_i64(ts);
145 }
146 serializer.to_encoded_key()
147 }
148
149 fn end_key(&self) -> EncodedKey {
150 if let Some(ts) = self.time_start {
154 let primitive = PrimitiveId::Series(self.series);
155 let mut serializer = KeySerializer::with_capacity(28);
156 serializer.extend_u8(VERSION).extend_u8(KeyKind::Row as u8).extend_primitive_id(primitive);
157 if let Some(tag) = self.variant_tag {
158 serializer.extend_u8(tag);
159 }
160 serializer.extend_i64(ts).extend_u64(0u64);
163 serializer.to_encoded_key()
164 } else {
165 let primitive = PrimitiveId::Series(self.series);
167 let mut serializer = KeySerializer::with_capacity(11);
168 serializer
169 .extend_u8(VERSION)
170 .extend_u8(KeyKind::Row as u8)
171 .extend_primitive_id(primitive.prev());
172 serializer.to_encoded_key()
173 }
174 }
175}
176
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `key` and decodes the result, panicking if decoding fails.
    fn round_trip(key: &SeriesRowKey) -> SeriesRowKey {
        let encoded = key.encode();
        SeriesRowKey::decode(&encoded).unwrap()
    }

    /// Untagged key in series 1 with the given timestamp and sequence.
    fn untagged(timestamp: i64, sequence: u64) -> SeriesRowKey {
        SeriesRowKey {
            series: SeriesId(1),
            variant_tag: None,
            timestamp,
            sequence,
        }
    }

    #[test]
    fn test_encode_decode_without_tag() {
        let decoded = round_trip(&SeriesRowKey {
            series: SeriesId(42),
            variant_tag: None,
            timestamp: 1706745600000,
            sequence: 1,
        });

        assert_eq!(decoded.series, SeriesId(42));
        assert_eq!(decoded.variant_tag, None);
        assert_eq!(decoded.timestamp, 1706745600000);
        assert_eq!(decoded.sequence, 1);
    }

    #[test]
    fn test_encode_decode_with_tag() {
        let decoded = round_trip(&SeriesRowKey {
            series: SeriesId(42),
            variant_tag: Some(3),
            timestamp: 1706745600000,
            sequence: 5,
        });

        assert_eq!(decoded.series, SeriesId(42));
        assert_eq!(decoded.variant_tag, Some(3));
        assert_eq!(decoded.timestamp, 1706745600000);
        assert_eq!(decoded.sequence, 5);
    }

    #[test]
    fn test_ordering_by_timestamp() {
        // Same sequence, different timestamps: the smaller timestamp must
        // produce the larger encoded key.
        let earlier = untagged(100, 0).encode();
        let later = untagged(200, 0).encode();
        assert!(earlier > later, "timestamp descending ordering not preserved");
    }

    #[test]
    fn test_ordering_by_sequence() {
        // Same timestamp, different sequences: the smaller sequence must
        // produce the larger encoded key.
        let first = untagged(100, 1).encode();
        let second = untagged(100, 2).encode();
        assert!(first > second, "sequence descending ordering not preserved");
    }
}