Skip to main content

reifydb_core/key/
series_row.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::collections::Bound;
5
6use super::{EncodableKey, KeyKind};
7use crate::{
8	encoded::key::{EncodedKey, EncodedKeyRange},
9	interface::catalog::{id::SeriesId, schema::SchemaId},
10	util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
11};
12
/// Layout version written as the first byte of every series row key; `decode`
/// rejects keys whose leading byte differs.
const VERSION: u8 = 1;
14
15/// Key for series data rows.
16///
17/// Layout without tag: `[Version | Row(0x03) | SchemaId::Series(id) | ordering_value(u64) | sequence(u64)]`
18/// Layout with tag:    `[Version | Row(0x03) | SchemaId::Series(id) | variant_tag(u8) | ordering_value(u64) |
19/// sequence(u64)]`
20#[derive(Debug, Clone, PartialEq)]
21pub struct SeriesRowKey {
22	pub series: SeriesId,
23	pub variant_tag: Option<u8>,
24	pub key: u64,
25	pub sequence: u64,
26}
27
28impl EncodableKey for SeriesRowKey {
29	const KIND: KeyKind = KeyKind::Row;
30
31	fn encode(&self) -> EncodedKey {
32		let object = SchemaId::Series(self.series);
33		let capacity = if self.variant_tag.is_some() {
34			28
35		} else {
36			27
37		};
38		let mut serializer = KeySerializer::with_capacity(capacity);
39		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_schema_id(object);
40		if let Some(tag) = self.variant_tag {
41			serializer.extend_u8(tag);
42		}
43		serializer.extend_u64(self.key).extend_u64(self.sequence);
44		serializer.to_encoded_key()
45	}
46
47	fn decode(key: &EncodedKey) -> Option<Self> {
48		let mut de = KeyDeserializer::from_bytes(key.as_slice());
49
50		let version = de.read_u8().ok()?;
51		if version != VERSION {
52			return None;
53		}
54
55		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
56		if kind != Self::KIND {
57			return None;
58		}
59
60		let object = de.read_schema_id().ok()?;
61		let series = match object {
62			SchemaId::Series(id) => id,
63			_ => return None,
64		};
65
66		// We need to know if there's a variant tag. We can tell by the remaining bytes:
67		// Without tag: u64(8) + u64(8) = 16 bytes remain
68		// With tag: u8(1) + u64(8) + u64(8) = 17 bytes remain
69		let remaining = de.remaining();
70		let variant_tag = if remaining > 16 {
71			Some(de.read_u8().ok()?)
72		} else {
73			None
74		};
75
76		let key = de.read_u64().ok()?;
77		let sequence = de.read_u64().ok()?;
78
79		Some(Self {
80			series,
81			variant_tag,
82			key,
83			sequence,
84		})
85	}
86}
87
88/// Range key for scanning series data rows.
89#[derive(Debug, Clone)]
90pub struct SeriesRowKeyRange {
91	pub series: SeriesId,
92	pub variant_tag: Option<u8>,
93	pub key_start: Option<u64>,
94	pub key_end: Option<u64>,
95}
96
97impl SeriesRowKeyRange {
98	/// Create a range covering all rows for a series (optionally filtered by tag).
99	pub fn full_scan(series: SeriesId, variant_tag: Option<u8>) -> EncodedKeyRange {
100		let range = SeriesRowKeyRange {
101			series,
102			variant_tag,
103			key_start: None,
104			key_end: None,
105		};
106		EncodedKeyRange::new(Bound::Included(range.start_key()), Bound::Included(range.end_key()))
107	}
108
109	/// Create a range scan with optional key bounds.
110	pub fn scan_range(
111		series: SeriesId,
112		variant_tag: Option<u8>,
113		key_start: Option<u64>,
114		key_end: Option<u64>,
115		last_key: Option<&EncodedKey>,
116	) -> EncodedKeyRange {
117		let range = SeriesRowKeyRange {
118			series,
119			variant_tag,
120			key_start,
121			key_end,
122		};
123
124		let start = if let Some(last_key) = last_key {
125			Bound::Excluded(last_key.clone())
126		} else {
127			Bound::Included(range.start_key())
128		};
129
130		EncodedKeyRange::new(start, Bound::Included(range.end_key()))
131	}
132
133	fn start_key(&self) -> EncodedKey {
134		let object = SchemaId::Series(self.series);
135		let mut serializer = KeySerializer::with_capacity(28);
136		serializer.extend_u8(VERSION).extend_u8(KeyKind::Row as u8).extend_schema_id(object);
137		if let Some(tag) = self.variant_tag {
138			serializer.extend_u8(tag);
139		}
140		// Descending key encoding: higher key values have lower encoded values.
141		// The start key (lower bound) uses key_end (the highest key value in
142		// the desired range) to begin scanning from the newest matching row.
143		if let Some(key_val) = self.key_end {
144			serializer.extend_u64(key_val);
145		}
146		serializer.to_encoded_key()
147	}
148
149	fn end_key(&self) -> EncodedKey {
150		// Descending key encoding: lower key values have higher encoded values.
151		// The end key (upper bound) uses key_start (the lowest key value in
152		// the desired range) to stop scanning after the oldest matching row.
153		if let Some(key_val) = self.key_start {
154			let object = SchemaId::Series(self.series);
155			let mut serializer = KeySerializer::with_capacity(28);
156			serializer.extend_u8(VERSION).extend_u8(KeyKind::Row as u8).extend_schema_id(object);
157			if let Some(tag) = self.variant_tag {
158				serializer.extend_u8(tag);
159			}
160			// Use sequence 0 which encodes to max bytes in descending encoding,
161			// ensuring all rows at this key value are included.
162			serializer.extend_u64(key_val).extend_u64(0u64);
163			serializer.to_encoded_key()
164		} else {
165			// Use SchemaId ordering trick to get end of range
166			let object = SchemaId::Series(self.series);
167			let mut serializer = KeySerializer::with_capacity(11);
168			serializer.extend_u8(VERSION).extend_u8(KeyKind::Row as u8).extend_schema_id(object.prev());
169			serializer.to_encoded_key()
170		}
171	}
172}
173
#[cfg(test)]
mod tests {
	use super::*;

	/// Shorthand constructor so each test states only the values it cares about.
	fn row(series: SeriesId, variant_tag: Option<u8>, key: u64, sequence: u64) -> SeriesRowKey {
		SeriesRowKey {
			series,
			variant_tag,
			key,
			sequence,
		}
	}

	/// An untagged key survives an encode/decode round trip unchanged.
	#[test]
	fn test_encode_decode_without_tag() {
		let original = row(SeriesId(42), None, 1706745600000, 1);
		let round_tripped = SeriesRowKey::decode(&original.encode()).unwrap();
		assert_eq!(round_tripped, original);
	}

	/// A tagged key survives an encode/decode round trip unchanged.
	#[test]
	fn test_encode_decode_with_tag() {
		let original = row(SeriesId(42), Some(3), 1706745600000, 5);
		let round_tripped = SeriesRowKey::decode(&original.encode()).unwrap();
		assert_eq!(round_tripped, original);
	}

	/// Keycode encoding uses NOT of big-endian, producing descending order:
	/// the smaller key value must sort AFTER the larger one.
	#[test]
	fn test_ordering_by_key() {
		let smaller = row(SeriesId(1), None, 100, 0).encode();
		let larger = row(SeriesId(1), None, 200, 0).encode();
		assert!(smaller > larger, "key descending ordering not preserved");
	}

	/// With equal key values the sequence decides, again in descending order.
	#[test]
	fn test_ordering_by_sequence() {
		let earlier = row(SeriesId(1), None, 100, 1).encode();
		let later = row(SeriesId(1), None, 100, 2).encode();
		assert!(earlier > later, "sequence descending ordering not preserved");
	}
}