reifydb_core/key/
index_entry.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::collections::Bound;
5
6use super::{EncodableKey, EncodableKeyRange, KeyKind};
7use crate::{
8	EncodedKey, EncodedKeyRange,
9	interface::catalog::{IndexId, SourceId},
10	util::{
11		CowVec,
12		encoding::keycode::{KeyDeserializer, KeySerializer},
13	},
14	value::index::{EncodedIndexKey, EncodedIndexKeyRange},
15};
16
17const VERSION: u8 = 1;
18
/// Key for storing actual index entries with the encoded index key data.
///
/// The encoded form is: version byte, key-kind byte, source id, index id,
/// followed by the raw bytes of the encoded index key.
#[derive(Debug, Clone, PartialEq)]
pub struct IndexEntryKey {
	/// Source component of the key.
	pub source: SourceId,
	/// Index component of the key.
	pub index: IndexId,
	/// Encoded index key; its raw bytes form the tail of the encoded key.
	pub key: EncodedIndexKey,
}
26
27impl IndexEntryKey {
28	pub fn new(source: impl Into<SourceId>, index: IndexId, key: EncodedIndexKey) -> Self {
29		Self {
30			source: source.into(),
31			index,
32			key,
33		}
34	}
35
36	pub fn encoded(source: impl Into<SourceId>, index: IndexId, key: EncodedIndexKey) -> EncodedKey {
37		Self::new(source, index, key).encode()
38	}
39}
40
/// Identifies the `(source, index)` pair whose entries a key range covers.
///
/// Range bounds built from this type contain only the version byte, the
/// key-kind byte, the source id and the index id; no index key bytes follow.
#[derive(Debug, Clone, PartialEq)]
pub struct IndexEntryKeyRange {
	/// Source component of the range bounds.
	pub source: SourceId,
	/// Index component of the range bounds.
	pub index: IndexId,
}
46
47impl IndexEntryKeyRange {
48	fn decode_key(key: &EncodedKey) -> Option<Self> {
49		let mut de = KeyDeserializer::from_bytes(key.as_slice());
50
51		let version = de.read_u8().ok()?;
52		if version != VERSION {
53			return None;
54		}
55
56		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
57		if kind != Self::KIND {
58			return None;
59		}
60
61		let source = de.read_source_id().ok()?;
62		let index = de.read_index_id().ok()?;
63
64		Some(IndexEntryKeyRange {
65			source,
66			index,
67		})
68	}
69}
70
71impl EncodableKeyRange for IndexEntryKeyRange {
72	const KIND: KeyKind = KeyKind::IndexEntry;
73
74	fn start(&self) -> Option<EncodedKey> {
75		let mut serializer = KeySerializer::with_capacity(20); // 1 + 1 + 9 + 9
76		serializer
77			.extend_u8(VERSION)
78			.extend_u8(Self::KIND as u8)
79			.extend_source_id(self.source)
80			.extend_index_id(self.index);
81		Some(serializer.to_encoded_key())
82	}
83
84	fn end(&self) -> Option<EncodedKey> {
85		let mut serializer = KeySerializer::with_capacity(20);
86		serializer
87			.extend_u8(VERSION)
88			.extend_u8(Self::KIND as u8)
89			.extend_source_id(self.source)
90			.extend_index_id(self.index.prev());
91		Some(serializer.to_encoded_key())
92	}
93
94	fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
95	where
96		Self: Sized,
97	{
98		let start_key = match &range.start {
99			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
100			Bound::Unbounded => None,
101		};
102
103		let end_key = match &range.end {
104			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
105			Bound::Unbounded => None,
106		};
107
108		(start_key, end_key)
109	}
110}
111
112impl EncodableKey for IndexEntryKey {
113	const KIND: KeyKind = KeyKind::IndexEntry;
114
115	fn encode(&self) -> EncodedKey {
116		let mut serializer = KeySerializer::with_capacity(20 + self.key.len());
117		serializer
118			.extend_u8(VERSION)
119			.extend_u8(Self::KIND as u8)
120			.extend_source_id(self.source)
121			.extend_index_id(self.index)
122			// Append the raw index key bytes
123			.extend_raw(self.key.as_slice());
124		serializer.to_encoded_key()
125	}
126
127	fn decode(key: &EncodedKey) -> Option<Self> {
128		let mut de = KeyDeserializer::from_bytes(key.as_slice());
129
130		let version = de.read_u8().ok()?;
131		if version != VERSION {
132			return None;
133		}
134
135		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
136		if kind != Self::KIND {
137			return None;
138		}
139
140		let source = de.read_source_id().ok()?;
141		let index = de.read_index_id().ok()?;
142
143		// The remaining bytes are the index key
144		let remaining = de.remaining();
145		if remaining > 0 {
146			let remaining_bytes = de.read_raw(remaining).ok()?;
147			let index_key = EncodedIndexKey(CowVec::new(remaining_bytes.to_vec()));
148			Some(Self {
149				source,
150				index,
151				key: index_key,
152			})
153		} else {
154			None
155		}
156	}
157}
158
159impl IndexEntryKey {
160	/// Create a range for scanning all entries of a specific index
161	pub fn index_range(source: impl Into<SourceId>, index: IndexId) -> EncodedKeyRange {
162		let range = IndexEntryKeyRange {
163			source: source.into(),
164			index,
165		};
166		EncodedKeyRange::new(Bound::Included(range.start().unwrap()), Bound::Excluded(range.end().unwrap()))
167	}
168
169	/// Create a range for scanning all entries of a source (all indexes)
170	pub fn source_range(source: impl Into<SourceId>) -> EncodedKeyRange {
171		let source = source.into();
172		let mut start_serializer = KeySerializer::with_capacity(11);
173		start_serializer.extend_u8(VERSION).extend_u8(KeyKind::IndexEntry as u8).extend_source_id(source);
174
175		let next_source = source.next();
176		let mut end_serializer = KeySerializer::with_capacity(11);
177		end_serializer.extend_u8(VERSION).extend_u8(KeyKind::IndexEntry as u8).extend_source_id(next_source);
178
179		EncodedKeyRange {
180			start: Bound::Included(start_serializer.to_encoded_key()),
181			end: Bound::Excluded(end_serializer.to_encoded_key()),
182		}
183	}
184
185	/// Create a range for scanning entries within an index with a specific
186	/// key prefix
187	pub fn key_prefix_range(source: impl Into<SourceId>, index: IndexId, key_prefix: &[u8]) -> EncodedKeyRange {
188		let source = source.into();
189		let mut serializer = KeySerializer::with_capacity(20 + key_prefix.len());
190		serializer
191			.extend_u8(VERSION)
192			.extend_u8(KeyKind::IndexEntry as u8)
193			.extend_source_id(source)
194			.extend_index_id(index)
195			.extend_raw(key_prefix);
196		let start = serializer.to_encoded_key();
197
198		// For the end key, append 0xFF to get all keys with this prefix
199		let mut end = start.as_slice().to_vec();
200		end.push(0xFF);
201
202		EncodedKeyRange {
203			start: Bound::Included(start),
204			end: Bound::Excluded(EncodedKey::new(end)),
205		}
206	}
207
208	/// Create a range for entries from an EncodedIndexKeyRange
209	/// This method leverages the EncodedIndexKeyRange type for cleaner
210	/// range handling.
211	pub fn key_range(
212		source: impl Into<SourceId>,
213		index: IndexId,
214		index_range: EncodedIndexKeyRange,
215	) -> EncodedKeyRange {
216		let source = source.into();
217		// Build the prefix for this source and index
218		let mut prefix_serializer = KeySerializer::with_capacity(20);
219		prefix_serializer
220			.extend_u8(VERSION)
221			.extend_u8(KeyKind::IndexEntry as u8)
222			.extend_source_id(source)
223			.extend_index_id(index);
224		let prefix = prefix_serializer.to_encoded_key().to_vec();
225
226		// Convert bounds to include the prefix
227		let start = match index_range.start {
228			Bound::Included(key) => {
229				let mut bytes = prefix.clone();
230				bytes.extend_from_slice(key.as_slice());
231				Bound::Included(EncodedKey::new(bytes))
232			}
233			Bound::Excluded(key) => {
234				let mut bytes = prefix.clone();
235				bytes.extend_from_slice(key.as_slice());
236				Bound::Excluded(EncodedKey::new(bytes))
237			}
238			Bound::Unbounded => {
239				// Start from the beginning of this index
240				Bound::Included(EncodedKey::new(prefix.clone()))
241			}
242		};
243
244		let end = match index_range.end {
245			Bound::Included(key) => {
246				let mut bytes = prefix.clone();
247				bytes.extend_from_slice(key.as_slice());
248				Bound::Included(EncodedKey::new(bytes))
249			}
250			Bound::Excluded(key) => {
251				let mut bytes = prefix.clone();
252				bytes.extend_from_slice(key.as_slice());
253				Bound::Excluded(EncodedKey::new(bytes))
254			}
255			Bound::Unbounded => {
256				// End at the beginning of the next index
257				let mut serializer = KeySerializer::with_capacity(20);
258				serializer
259					.extend_u8(VERSION)
260					.extend_u8(KeyKind::IndexEntry as u8)
261					.extend_source_id(source)
262					// Use prev() for end bound in descending order
263					.extend_index_id(index.prev());
264				Bound::Excluded(serializer.to_encoded_key())
265			}
266		};
267
268		EncodedKeyRange {
269			start,
270			end,
271		}
272	}
273}
274
#[cfg(test)]
mod tests {
	use reifydb_type::Type;

	use super::*;
	use crate::{SortDirection, value::index::EncodedIndexLayout};

	/// Destructures a range that must have an inclusive start and an
	/// exclusive end. Panics on any other shape so that a change in the
	/// bounds can never silently skip the assertions that follow.
	fn half_open_bounds(range: &EncodedKeyRange) -> (&EncodedKey, &EncodedKey) {
		match (&range.start, &range.end) {
			(Bound::Included(start), Bound::Excluded(end)) => (start, end),
			_ => panic!("Expected Included/Excluded bounds"),
		}
	}

	/// Encoding an entry and decoding it again yields the same components.
	#[test]
	fn test_encode_decode() {
		// Create a simple index key
		let layout =
			EncodedIndexLayout::new(&[Type::Uint8, Type::Uint8], &[SortDirection::Asc, SortDirection::Asc])
				.unwrap();

		let mut index_key = layout.allocate_key();
		layout.set_u64(&mut index_key, 0, 100u64);
		layout.set_row_number(&mut index_key, 1, 1u64);

		let entry = IndexEntryKey {
			source: SourceId::table(42),
			index: IndexId::primary(7),
			key: index_key.clone(),
		};

		let encoded = entry.encode();
		let decoded = IndexEntryKey::decode(&encoded).unwrap();

		assert_eq!(decoded.source, SourceId::table(42));
		assert_eq!(decoded.index, IndexId::primary(7));
		assert_eq!(decoded.key.as_slice(), index_key.as_slice());
	}

	/// Entries of the same source and index sort by their index key bytes.
	#[test]
	fn test_ordering() {
		let layout = EncodedIndexLayout::new(&[Type::Uint8], &[SortDirection::Asc]).unwrap();

		let mut key1 = layout.allocate_key();
		layout.set_u64(&mut key1, 0, 100u64);

		let mut key2 = layout.allocate_key();
		layout.set_u64(&mut key2, 0, 200u64);

		// Same source and index, different keys
		let entry1 = IndexEntryKey {
			source: SourceId::table(1),
			index: IndexId::primary(1),
			key: key1,
		};

		let entry2 = IndexEntryKey {
			source: SourceId::table(1),
			index: IndexId::primary(1),
			key: key2,
		};

		let encoded1 = entry1.encode();
		let encoded2 = entry2.encode();

		// entry1 should come before entry2 because 100 < 200
		assert!(encoded1.as_slice() < encoded2.as_slice());
	}

	/// `index_range` contains entries of its own index and excludes entries
	/// of a neighbouring index.
	#[test]
	fn test_index_range() {
		let range = IndexEntryKey::index_range(SourceId::table(10), IndexId::primary(5));
		let (start, end) = half_open_bounds(&range);

		// Create entries that should be included
		let layout = EncodedIndexLayout::new(&[Type::Uint8], &[SortDirection::Asc]).unwrap();

		let mut key = layout.allocate_key();
		layout.set_u64(&mut key, 0, 50u64);

		let entry = IndexEntryKey {
			source: SourceId::table(10),
			index: IndexId::primary(5),
			key,
		};

		let encoded = entry.encode();

		// Check that the entry falls within the range
		assert!(encoded.as_slice() >= start.as_slice());
		assert!(encoded.as_slice() < end.as_slice());

		// Entry with different index should not be in range
		// Note: Due to keycode encoding, IndexId(6) will have a smaller
		// encoded value than IndexId(5) since keycode inverts bits
		// (larger numbers become smaller byte sequences)
		let entry2 = IndexEntryKey {
			source: SourceId::table(10),
			index: IndexId::primary(6),
			key: layout.allocate_key(),
		};

		let encoded2 = entry2.encode();
		// The entry with IndexId(6) should not be within the range for
		// IndexId(5): encoded2 should either be < start or >= end
		assert!(encoded2.as_slice() < start.as_slice() || encoded2.as_slice() >= end.as_slice());
	}

	/// `key_prefix_range` contains keys sharing the prefix and excludes a
	/// key whose first field differs.
	#[test]
	fn test_key_prefix_range() {
		let layout =
			EncodedIndexLayout::new(&[Type::Uint8, Type::Uint8], &[SortDirection::Asc, SortDirection::Asc])
				.unwrap();

		let mut key = layout.allocate_key();
		layout.set_u64(&mut key, 0, 100u64);
		layout.set_row_number(&mut key, 1, 0u64); // Set to 0 to get the minimal key with this prefix

		// Use the full encoded key up to the first field as the prefix
		let prefix = &key.as_slice()[..layout.fields[1].offset]; // Include bitvec and first field
		let range = IndexEntryKey::key_prefix_range(SourceId::table(1), IndexId::primary(1), prefix);
		let (start, end) = half_open_bounds(&range);

		// Now create a full key with the same prefix
		layout.set_row_number(&mut key, 1, 999u64);
		let entry = IndexEntryKey {
			source: SourceId::table(1),
			index: IndexId::primary(1),
			key: key.clone(),
		};

		let encoded = entry.encode();

		// Should be within range
		assert!(encoded.as_slice() >= start.as_slice());
		assert!(encoded.as_slice() < end.as_slice());

		// Create a key with different prefix
		let mut key2 = layout.allocate_key();
		layout.set_u64(&mut key2, 0, 200u64); // Different first field
		layout.set_row_number(&mut key2, 1, 1u64);

		let entry2 = IndexEntryKey {
			source: SourceId::table(1),
			index: IndexId::primary(1),
			key: key2,
		};

		let encoded2 = entry2.encode();

		// Should not be in range
		assert!(encoded2.as_slice() >= end.as_slice());
	}
}