Skip to main content

reifydb_core/key/
series_row.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::collections::Bound;
5
6use super::{EncodableKey, KeyKind};
7use crate::{
8	encoded::key::{EncodedKey, EncodedKeyRange},
9	interface::catalog::{id::SeriesId, primitive::PrimitiveId},
10	util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
11};
12
/// Layout version written as the first byte of every encoded series row key
/// and checked first when decoding.
const VERSION: u8 = 1;
14
15/// Key for series data rows.
16///
17/// Layout without tag: `[Version | Row(0x03) | PrimitiveId::Series(id) | timestamp(i64) | sequence(u64)]`
18/// Layout with tag:    `[Version | Row(0x03) | PrimitiveId::Series(id) | variant_tag(u8) | timestamp(i64) |
19/// sequence(u64)]`
20#[derive(Debug, Clone, PartialEq)]
21pub struct SeriesRowKey {
22	pub series: SeriesId,
23	pub variant_tag: Option<u8>,
24	pub timestamp: i64,
25	pub sequence: u64,
26}
27
28impl EncodableKey for SeriesRowKey {
29	const KIND: KeyKind = KeyKind::Row;
30
31	fn encode(&self) -> EncodedKey {
32		let primitive = PrimitiveId::Series(self.series);
33		let capacity = if self.variant_tag.is_some() {
34			28
35		} else {
36			27
37		};
38		let mut serializer = KeySerializer::with_capacity(capacity);
39		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_primitive_id(primitive);
40		if let Some(tag) = self.variant_tag {
41			serializer.extend_u8(tag);
42		}
43		serializer.extend_i64(self.timestamp).extend_u64(self.sequence);
44		serializer.to_encoded_key()
45	}
46
47	fn decode(key: &EncodedKey) -> Option<Self> {
48		let mut de = KeyDeserializer::from_bytes(key.as_slice());
49
50		let version = de.read_u8().ok()?;
51		if version != VERSION {
52			return None;
53		}
54
55		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
56		if kind != Self::KIND {
57			return None;
58		}
59
60		let primitive = de.read_primitive_id().ok()?;
61		let series = match primitive {
62			PrimitiveId::Series(id) => id,
63			_ => return None,
64		};
65
66		// We need to know if there's a variant tag. We can tell by the remaining bytes:
67		// Without tag: i64(8) + u64(8) = 16 bytes remain
68		// With tag: u8(1) + i64(8) + u64(8) = 17 bytes remain
69		let remaining = de.remaining();
70		let variant_tag = if remaining > 16 {
71			Some(de.read_u8().ok()?)
72		} else {
73			None
74		};
75
76		let timestamp = de.read_i64().ok()?;
77		let sequence = de.read_u64().ok()?;
78
79		Some(Self {
80			series,
81			variant_tag,
82			timestamp,
83			sequence,
84		})
85	}
86}
87
88/// Range key for scanning series data rows.
89#[derive(Debug, Clone)]
90pub struct SeriesRowKeyRange {
91	pub series: SeriesId,
92	pub variant_tag: Option<u8>,
93	pub time_start: Option<i64>,
94	pub time_end: Option<i64>,
95}
96
97impl SeriesRowKeyRange {
98	/// Create a range covering all rows for a series (optionally filtered by tag).
99	pub fn full_scan(series: SeriesId, variant_tag: Option<u8>) -> EncodedKeyRange {
100		let range = SeriesRowKeyRange {
101			series,
102			variant_tag,
103			time_start: None,
104			time_end: None,
105		};
106		EncodedKeyRange::new(Bound::Included(range.start_key()), Bound::Included(range.end_key()))
107	}
108
109	/// Create a range scan with optional time bounds.
110	pub fn scan_range(
111		series: SeriesId,
112		variant_tag: Option<u8>,
113		time_start: Option<i64>,
114		time_end: Option<i64>,
115		last_key: Option<&EncodedKey>,
116	) -> EncodedKeyRange {
117		let range = SeriesRowKeyRange {
118			series,
119			variant_tag,
120			time_start,
121			time_end,
122		};
123
124		let start = if let Some(last_key) = last_key {
125			Bound::Excluded(last_key.clone())
126		} else {
127			Bound::Included(range.start_key())
128		};
129
130		EncodedKeyRange::new(start, Bound::Included(range.end_key()))
131	}
132
133	fn start_key(&self) -> EncodedKey {
134		let primitive = PrimitiveId::Series(self.series);
135		let mut serializer = KeySerializer::with_capacity(28);
136		serializer.extend_u8(VERSION).extend_u8(KeyKind::Row as u8).extend_primitive_id(primitive);
137		if let Some(tag) = self.variant_tag {
138			serializer.extend_u8(tag);
139		}
140		// Descending key encoding: higher timestamps have lower key values.
141		// The start key (lower bound) uses time_end (the highest timestamp in
142		// the desired range) to begin scanning from the newest matching row.
143		if let Some(ts) = self.time_end {
144			serializer.extend_i64(ts);
145		}
146		serializer.to_encoded_key()
147	}
148
149	fn end_key(&self) -> EncodedKey {
150		// Descending key encoding: lower timestamps have higher key values.
151		// The end key (upper bound) uses time_start (the lowest timestamp in
152		// the desired range) to stop scanning after the oldest matching row.
153		if let Some(ts) = self.time_start {
154			let primitive = PrimitiveId::Series(self.series);
155			let mut serializer = KeySerializer::with_capacity(28);
156			serializer.extend_u8(VERSION).extend_u8(KeyKind::Row as u8).extend_primitive_id(primitive);
157			if let Some(tag) = self.variant_tag {
158				serializer.extend_u8(tag);
159			}
160			// Use sequence 0 which encodes to max bytes in descending encoding,
161			// ensuring all rows at this timestamp are included.
162			serializer.extend_i64(ts).extend_u64(0u64);
163			serializer.to_encoded_key()
164		} else {
165			// Use PrimitiveId ordering trick to get end of range
166			let primitive = PrimitiveId::Series(self.series);
167			let mut serializer = KeySerializer::with_capacity(11);
168			serializer
169				.extend_u8(VERSION)
170				.extend_u8(KeyKind::Row as u8)
171				.extend_primitive_id(primitive.prev());
172			serializer.to_encoded_key()
173		}
174	}
175}
176
#[cfg(test)]
mod tests {
	use super::*;

	/// Encodes `key` and decodes the result again.
	fn round_trip(key: &SeriesRowKey) -> SeriesRowKey {
		SeriesRowKey::decode(&key.encode()).unwrap()
	}

	/// Untagged key in series 1 with the given timestamp and sequence.
	fn untagged(timestamp: i64, sequence: u64) -> SeriesRowKey {
		SeriesRowKey {
			series: SeriesId(1),
			variant_tag: None,
			timestamp,
			sequence,
		}
	}

	#[test]
	fn test_encode_decode_without_tag() {
		let original = SeriesRowKey {
			series: SeriesId(42),
			variant_tag: None,
			timestamp: 1706745600000,
			sequence: 1,
		};
		// The struct has exactly these four fields, so whole-value equality
		// checks series, tag, timestamp and sequence at once.
		assert_eq!(round_trip(&original), original);
	}

	#[test]
	fn test_encode_decode_with_tag() {
		let original = SeriesRowKey {
			series: SeriesId(42),
			variant_tag: Some(3),
			timestamp: 1706745600000,
			sequence: 5,
		};
		assert_eq!(round_trip(&original), original);
	}

	#[test]
	fn test_ordering_by_timestamp() {
		let older = untagged(100, 0).encode();
		let newer = untagged(200, 0).encode();
		// Keycode encoding uses NOT of big-endian, producing descending order:
		// earlier timestamps (smaller values) sort AFTER later timestamps.
		assert!(older > newer, "timestamp descending ordering not preserved");
	}

	#[test]
	fn test_ordering_by_sequence() {
		let first = untagged(100, 1).encode();
		let second = untagged(100, 2).encode();
		// Keycode encoding uses NOT of big-endian, producing descending order.
		assert!(first > second, "sequence descending ordering not preserved");
	}
}