reifydb_core/key/
row.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::collections::Bound;
5
6use reifydb_type::RowNumber;
7
8use super::{EncodableKey, KeyKind};
9use crate::{
10	EncodedKey, EncodedKeyRange,
11	interface::{EncodableKeyRange, catalog::SourceId},
12	util::encoding::keycode::{KeyDeserializer, KeySerializer},
13};
14
15const VERSION: u8 = 1;
16
17#[derive(Debug, Clone, PartialEq)]
18pub struct RowKey {
19	pub source: SourceId,
20	pub row: RowNumber,
21}
22
23impl EncodableKey for RowKey {
24	const KIND: KeyKind = KeyKind::Row;
25
26	fn encode(&self) -> EncodedKey {
27		let mut serializer = KeySerializer::with_capacity(19); // 1 + 1 + 9 + 8
28		serializer
29			.extend_u8(VERSION)
30			.extend_u8(Self::KIND as u8)
31			.extend_source_id(self.source)
32			.extend_u64(self.row.0);
33		serializer.to_encoded_key()
34	}
35
36	fn decode(key: &EncodedKey) -> Option<Self> {
37		let mut de = KeyDeserializer::from_bytes(key.as_slice());
38
39		let version = de.read_u8().ok()?;
40		if version != VERSION {
41			return None;
42		}
43
44		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
45		if kind != Self::KIND {
46			return None;
47		}
48
49		let source = de.read_source_id().ok()?;
50		let row = de.read_row_number().ok()?;
51
52		Some(Self {
53			source,
54			row,
55		})
56	}
57}
58
59#[derive(Debug, Clone, PartialEq)]
60pub struct RowKeyRange {
61	pub source: SourceId,
62}
63
64impl RowKeyRange {
65	fn decode_key(key: &EncodedKey) -> Option<Self> {
66		let mut de = KeyDeserializer::from_bytes(key.as_slice());
67
68		let version = de.read_u8().ok()?;
69		if version != VERSION {
70			return None;
71		}
72
73		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
74		if kind != Self::KIND {
75			return None;
76		}
77
78		let source = de.read_source_id().ok()?;
79
80		Some(RowKeyRange {
81			source,
82		})
83	}
84
85	/// Create a range for scanning rows from a source
86	///
87	/// If `last_key` is provided, creates a range that continues from after that key.
88	/// Otherwise, creates a range that includes all rows for the source.
89	///
90	/// The caller is responsible for limiting the number of results returned.
91	pub fn scan_range(source: SourceId, last_key: Option<&EncodedKey>) -> EncodedKeyRange {
92		let range = RowKeyRange {
93			source,
94		};
95
96		if let Some(last_key) = last_key {
97			EncodedKeyRange::new(Bound::Excluded(last_key.clone()), Bound::Included(range.end().unwrap()))
98		} else {
99			EncodedKeyRange::new(
100				Bound::Included(range.start().unwrap()),
101				Bound::Included(range.end().unwrap()),
102			)
103		}
104	}
105}
106
107impl EncodableKeyRange for RowKeyRange {
108	const KIND: KeyKind = KeyKind::Row;
109
110	fn start(&self) -> Option<EncodedKey> {
111		let mut serializer = KeySerializer::with_capacity(11); // 1 + 1 + 9
112		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_source_id(self.source);
113		Some(serializer.to_encoded_key())
114	}
115
116	fn end(&self) -> Option<EncodedKey> {
117		let mut serializer = KeySerializer::with_capacity(11);
118		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_source_id(self.source.prev());
119		Some(serializer.to_encoded_key())
120	}
121
122	fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
123	where
124		Self: Sized,
125	{
126		let start_key = match &range.start {
127			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
128			Bound::Unbounded => None,
129		};
130
131		let end_key = match &range.end {
132			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
133			Bound::Unbounded => None,
134		};
135
136		(start_key, end_key)
137	}
138}
139
140impl RowKey {
141	pub fn encoded(source: impl Into<SourceId>, row: impl Into<RowNumber>) -> EncodedKey {
142		Self {
143			source: source.into(),
144			row: row.into(),
145		}
146		.encode()
147	}
148
149	pub fn full_scan(source: impl Into<SourceId>) -> EncodedKeyRange {
150		let source = source.into();
151		EncodedKeyRange::start_end(Some(Self::source_start(source)), Some(Self::source_end(source)))
152	}
153
154	pub fn source_start(source: impl Into<SourceId>) -> EncodedKey {
155		let source = source.into();
156		let mut serializer = KeySerializer::with_capacity(11);
157		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_source_id(source);
158		serializer.to_encoded_key()
159	}
160
161	pub fn source_end(source: impl Into<SourceId>) -> EncodedKey {
162		let source = source.into();
163		let mut serializer = KeySerializer::with_capacity(11);
164		serializer.extend_u8(VERSION).extend_u8(Self::KIND as u8).extend_source_id(source.prev());
165		serializer.to_encoded_key()
166	}
167}
168
169#[cfg(test)]
170mod tests {
171	use reifydb_type::RowNumber;
172
173	use super::{EncodableKey, RowKey};
174	use crate::interface::catalog::SourceId;
175
176	#[test]
177	fn test_encode_decode() {
178		let key = RowKey {
179			source: SourceId::table(0xABCD),
180			row: RowNumber(0x123456789ABCDEF0),
181		};
182		let encoded = key.encode();
183
184		let expected: Vec<u8> = vec![
185			0xFE, // version
186			0xFC, // kind
187			0x01, // SourceId type discriminator (Table)
188			0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x54, 0x32, 0xED, 0xCB, 0xA9, 0x87, 0x65, 0x43, 0x21, 0x0F,
189		];
190
191		assert_eq!(encoded.as_slice(), expected);
192
193		let key = RowKey::decode(&encoded).unwrap();
194		assert_eq!(key.source, SourceId::table(0xABCD));
195		assert_eq!(key.row, 0x123456789ABCDEF0);
196	}
197
198	#[test]
199	fn test_order_preserving() {
200		let key1 = RowKey {
201			source: SourceId::table(1),
202			row: RowNumber(100),
203		};
204		let key2 = RowKey {
205			source: SourceId::table(1),
206			row: RowNumber(200),
207		};
208		let key3 = RowKey {
209			source: SourceId::table(2),
210			row: RowNumber(1),
211		};
212
213		let encoded1 = key1.encode();
214		let encoded2 = key2.encode();
215		let encoded3 = key3.encode();
216
217		assert!(encoded3 < encoded2, "ordering not preserved");
218		assert!(encoded2 < encoded1, "ordering not preserved");
219	}
220}