reifydb_core/key/
row.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::collections::Bound;
5
6use reifydb_type::RowNumber;
7
8use super::{EncodableKey, KeyKind};
9use crate::{
10	EncodedKey, EncodedKeyRange,
11	interface::{EncodableKeyRange, catalog::SourceId},
12	util::encoding::keycode::{KeyDeserializer, KeySerializer},
13};
14
15const VERSION: u8 = 1;
16
17#[derive(Debug, Clone, PartialEq)]
18pub struct RowKey {
19	pub source: SourceId,
20	pub row: RowNumber,
21}
22
23impl EncodableKey for RowKey {
24	const KIND: KeyKind = KeyKind::Row;
25
26	fn encode(&self) -> EncodedKey {
27		let mut serializer = KeySerializer::with_capacity(19); // 1 + 1 + 9 + 8
28		serializer
29			.extend_u8(VERSION)
30			.extend_u8(Self::KIND as u8)
31			.extend_source_id(self.source)
32			.extend_u64(self.row.0);
33		serializer.to_encoded_key()
34	}
35
36	fn decode(key: &EncodedKey) -> Option<Self> {
37		let mut de = KeyDeserializer::from_bytes(key.as_slice());
38
39		let version = de.read_u8().ok()?;
40		if version != VERSION {
41			return None;
42		}
43
44		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
45		if kind != Self::KIND {
46			return None;
47		}
48
49		let source = de.read_source_id().ok()?;
50		let row = de.read_row_number().ok()?;
51
52		Some(Self {
53			source,
54			row,
55		})
56	}
57}
58
59#[derive(Debug, Clone, PartialEq)]
60pub struct RowKeyRange {
61	pub source: SourceId,
62}
63
64impl RowKeyRange {
65	fn decode_key(key: &EncodedKey) -> Option<Self> {
66		let mut de = KeyDeserializer::from_bytes(key.as_slice());
67
68		let version = de.read_u8().ok()?;
69		if version != VERSION {
70			return None;
71		}
72
73		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
74		if kind != Self::KIND {
75			return None;
76		}
77
78		let source = de.read_source_id().ok()?;
79
80		Some(RowKeyRange {
81			source,
82		})
83	}
84
85	/// Create a range for scanning rows from a source
86	///
87	/// If `last_key` is provided, creates a range that continues from after that key.
88	/// Otherwise, creates a range that includes all rows for the source.
89	///
90	/// The caller is responsible for limiting the number of results returned.
91	pub fn scan_range(source: SourceId, last_key: Option<&EncodedKey>) -> EncodedKeyRange {
92		let range = RowKeyRange {
93			source,
94		};
95
96		if let Some(last_key) = last_key {
97			EncodedKeyRange::new(Bound::Excluded(last_key.clone()), Bound::Included(range.end().unwrap()))
98		} else {
99			EncodedKeyRange::new(
100				Bound::Included(range.start().unwrap()),
101				Bound::Included(range.end().unwrap()),
102			)
103		}
104	}
105}
106
107impl EncodableKeyRange for RowKeyRange {
108	const KIND: KeyKind = KeyKind::Row;
109
110	fn start(&self) -> Option<EncodedKey> {
111		let mut serializer = KeySerializer::with_capacity(11); // 1 + 1 + 9
112		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_source_id(self.source);
113		Some(serializer.to_encoded_key())
114	}
115
116	fn end(&self) -> Option<EncodedKey> {
117		let mut serializer = KeySerializer::with_capacity(11);
118		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_source_id(self.source.prev());
119		Some(serializer.to_encoded_key())
120	}
121
122	fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
123	where
124		Self: Sized,
125	{
126		let start_key = match &range.start {
127			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
128			Bound::Unbounded => None,
129		};
130
131		let end_key = match &range.end {
132			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
133			Bound::Unbounded => None,
134		};
135
136		(start_key, end_key)
137	}
138}
139
140impl RowKey {
141	pub fn full_scan(source: impl Into<SourceId>) -> EncodedKeyRange {
142		let source = source.into();
143		EncodedKeyRange::start_end(Some(Self::source_start(source)), Some(Self::source_end(source)))
144	}
145
146	pub fn source_start(source: impl Into<SourceId>) -> EncodedKey {
147		let source = source.into();
148		let mut serializer = KeySerializer::with_capacity(11);
149		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_source_id(source);
150		serializer.to_encoded_key()
151	}
152
153	pub fn source_end(source: impl Into<SourceId>) -> EncodedKey {
154		let source = source.into();
155		let mut serializer = KeySerializer::with_capacity(11);
156		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_source_id(source.prev());
157		serializer.to_encoded_key()
158	}
159}
160
161#[cfg(test)]
162mod tests {
163	use reifydb_type::RowNumber;
164
165	use super::{EncodableKey, RowKey};
166	use crate::interface::catalog::SourceId;
167
168	#[test]
169	fn test_encode_decode() {
170		let key = RowKey {
171			source: SourceId::table(0xABCD),
172			row: RowNumber(0x123456789ABCDEF0),
173		};
174		let encoded = key.encode();
175
176		let expected: Vec<u8> = vec![
177			0xFE, // version
178			0xFC, // kind
179			0x01, // SourceId type discriminator (Table)
180			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x54, 0x32, 0xED, 0xCB, 0xA9, 0x87, 0x65, 0x43, 0x21, 0x0F,
181		];
182
183		assert_eq!(encoded.as_slice(), expected);
184
185		let key = RowKey::decode(&encoded).unwrap();
186		assert_eq!(key.source, SourceId::table(0xABCD));
187		assert_eq!(key.row, 0x123456789ABCDEF0);
188	}
189
190	#[test]
191	fn test_order_preserving() {
192		let key1 = RowKey {
193			source: SourceId::table(1),
194			row: RowNumber(100),
195		};
196		let key2 = RowKey {
197			source: SourceId::table(1),
198			row: RowNumber(200),
199		};
200		let key3 = RowKey {
201			source: SourceId::table(2),
202			row: RowNumber(1),
203		};
204
205		let encoded1 = key1.encode();
206		let encoded2 = key2.encode();
207		let encoded3 = key3.encode();
208
209		assert!(encoded3 < encoded2, "ordering not preserved");
210		assert!(encoded2 < encoded1, "ordering not preserved");
211	}
212}