Skip to main content

reifydb_core/key/
series_row.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::collections::Bound;
5
6use super::{EncodableKey, KeyKind};
7use crate::{
8	encoded::key::{EncodedKey, EncodedKeyRange},
9	interface::catalog::{id::SeriesId, shape::ShapeId},
10	util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
11};
12
/// Version byte written as the first byte of every encoded series row key
/// and checked first when decoding.
const VERSION: u8 = 1;
14
15/// Key for series data rows.
16///
17/// Layout without tag: `[Version | Row(0x03) | ShapeId::Series(id) | ordering_value(u64) | sequence(u64)]`
18/// Layout with tag:    `[Version | Row(0x03) | ShapeId::Series(id) | variant_tag(u8) | ordering_value(u64) |
19/// sequence(u64)]`
20#[derive(Debug, Clone, PartialEq)]
21pub struct SeriesRowKey {
22	pub series: SeriesId,
23	pub variant_tag: Option<u8>,
24	pub key: u64,
25	pub sequence: u64,
26}
27
28impl EncodableKey for SeriesRowKey {
29	const KIND: KeyKind = KeyKind::Row;
30
31	fn encode(&self) -> EncodedKey {
32		let object = ShapeId::Series(self.series);
33		let capacity = if self.variant_tag.is_some() {
34			28
35		} else {
36			27
37		};
38		let mut serializer = KeySerializer::with_capacity(capacity);
39		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_shape_id(object);
40		if let Some(tag) = self.variant_tag {
41			serializer.extend_u8(tag);
42		}
43		serializer.extend_u64(self.key).extend_u64(self.sequence);
44		serializer.to_encoded_key()
45	}
46
47	fn decode(key: &EncodedKey) -> Option<Self> {
48		let mut de = KeyDeserializer::from_bytes(key.as_slice());
49
50		let version = de.read_u8().ok()?;
51		if version != VERSION {
52			return None;
53		}
54
55		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
56		if kind != Self::KIND {
57			return None;
58		}
59
60		let object = de.read_shape_id().ok()?;
61		let series = match object {
62			ShapeId::Series(id) => id,
63			_ => return None,
64		};
65
66		// We need to know if there's a variant tag. We can tell by the remaining bytes:
67		// Without tag: u64(8) + u64(8) = 16 bytes remain
68		// With tag: u8(1) + u64(8) + u64(8) = 17 bytes remain
69		let remaining = de.remaining();
70		let variant_tag = if remaining > 16 {
71			Some(de.read_u8().ok()?)
72		} else {
73			None
74		};
75
76		let key = de.read_u64().ok()?;
77		let sequence = de.read_u64().ok()?;
78
79		Some(Self {
80			series,
81			variant_tag,
82			key,
83			sequence,
84		})
85	}
86}
87
88/// Range key for scanning series data rows.
89#[derive(Debug, Clone)]
90pub struct SeriesRowKeyRange {
91	pub series: SeriesId,
92	pub variant_tag: Option<u8>,
93	pub key_start: Option<u64>,
94	pub key_end: Option<u64>,
95}
96
97impl SeriesRowKeyRange {
98	/// Create a range covering all rows for a series (optionally filtered by tag).
99	pub fn full_scan(series: SeriesId, variant_tag: Option<u8>) -> EncodedKeyRange {
100		let range = SeriesRowKeyRange {
101			series,
102			variant_tag,
103			key_start: None,
104			key_end: None,
105		};
106		EncodedKeyRange::new(Bound::Included(range.start_key()), Bound::Included(range.end_key()))
107	}
108
109	/// Create a range scan with optional key bounds. Semantics are half-open
110	/// `[key_start, key_end)` - `key_start` is inclusive, `key_end` is
111	/// exclusive. This matches `Bucket`'s convention so callers can pass
112	/// bucket bounds directly.
113	pub fn scan_range(
114		series: SeriesId,
115		variant_tag: Option<u8>,
116		key_start: Option<u64>,
117		key_end: Option<u64>,
118		last_key: Option<&EncodedKey>,
119	) -> EncodedKeyRange {
120		// `[start, 0)` is empty regardless of `key_start`. Represent it as
121		// a range that yields no rows.
122		if matches!(key_end, Some(0)) {
123			let empty = EncodedKey::new(Vec::<u8>::new());
124			return EncodedKeyRange::new(Bound::Excluded(empty.clone()), Bound::Excluded(empty));
125		}
126
127		let range = SeriesRowKeyRange {
128			series,
129			variant_tag,
130			key_start,
131			key_end,
132		};
133
134		let start = if let Some(last_key) = last_key {
135			Bound::Excluded(last_key.clone())
136		} else {
137			Bound::Included(range.start_key())
138		};
139
140		EncodedKeyRange::new(start, Bound::Included(range.end_key()))
141	}
142
143	fn start_key(&self) -> EncodedKey {
144		let object = ShapeId::Series(self.series);
145		let mut serializer = KeySerializer::with_capacity(28);
146		serializer.extend_u8(VERSION).extend_u8(KeyKind::Row as u8).extend_shape_id(object);
147		if let Some(tag) = self.variant_tag {
148			serializer.extend_u8(tag);
149		}
150		// Descending key encoding: higher key values have lower encoded values.
151		// The start key (lower bound) uses key_end (exclusive) to begin
152		// scanning from the newest matching row. Under half-open
153		// `[start, end)` semantics the highest *included* key is
154		// `end - 1`, so serialize that. `key_end == Some(0)` is handled
155		// in `scan_range` before reaching here.
156		if let Some(key_val) = self.key_end {
157			serializer.extend_u64(key_val - 1);
158		}
159		serializer.to_encoded_key()
160	}
161
162	fn end_key(&self) -> EncodedKey {
163		// Descending key encoding: lower key values have higher encoded values.
164		// The end key (upper bound) uses key_start (the lowest key value in
165		// the desired range) to stop scanning after the oldest matching row.
166		if let Some(key_val) = self.key_start {
167			let object = ShapeId::Series(self.series);
168			let mut serializer = KeySerializer::with_capacity(28);
169			serializer.extend_u8(VERSION).extend_u8(KeyKind::Row as u8).extend_shape_id(object);
170			if let Some(tag) = self.variant_tag {
171				serializer.extend_u8(tag);
172			}
173			// Use sequence 0 which encodes to max bytes in descending encoding,
174			// ensuring all rows at this key value are included.
175			serializer.extend_u64(key_val).extend_u64(0u64);
176			serializer.to_encoded_key()
177		} else {
178			// Use ShapeId ordering trick to get end of range
179			let object = ShapeId::Series(self.series);
180			let mut serializer = KeySerializer::with_capacity(11);
181			serializer.extend_u8(VERSION).extend_u8(KeyKind::Row as u8).extend_shape_id(object.prev());
182			serializer.to_encoded_key()
183		}
184	}
185}
186
#[cfg(test)]
mod tests {
	use super::*;

	/// Shorthand constructor so each test only spells out the values it cares about.
	fn row_key(series: SeriesId, variant_tag: Option<u8>, key: u64, sequence: u64) -> SeriesRowKey {
		SeriesRowKey {
			series,
			variant_tag,
			key,
			sequence,
		}
	}

	#[test]
	fn test_encode_decode_without_tag() {
		let encoded = row_key(SeriesId(42), None, 1706745600000, 1).encode();

		let SeriesRowKey {
			series,
			variant_tag,
			key,
			sequence,
		} = SeriesRowKey::decode(&encoded).unwrap();

		assert_eq!(series, SeriesId(42));
		assert_eq!(variant_tag, None);
		assert_eq!(key, 1706745600000);
		assert_eq!(sequence, 1);
	}

	#[test]
	fn test_encode_decode_with_tag() {
		let encoded = row_key(SeriesId(42), Some(3), 1706745600000, 5).encode();

		let SeriesRowKey {
			series,
			variant_tag,
			key,
			sequence,
		} = SeriesRowKey::decode(&encoded).unwrap();

		assert_eq!(series, SeriesId(42));
		assert_eq!(variant_tag, Some(3));
		assert_eq!(key, 1706745600000);
		assert_eq!(sequence, 5);
	}

	#[test]
	fn test_ordering_by_key() {
		let e1 = row_key(SeriesId(1), None, 100, 0).encode();
		let e2 = row_key(SeriesId(1), None, 200, 0).encode();
		// Keycode encoding stores the bitwise NOT of the big-endian value, so
		// the smaller key value (100) must sort AFTER the larger one (200).
		assert!(e1 > e2, "key descending ordering not preserved");
	}

	#[test]
	fn test_ordering_by_sequence() {
		let e1 = row_key(SeriesId(1), None, 100, 1).encode();
		let e2 = row_key(SeriesId(1), None, 100, 2).encode();
		// Same descending rule applies to the sequence within one key value.
		assert!(e1 > e2, "sequence descending ordering not preserved");
	}
}