Skip to main content

reifydb_core/key/
index_entry.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::collections::Bound;
5
6use reifydb_type::util::cowvec::CowVec;
7
8use super::{EncodableKey, EncodableKeyRange, KeyKind};
9use crate::{
10	encoded::key::{EncodedKey, EncodedKeyRange},
11	interface::catalog::{id::IndexId, schema::SchemaId},
12	util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
13	value::index::{encoded::EncodedIndexKey, range::EncodedIndexKeyRange},
14};
15
16const VERSION: u8 = 1;
17
18/// Key for storing actual index entries with the encoded index key data
19#[derive(Debug, Clone, PartialEq)]
20pub struct IndexEntryKey {
21	pub object: SchemaId,
22	pub index: IndexId,
23	pub key: EncodedIndexKey,
24}
25
26impl IndexEntryKey {
27	pub fn new(object: impl Into<SchemaId>, index: IndexId, key: EncodedIndexKey) -> Self {
28		Self {
29			object: object.into(),
30			index,
31			key,
32		}
33	}
34
35	pub fn encoded(object: impl Into<SchemaId>, index: IndexId, key: EncodedIndexKey) -> EncodedKey {
36		Self::new(object, index, key).encode()
37	}
38}
39
40#[derive(Debug, Clone, PartialEq)]
41pub struct IndexEntryKeyRange {
42	pub object: SchemaId,
43	pub index: IndexId,
44}
45
46impl IndexEntryKeyRange {
47	fn decode_key(key: &EncodedKey) -> Option<Self> {
48		let mut de = KeyDeserializer::from_bytes(key.as_slice());
49
50		let version = de.read_u8().ok()?;
51		if version != VERSION {
52			return None;
53		}
54
55		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
56		if kind != Self::KIND {
57			return None;
58		}
59
60		let object = de.read_schema_id().ok()?;
61		let index = de.read_index_id().ok()?;
62
63		Some(IndexEntryKeyRange {
64			object,
65			index,
66		})
67	}
68}
69
70impl EncodableKeyRange for IndexEntryKeyRange {
71	const KIND: KeyKind = KeyKind::IndexEntry;
72
73	fn start(&self) -> Option<EncodedKey> {
74		let mut serializer = KeySerializer::with_capacity(20); // 1 + 1 + 9 + 9
75		serializer
76			.extend_u8(VERSION)
77			.extend_u8(Self::KIND as u8)
78			.extend_schema_id(self.object)
79			.extend_index_id(self.index);
80		Some(serializer.to_encoded_key())
81	}
82
83	fn end(&self) -> Option<EncodedKey> {
84		let mut serializer = KeySerializer::with_capacity(20);
85		serializer
86			.extend_u8(VERSION)
87			.extend_u8(Self::KIND as u8)
88			.extend_schema_id(self.object)
89			.extend_index_id(self.index.prev());
90		Some(serializer.to_encoded_key())
91	}
92
93	fn decode(range: &EncodedKeyRange) -> (Option<Self>, Option<Self>)
94	where
95		Self: Sized,
96	{
97		let start_key = match &range.start {
98			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
99			Bound::Unbounded => None,
100		};
101
102		let end_key = match &range.end {
103			Bound::Included(key) | Bound::Excluded(key) => Self::decode_key(key),
104			Bound::Unbounded => None,
105		};
106
107		(start_key, end_key)
108	}
109}
110
111impl EncodableKey for IndexEntryKey {
112	const KIND: KeyKind = KeyKind::IndexEntry;
113
114	fn encode(&self) -> EncodedKey {
115		let mut serializer = KeySerializer::with_capacity(20 + self.key.len());
116		serializer
117			.extend_u8(VERSION)
118			.extend_u8(Self::KIND as u8)
119			.extend_schema_id(self.object)
120			.extend_index_id(self.index)
121			// Append the raw index key bytes
122			.extend_raw(self.key.as_slice());
123		serializer.to_encoded_key()
124	}
125
126	fn decode(key: &EncodedKey) -> Option<Self> {
127		let mut de = KeyDeserializer::from_bytes(key.as_slice());
128
129		let version = de.read_u8().ok()?;
130		if version != VERSION {
131			return None;
132		}
133
134		let kind: KeyKind = de.read_u8().ok()?.try_into().ok()?;
135		if kind != Self::KIND {
136			return None;
137		}
138
139		let object = de.read_schema_id().ok()?;
140		let index = de.read_index_id().ok()?;
141
142		// The remaining bytes are the index key
143		let remaining = de.remaining();
144		if remaining > 0 {
145			let remaining_bytes = de.read_raw(remaining).ok()?;
146			let index_key = EncodedIndexKey(CowVec::new(remaining_bytes.to_vec()));
147			Some(Self {
148				object,
149				index,
150				key: index_key,
151			})
152		} else {
153			None
154		}
155	}
156}
157
158impl IndexEntryKey {
159	/// Create a range for scanning all entries of a specific index
160	pub fn index_range(object: impl Into<SchemaId>, index: IndexId) -> EncodedKeyRange {
161		let range = IndexEntryKeyRange {
162			object: object.into(),
163			index,
164		};
165		EncodedKeyRange::new(Bound::Included(range.start().unwrap()), Bound::Excluded(range.end().unwrap()))
166	}
167
168	/// Create a range for scanning all entries of a object (all indexes)
169	pub fn object_range(object: impl Into<SchemaId>) -> EncodedKeyRange {
170		let object = object.into();
171		let mut start_serializer = KeySerializer::with_capacity(11);
172		start_serializer.extend_u8(VERSION).extend_u8(KeyKind::IndexEntry as u8).extend_schema_id(object);
173
174		let next_primitive = object.next();
175		let mut end_serializer = KeySerializer::with_capacity(11);
176		end_serializer.extend_u8(VERSION).extend_u8(KeyKind::IndexEntry as u8).extend_schema_id(next_primitive);
177
178		EncodedKeyRange {
179			start: Bound::Included(start_serializer.to_encoded_key()),
180			end: Bound::Excluded(end_serializer.to_encoded_key()),
181		}
182	}
183
184	/// Create a range for scanning entries within an index with a specific
185	/// key prefix
186	pub fn key_prefix_range(object: impl Into<SchemaId>, index: IndexId, key_prefix: &[u8]) -> EncodedKeyRange {
187		let object = object.into();
188		let mut serializer = KeySerializer::with_capacity(20 + key_prefix.len());
189		serializer
190			.extend_u8(VERSION)
191			.extend_u8(KeyKind::IndexEntry as u8)
192			.extend_schema_id(object)
193			.extend_index_id(index)
194			.extend_raw(key_prefix);
195		let start = serializer.to_encoded_key();
196
197		// For the end key, append 0xFF to get all keys with this prefix
198		let mut end = start.as_slice().to_vec();
199		end.push(0xFF);
200
201		EncodedKeyRange {
202			start: Bound::Included(start),
203			end: Bound::Excluded(EncodedKey::new(end)),
204		}
205	}
206
207	/// Create a range for entries from an EncodedIndexKeyRange
208	/// This method leverages the EncodedIndexKeyRange type for cleaner
209	/// range handling.
210	pub fn key_range(
211		object: impl Into<SchemaId>,
212		index: IndexId,
213		index_range: EncodedIndexKeyRange,
214	) -> EncodedKeyRange {
215		let object = object.into();
216		// Build the prefix for this object and index
217		let mut prefix_serializer = KeySerializer::with_capacity(20);
218		prefix_serializer
219			.extend_u8(VERSION)
220			.extend_u8(KeyKind::IndexEntry as u8)
221			.extend_schema_id(object)
222			.extend_index_id(index);
223		let prefix = prefix_serializer.to_encoded_key().to_vec();
224
225		// Convert bounds to include the prefix
226		let start = match index_range.start {
227			Bound::Included(key) => {
228				let mut bytes = prefix.clone();
229				bytes.extend_from_slice(key.as_slice());
230				Bound::Included(EncodedKey::new(bytes))
231			}
232			Bound::Excluded(key) => {
233				let mut bytes = prefix.clone();
234				bytes.extend_from_slice(key.as_slice());
235				Bound::Excluded(EncodedKey::new(bytes))
236			}
237			Bound::Unbounded => {
238				// Start from the beginning of this index
239				Bound::Included(EncodedKey::new(prefix.clone()))
240			}
241		};
242
243		let end = match index_range.end {
244			Bound::Included(key) => {
245				let mut bytes = prefix.clone();
246				bytes.extend_from_slice(key.as_slice());
247				Bound::Included(EncodedKey::new(bytes))
248			}
249			Bound::Excluded(key) => {
250				let mut bytes = prefix.clone();
251				bytes.extend_from_slice(key.as_slice());
252				Bound::Excluded(EncodedKey::new(bytes))
253			}
254			Bound::Unbounded => {
255				// End at the beginning of the next index
256				let mut serializer = KeySerializer::with_capacity(20);
257				serializer
258					.extend_u8(VERSION)
259					.extend_u8(KeyKind::IndexEntry as u8)
260					.extend_schema_id(object)
261					// Use prev() for end bound in descending order
262					.extend_index_id(index.prev());
263				Bound::Excluded(serializer.to_encoded_key())
264			}
265		};
266
267		EncodedKeyRange {
268			start,
269			end,
270		}
271	}
272}
273
274#[cfg(test)]
275pub mod tests {
276	use reifydb_type::value::r#type::Type;
277
278	use super::*;
279	use crate::{sort::SortDirection, value::index::schema::IndexSchema};
280
281	#[test]
282	fn test_encode_decode() {
283		// Create a simple index key
284		let layout = IndexSchema::new(&[Type::Uint8, Type::Uint8], &[SortDirection::Asc, SortDirection::Asc])
285			.unwrap();
286
287		let mut index_key = layout.allocate_key();
288		layout.set_u64(&mut index_key, 0, 100u64);
289		layout.set_row_number(&mut index_key, 1, 1u64);
290
291		let entry = IndexEntryKey {
292			object: SchemaId::table(42),
293			index: IndexId::primary(7),
294			key: index_key.clone(),
295		};
296
297		let encoded = entry.encode();
298		let decoded = IndexEntryKey::decode(&encoded).unwrap();
299
300		assert_eq!(decoded.object, SchemaId::table(42));
301		assert_eq!(decoded.index, IndexId::primary(7));
302		assert_eq!(decoded.key.as_slice(), index_key.as_slice());
303	}
304
305	#[test]
306	fn test_ordering() {
307		let layout = IndexSchema::new(&[Type::Uint8], &[SortDirection::Asc]).unwrap();
308
309		let mut key1 = layout.allocate_key();
310		layout.set_u64(&mut key1, 0, 100u64);
311
312		let mut key2 = layout.allocate_key();
313		layout.set_u64(&mut key2, 0, 200u64);
314
315		// Same object and index, different keys
316		let entry1 = IndexEntryKey {
317			object: SchemaId::table(1),
318			index: IndexId::primary(1),
319			key: key1,
320		};
321
322		let entry2 = IndexEntryKey {
323			object: SchemaId::table(1),
324			index: IndexId::primary(1),
325			key: key2,
326		};
327
328		let encoded1 = entry1.encode();
329		let encoded2 = entry2.encode();
330
331		// entry1 should come before entry2 because 100 < 200
332		assert!(encoded1.as_slice() < encoded2.as_slice());
333	}
334
335	#[test]
336	fn test_index_range() {
337		let range = IndexEntryKey::index_range(SchemaId::table(10), IndexId::primary(5));
338
339		// Create entries that should be included
340		let layout = IndexSchema::new(&[Type::Uint8], &[SortDirection::Asc]).unwrap();
341
342		let mut key = layout.allocate_key();
343		layout.set_u64(&mut key, 0, 50u64);
344
345		let entry = IndexEntryKey {
346			object: SchemaId::table(10),
347			index: IndexId::primary(5),
348			key,
349		};
350
351		let encoded = entry.encode();
352
353		// Check that the entry falls within the range
354		if let (Bound::Included(start), Bound::Excluded(end)) = (&range.start, &range.end) {
355			assert!(encoded.as_slice() >= start.as_slice());
356			assert!(encoded.as_slice() < end.as_slice());
357		} else {
358			panic!("Expected Included/Excluded bounds");
359		}
360
361		// Entry with different index should not be in range
362		// Note: Due to keycode encoding, IndexId(6) will have a smaller
363		// encoded value than IndexId(5) since keycode inverts bits
364		// (larger numbers become smaller byte sequences)
365		let entry2 = IndexEntryKey {
366			object: SchemaId::table(10),
367			index: IndexId::primary(6),
368			key: layout.allocate_key(),
369		};
370
371		let encoded2 = entry2.encode();
372		// The entry with IndexId(6) should not be within the range for
373		// IndexId(5)
374		if let (Bound::Included(start), Bound::Excluded(end)) = (&range.start, &range.end) {
375			// encoded2 should either be < start or >= end
376			assert!(encoded2.as_slice() < start.as_slice() || encoded2.as_slice() >= end.as_slice());
377		}
378	}
379
380	#[test]
381	fn test_key_prefix_range() {
382		let layout = IndexSchema::new(&[Type::Uint8, Type::Uint8], &[SortDirection::Asc, SortDirection::Asc])
383			.unwrap();
384
385		let mut key = layout.allocate_key();
386		layout.set_u64(&mut key, 0, 100u64);
387		layout.set_row_number(&mut key, 1, 0u64); // Set to 0 to get the minimal key with this prefix
388
389		// Use the full encoded key up to the first field as the prefix
390		let prefix = &key.as_slice()[..layout.fields[1].offset]; // Include bitvec and first field
391		let range = IndexEntryKey::key_prefix_range(SchemaId::table(1), IndexId::primary(1), prefix);
392
393		// Now create a full key with the same prefix
394		layout.set_row_number(&mut key, 1, 999u64);
395		let entry = IndexEntryKey {
396			object: SchemaId::table(1),
397			index: IndexId::primary(1),
398			key: key.clone(),
399		};
400
401		let encoded = entry.encode();
402
403		// Should be within range
404		if let (Bound::Included(start), Bound::Excluded(end)) = (&range.start, &range.end) {
405			assert!(encoded.as_slice() >= start.as_slice());
406			assert!(encoded.as_slice() < end.as_slice());
407		}
408
409		// Create a key with different prefix
410		let mut key2 = layout.allocate_key();
411		layout.set_u64(&mut key2, 0, 200u64); // Different first field
412		layout.set_row_number(&mut key2, 1, 1u64);
413
414		let entry2 = IndexEntryKey {
415			object: SchemaId::table(1),
416			index: IndexId::primary(1),
417			key: key2,
418		};
419
420		let encoded2 = entry2.encode();
421
422		// Should not be in range
423		if let Bound::Excluded(end) = &range.end {
424			assert!(encoded2.as_slice() >= end.as_slice());
425		}
426	}
427}