reifydb_core/key/
index_entry.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::collections::Bound;
5
6use super::{EncodableKey, EncodableKeyRange, KeyKind};
7use crate::{
8	EncodedKey, EncodedKeyRange,
9	interface::catalog::{IndexId, SourceId},
10	util::{
11		CowVec,
12		encoding::keycode::{KeyDeserializer, KeySerializer},
13	},
14	value::index::{EncodedIndexKey, EncodedIndexKeyRange},
15};
16
17const VERSION: u8 = 1;
18
19/// Key for storing actual index entries with the encoded index key data
20#[derive(Debug, Clone, PartialEq)]
21pub struct IndexEntryKey {
22	pub source: SourceId,
23	pub index: IndexId,
24	pub key: EncodedIndexKey,
25}
26
27impl IndexEntryKey {
28	pub fn new(source: impl Into<SourceId>, index: IndexId, key: EncodedIndexKey) -> Self {
29		let source = source.into();
30		Self {
31			source,
32			index,
33			key,
34		}
35	}
36}
37
38#[derive(Debug, Clone, PartialEq)]
39pub struct IndexEntryKeyRange {
40	pub source: SourceId,
41	pub index: IndexId,
42}
43
44impl IndexEntryKeyRange {
45	fn decode_key(key: &EncodedKey) -> Option<Self> {
46		let mut de = KeyDeserializer::from_bytes(key.as_slice());
47
48		let version = de.read_u8().ok()?;
49		if version != VERSION {
50			return None;
51		}
52
53		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
54		if kind != Self::KIND {
55			return None;
56		}
57
58		let source = de.read_source_id().ok()?;
59		let index = de.read_index_id().ok()?;
60
61		Some(IndexEntryKeyRange {
62			source,
63			index,
64		})
65	}
66}
67
68impl EncodableKeyRange for IndexEntryKeyRange {
69	const KIND: KeyKind = KeyKind::IndexEntry;
70
71	fn start(&self) -> Option<EncodedKey> {
72		let mut serializer = KeySerializer::with_capacity(20); // 1 + 1 + 9 + 9
73		serializer
74			.extend_u8(VERSION)
75			.extend_u8(Self::KIND as u8)
76			.extend_source_id(self.source)
77			.extend_index_id(self.index);
78		Some(serializer.to_encoded_key())
79	}
80
81	fn end(&self) -> Option<EncodedKey> {
82		let mut serializer = KeySerializer::with_capacity(20);
83		serializer
84			.extend_u8(VERSION)
85			.extend_u8(Self::KIND as u8)
86			.extend_source_id(self.source)
87			.extend_index_id(self.index.prev());
88		Some(serializer.to_encoded_key())
89	}
90
91	fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
92	where
93		Self: Sized,
94	{
95		let start_key = match &range.start {
96			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
97			Bound::Unbounded => None,
98		};
99
100		let end_key = match &range.end {
101			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
102			Bound::Unbounded => None,
103		};
104
105		(start_key, end_key)
106	}
107}
108
109impl EncodableKey for IndexEntryKey {
110	const KIND: KeyKind = KeyKind::IndexEntry;
111
112	fn encode(&self) -> EncodedKey {
113		let mut serializer = KeySerializer::with_capacity(20 + self.key.len());
114		serializer
115			.extend_u8(VERSION)
116			.extend_u8(Self::KIND as u8)
117			.extend_source_id(self.source)
118			.extend_index_id(self.index)
119			// Append the raw index key bytes
120			.extend_raw(self.key.as_slice());
121		serializer.to_encoded_key()
122	}
123
124	fn decode(key: &EncodedKey) -> Option<Self> {
125		let mut de = KeyDeserializer::from_bytes(key.as_slice());
126
127		let version = de.read_u8().ok()?;
128		if version != VERSION {
129			return None;
130		}
131
132		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
133		if kind != Self::KIND {
134			return None;
135		}
136
137		let source = de.read_source_id().ok()?;
138		let index = de.read_index_id().ok()?;
139
140		// The remaining bytes are the index key
141		let remaining = de.remaining();
142		if remaining > 0 {
143			let remaining_bytes = de.read_raw(remaining).ok()?;
144			let index_key = EncodedIndexKey(CowVec::new(remaining_bytes.to_vec()));
145			Some(Self {
146				source,
147				index,
148				key: index_key,
149			})
150		} else {
151			None
152		}
153	}
154}
155
156impl IndexEntryKey {
157	/// Create a range for scanning all entries of a specific index
158	pub fn index_range(source: impl Into<SourceId>, index: IndexId) -> EncodedKeyRange {
159		let range = IndexEntryKeyRange {
160			source: source.into(),
161			index,
162		};
163		EncodedKeyRange::new(Bound::Included(range.start().unwrap()), Bound::Excluded(range.end().unwrap()))
164	}
165
166	/// Create a range for scanning all entries of a source (all indexes)
167	pub fn source_range(source: impl Into<SourceId>) -> EncodedKeyRange {
168		let source = source.into();
169		let mut start_serializer = KeySerializer::with_capacity(11);
170		start_serializer.extend_u8(VERSION).extend_u8(KeyKind::IndexEntry as u8).extend_source_id(source);
171
172		let next_source = source.next();
173		let mut end_serializer = KeySerializer::with_capacity(11);
174		end_serializer.extend_u8(VERSION).extend_u8(KeyKind::IndexEntry as u8).extend_source_id(next_source);
175
176		EncodedKeyRange {
177			start: Bound::Included(start_serializer.to_encoded_key()),
178			end: Bound::Excluded(end_serializer.to_encoded_key()),
179		}
180	}
181
182	/// Create a range for scanning entries within an index with a specific
183	/// key prefix
184	pub fn key_prefix_range(source: impl Into<SourceId>, index: IndexId, key_prefix: &[u8]) -> EncodedKeyRange {
185		let source = source.into();
186		let mut serializer = KeySerializer::with_capacity(20 + key_prefix.len());
187		serializer
188			.extend_u8(VERSION)
189			.extend_u8(KeyKind::IndexEntry as u8)
190			.extend_source_id(source)
191			.extend_index_id(index)
192			.extend_raw(key_prefix);
193		let start = serializer.to_encoded_key();
194
195		// For the end key, append 0xFF to get all keys with this prefix
196		let mut end = start.as_slice().to_vec();
197		end.push(0xFF);
198
199		EncodedKeyRange {
200			start: Bound::Included(start),
201			end: Bound::Excluded(EncodedKey::new(end)),
202		}
203	}
204
205	/// Create a range for entries from an EncodedIndexKeyRange
206	/// This method leverages the EncodedIndexKeyRange type for cleaner
207	/// range handling.
208	pub fn key_range(
209		source: impl Into<SourceId>,
210		index: IndexId,
211		index_range: EncodedIndexKeyRange,
212	) -> EncodedKeyRange {
213		let source = source.into();
214		// Build the prefix for this source and index
215		let mut prefix_serializer = KeySerializer::with_capacity(20);
216		prefix_serializer
217			.extend_u8(VERSION)
218			.extend_u8(KeyKind::IndexEntry as u8)
219			.extend_source_id(source)
220			.extend_index_id(index);
221		let prefix = prefix_serializer.to_encoded_key().to_vec();
222
223		// Convert bounds to include the prefix
224		let start = match index_range.start {
225			Bound::Included(key) => {
226				let mut bytes = prefix.clone();
227				bytes.extend_from_slice(key.as_slice());
228				Bound::Included(EncodedKey::new(bytes))
229			}
230			Bound::Excluded(key) => {
231				let mut bytes = prefix.clone();
232				bytes.extend_from_slice(key.as_slice());
233				Bound::Excluded(EncodedKey::new(bytes))
234			}
235			Bound::Unbounded => {
236				// Start from the beginning of this index
237				Bound::Included(EncodedKey::new(prefix.clone()))
238			}
239		};
240
241		let end = match index_range.end {
242			Bound::Included(key) => {
243				let mut bytes = prefix.clone();
244				bytes.extend_from_slice(key.as_slice());
245				Bound::Included(EncodedKey::new(bytes))
246			}
247			Bound::Excluded(key) => {
248				let mut bytes = prefix.clone();
249				bytes.extend_from_slice(key.as_slice());
250				Bound::Excluded(EncodedKey::new(bytes))
251			}
252			Bound::Unbounded => {
253				// End at the beginning of the next index
254				let mut serializer = KeySerializer::with_capacity(20);
255				serializer
256					.extend_u8(VERSION)
257					.extend_u8(KeyKind::IndexEntry as u8)
258					.extend_source_id(source)
259					// Use prev() for end bound in descending order
260					.extend_index_id(index.prev());
261				Bound::Excluded(serializer.to_encoded_key())
262			}
263		};
264
265		EncodedKeyRange {
266			start,
267			end,
268		}
269	}
270}
271
#[cfg(test)]
mod tests {
	use reifydb_type::Type;

	use super::*;
	use crate::{SortDirection, value::index::EncodedIndexLayout};

	/// Unwraps a range that is expected to be `[start, end)`.
	///
	/// Panics on any other bound shape so that range assertions can never
	/// be silently skipped.
	fn half_open_bounds(range: &EncodedKeyRange) -> (&EncodedKey, &EncodedKey) {
		match (&range.start, &range.end) {
			(Bound::Included(start), Bound::Excluded(end)) => (start, end),
			_ => panic!("Expected Included/Excluded bounds"),
		}
	}

	/// Encoding an entry and decoding it again yields the same parts.
	#[test]
	fn test_encode_decode() {
		// Create a simple index key
		let layout = EncodedIndexLayout::new(
			&[Type::Uint8, Type::RowNumber],
			&[SortDirection::Asc, SortDirection::Asc],
		)
		.unwrap();

		let mut index_key = layout.allocate_key();
		layout.set_u64(&mut index_key, 0, 100u64);
		layout.set_row_number(&mut index_key, 1, 1u64);

		let entry = IndexEntryKey {
			source: SourceId::table(42),
			index: IndexId::primary(7),
			key: index_key.clone(),
		};

		let encoded = entry.encode();
		let decoded = IndexEntryKey::decode(&encoded).unwrap();

		assert_eq!(decoded.source, SourceId::table(42));
		assert_eq!(decoded.index, IndexId::primary(7));
		assert_eq!(decoded.key.as_slice(), index_key.as_slice());
	}

	/// Entries of the same index sort by their index key bytes.
	#[test]
	fn test_ordering() {
		let layout = EncodedIndexLayout::new(&[Type::Uint8], &[SortDirection::Asc]).unwrap();

		let mut key1 = layout.allocate_key();
		layout.set_u64(&mut key1, 0, 100u64);

		let mut key2 = layout.allocate_key();
		layout.set_u64(&mut key2, 0, 200u64);

		// Same source and index, different keys
		let entry1 = IndexEntryKey {
			source: SourceId::table(1),
			index: IndexId::primary(1),
			key: key1,
		};

		let entry2 = IndexEntryKey {
			source: SourceId::table(1),
			index: IndexId::primary(1),
			key: key2,
		};

		let encoded1 = entry1.encode();
		let encoded2 = entry2.encode();

		// entry1 should come before entry2 because 100 < 200
		assert!(encoded1.as_slice() < encoded2.as_slice());
	}

	/// `index_range` contains entries of its own index and excludes
	/// entries of a neighbouring index.
	#[test]
	fn test_index_range() {
		let range = IndexEntryKey::index_range(SourceId::table(10), IndexId::primary(5));
		let (start, end) = half_open_bounds(&range);

		// Create entries that should be included
		let layout = EncodedIndexLayout::new(&[Type::Uint8], &[SortDirection::Asc]).unwrap();

		let mut key = layout.allocate_key();
		layout.set_u64(&mut key, 0, 50u64);

		let entry = IndexEntryKey {
			source: SourceId::table(10),
			index: IndexId::primary(5),
			key,
		};

		let encoded = entry.encode();

		// Check that the entry falls within the range
		assert!(encoded.as_slice() >= start.as_slice());
		assert!(encoded.as_slice() < end.as_slice());

		// Entry with different index should not be in range
		// Note: Due to keycode encoding, IndexId(6) will have a smaller
		// encoded value than IndexId(5) since keycode inverts bits
		// (larger numbers become smaller byte sequences)
		let entry2 = IndexEntryKey {
			source: SourceId::table(10),
			index: IndexId::primary(6),
			key: layout.allocate_key(),
		};

		let encoded2 = entry2.encode();

		// The entry with IndexId(6) should not be within the range for
		// IndexId(5): it must be either < start or >= end
		assert!(encoded2.as_slice() < start.as_slice() || encoded2.as_slice() >= end.as_slice());
	}

	/// `key_prefix_range` contains keys sharing the prefix and excludes a
	/// key whose first field differs.
	#[test]
	fn test_key_prefix_range() {
		let layout = EncodedIndexLayout::new(
			&[Type::Uint8, Type::RowNumber],
			&[SortDirection::Asc, SortDirection::Asc],
		)
		.unwrap();

		let mut key = layout.allocate_key();
		layout.set_u64(&mut key, 0, 100u64);
		layout.set_row_number(&mut key, 1, 0u64); // Set to 0 to get the minimal key with this prefix

		// Use the full encoded key up to the first field as the prefix
		let prefix = &key.as_slice()[..layout.fields[1].offset]; // Include bitvec and first field
		let range = IndexEntryKey::key_prefix_range(SourceId::table(1), IndexId::primary(1), prefix);
		let (start, end) = half_open_bounds(&range);

		// Now create a full key with the same prefix
		layout.set_row_number(&mut key, 1, 999u64);
		let entry = IndexEntryKey {
			source: SourceId::table(1),
			index: IndexId::primary(1),
			key: key.clone(),
		};

		let encoded = entry.encode();

		// Should be within range
		assert!(encoded.as_slice() >= start.as_slice());
		assert!(encoded.as_slice() < end.as_slice());

		// Create a key with different prefix
		let mut key2 = layout.allocate_key();
		layout.set_u64(&mut key2, 0, 200u64); // Different first field
		layout.set_row_number(&mut key2, 1, 1u64);

		let entry2 = IndexEntryKey {
			source: SourceId::table(1),
			index: IndexId::primary(1),
			key: key2,
		};

		let encoded2 = entry2.encode();

		// Should not be in range
		assert!(encoded2.as_slice() >= end.as_slice());
	}
}