Skip to main content

reifydb_core/value/index/
set.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::ptr;
5
6use reifydb_type::value::{
7	date::Date,
8	datetime::DateTime,
9	duration::Duration,
10	identity::IdentityId,
11	time::Time,
12	r#type::Type,
13	uuid::{Uuid4, Uuid7},
14};
15use uuid::Uuid;
16
17use crate::{
18	sort::SortDirection,
19	value::index::{encoded::EncodedIndexKey, layout::EncodedIndexLayout},
20};
21
22impl EncodedIndexLayout {
23	pub fn set_bool(&self, key: &mut EncodedIndexKey, index: usize, value: impl Into<bool>) {
24		let field = &self.fields[index];
25		debug_assert_eq!(field.value, Type::Boolean);
26		key.set_valid(index, true);
27
28		let byte_value = match field.direction {
29			SortDirection::Asc => {
30				if value.into() {
31					1u8
32				} else {
33					0u8
34				}
35			}
36			SortDirection::Desc => {
37				if value.into() {
38					0u8
39				} else {
40					1u8
41				}
42			}
43		};
44
45		unsafe { ptr::write_unaligned(key.make_mut().as_mut_ptr().add(field.offset), byte_value) }
46	}
47
48	pub fn set_f32(&self, key: &mut EncodedIndexKey, index: usize, value: impl Into<f32>) {
49		let field = &self.fields[index];
50		debug_assert_eq!(field.value, Type::Float4);
51		key.set_valid(index, true);
52
53		let v = value.into();
54		let mut bytes = v.to_bits().to_be_bytes();
55
56		// Apply ASC encoding first
57		if v.is_sign_negative() {
58			for b in bytes.iter_mut() {
59				*b = !*b;
60			}
61		} else {
62			bytes[0] ^= 0x80;
63		}
64
65		// For DESC, invert all bytes
66		if field.direction == SortDirection::Desc {
67			for b in bytes.iter_mut() {
68				*b = !*b;
69			}
70		}
71
72		unsafe {
73			ptr::copy_nonoverlapping(bytes.as_ptr(), key.make_mut().as_mut_ptr().add(field.offset), 4);
74		}
75	}
76
77	pub fn set_f64(&self, key: &mut EncodedIndexKey, index: usize, value: impl Into<f64>) {
78		let field = &self.fields[index];
79		debug_assert_eq!(field.value, Type::Float8);
80		key.set_valid(index, true);
81
82		let v = value.into();
83		let mut bytes = v.to_bits().to_be_bytes();
84
85		// Apply ASC encoding first
86		if v.is_sign_negative() {
87			for b in bytes.iter_mut() {
88				*b = !*b;
89			}
90		} else {
91			bytes[0] ^= 0x80;
92		}
93
94		// For DESC, invert all bytes
95		if field.direction == SortDirection::Desc {
96			for b in bytes.iter_mut() {
97				*b = !*b;
98			}
99		}
100
101		unsafe {
102			ptr::copy_nonoverlapping(bytes.as_ptr(), key.make_mut().as_mut_ptr().add(field.offset), 8);
103		}
104	}
105
106	pub fn set_i8(&self, key: &mut EncodedIndexKey, index: usize, value: impl Into<i8>) {
107		let field = &self.fields[index];
108		debug_assert_eq!(field.value, Type::Int1);
109		key.set_valid(index, true);
110
111		let mut bytes = value.into().to_be_bytes();
112
113		match field.direction {
114			SortDirection::Asc => {
115				bytes[0] ^= 0x80;
116			}
117			SortDirection::Desc => {
118				bytes[0] ^= 0x80;
119				for b in bytes.iter_mut() {
120					*b = !*b;
121				}
122			}
123		}
124
125		unsafe { ptr::write_unaligned(key.make_mut().as_mut_ptr().add(field.offset), bytes[0]) }
126	}
127
128	pub fn set_i16(&self, key: &mut EncodedIndexKey, index: usize, value: impl Into<i16>) {
129		let field = &self.fields[index];
130		debug_assert_eq!(field.value, Type::Int2);
131		key.set_valid(index, true);
132
133		let mut bytes = value.into().to_be_bytes();
134
135		match field.direction {
136			SortDirection::Asc => {
137				bytes[0] ^= 0x80;
138			}
139			SortDirection::Desc => {
140				bytes[0] ^= 0x80;
141				for b in bytes.iter_mut() {
142					*b = !*b;
143				}
144			}
145		}
146
147		unsafe {
148			ptr::copy_nonoverlapping(bytes.as_ptr(), key.make_mut().as_mut_ptr().add(field.offset), 2);
149		}
150	}
151
152	pub fn set_i32(&self, key: &mut EncodedIndexKey, index: usize, value: impl Into<i32>) {
153		let field = &self.fields[index];
154		debug_assert_eq!(field.value, Type::Int4);
155		key.set_valid(index, true);
156
157		let mut bytes = value.into().to_be_bytes();
158
159		match field.direction {
160			SortDirection::Asc => {
161				bytes[0] ^= 0x80;
162			}
163			SortDirection::Desc => {
164				bytes[0] ^= 0x80;
165				for b in bytes.iter_mut() {
166					*b = !*b;
167				}
168			}
169		}
170
171		unsafe {
172			ptr::copy_nonoverlapping(bytes.as_ptr(), key.make_mut().as_mut_ptr().add(field.offset), 4);
173		}
174	}
175
176	pub fn set_i64(&self, key: &mut EncodedIndexKey, index: usize, value: impl Into<i64>) {
177		let field = &self.fields[index];
178		debug_assert_eq!(field.value, Type::Int8);
179		key.set_valid(index, true);
180
181		let mut bytes = value.into().to_be_bytes();
182
183		match field.direction {
184			SortDirection::Asc => {
185				bytes[0] ^= 0x80;
186			}
187			SortDirection::Desc => {
188				bytes[0] ^= 0x80;
189				for b in bytes.iter_mut() {
190					*b = !*b;
191				}
192			}
193		}
194
195		unsafe {
196			ptr::copy_nonoverlapping(bytes.as_ptr(), key.make_mut().as_mut_ptr().add(field.offset), 8);
197		}
198	}
199
200	pub fn set_i128(&self, key: &mut EncodedIndexKey, index: usize, value: impl Into<i128>) {
201		let field = &self.fields[index];
202		debug_assert_eq!(field.value, Type::Int16);
203		key.set_valid(index, true);
204
205		let mut bytes = value.into().to_be_bytes();
206
207		match field.direction {
208			SortDirection::Asc => {
209				bytes[0] ^= 0x80;
210			}
211			SortDirection::Desc => {
212				bytes[0] ^= 0x80;
213				for b in bytes.iter_mut() {
214					*b = !*b;
215				}
216			}
217		}
218
219		unsafe {
220			ptr::copy_nonoverlapping(bytes.as_ptr(), key.make_mut().as_mut_ptr().add(field.offset), 16);
221		}
222	}
223
224	pub fn set_u8(&self, key: &mut EncodedIndexKey, index: usize, value: impl Into<u8>) {
225		let field = &self.fields[index];
226		debug_assert_eq!(field.value, Type::Uint1);
227		key.set_valid(index, true);
228
229		let byte = match field.direction {
230			SortDirection::Asc => value.into(),
231			SortDirection::Desc => !value.into(),
232		};
233
234		unsafe { ptr::write_unaligned(key.make_mut().as_mut_ptr().add(field.offset), byte) }
235	}
236
237	pub fn set_u16(&self, key: &mut EncodedIndexKey, index: usize, value: impl Into<u16>) {
238		let field = &self.fields[index];
239		debug_assert_eq!(field.value, Type::Uint2);
240		key.set_valid(index, true);
241
242		let bytes = match field.direction {
243			SortDirection::Asc => value.into().to_be_bytes(),
244			SortDirection::Desc => (!value.into()).to_be_bytes(),
245		};
246
247		unsafe {
248			ptr::copy_nonoverlapping(bytes.as_ptr(), key.make_mut().as_mut_ptr().add(field.offset), 2);
249		}
250	}
251
252	pub fn set_u32(&self, key: &mut EncodedIndexKey, index: usize, value: impl Into<u32>) {
253		let field = &self.fields[index];
254		debug_assert_eq!(field.value, Type::Uint4);
255		key.set_valid(index, true);
256
257		let bytes = match field.direction {
258			SortDirection::Asc => value.into().to_be_bytes(),
259			SortDirection::Desc => (!value.into()).to_be_bytes(),
260		};
261
262		unsafe {
263			ptr::copy_nonoverlapping(bytes.as_ptr(), key.make_mut().as_mut_ptr().add(field.offset), 4);
264		}
265	}
266
267	pub fn set_u64(&self, key: &mut EncodedIndexKey, index: usize, value: impl Into<u64>) {
268		let field = &self.fields[index];
269		debug_assert_eq!(field.value, Type::Uint8);
270		key.set_valid(index, true);
271
272		let bytes = match field.direction {
273			SortDirection::Asc => value.into().to_be_bytes(),
274			SortDirection::Desc => (!value.into()).to_be_bytes(),
275		};
276
277		unsafe {
278			ptr::copy_nonoverlapping(bytes.as_ptr(), key.make_mut().as_mut_ptr().add(field.offset), 8);
279		}
280	}
281
282	pub fn set_u128(&self, key: &mut EncodedIndexKey, index: usize, value: impl Into<u128>) {
283		let field = &self.fields[index];
284		debug_assert_eq!(field.value, Type::Uint16);
285		key.set_valid(index, true);
286
287		let bytes = match field.direction {
288			SortDirection::Asc => value.into().to_be_bytes(),
289			SortDirection::Desc => (!value.into()).to_be_bytes(),
290		};
291
292		unsafe {
293			ptr::copy_nonoverlapping(bytes.as_ptr(), key.make_mut().as_mut_ptr().add(field.offset), 16);
294		}
295	}
296
297	pub fn set_row_number(&self, key: &mut EncodedIndexKey, index: usize, value: impl Into<u64>) {
298		let field = &self.fields[index];
299		debug_assert_eq!(field.value, Type::Uint8);
300		key.set_valid(index, true);
301
302		let bytes = match field.direction {
303			SortDirection::Asc => value.into().to_be_bytes(),
304			SortDirection::Desc => (!value.into()).to_be_bytes(),
305		};
306
307		unsafe {
308			ptr::copy_nonoverlapping(bytes.as_ptr(), key.make_mut().as_mut_ptr().add(field.offset), 8);
309		}
310	}
311
312	pub fn set_date(&self, key: &mut EncodedIndexKey, index: usize, value: Date) {
313		let field = &self.fields[index];
314		debug_assert_eq!(field.value, Type::Date);
315		key.set_valid(index, true);
316
317		let days = value.to_days_since_epoch();
318		let mut bytes = days.to_be_bytes();
319
320		match field.direction {
321			SortDirection::Asc => {
322				bytes[0] ^= 0x80;
323			}
324			SortDirection::Desc => {
325				bytes[0] ^= 0x80;
326				for b in bytes.iter_mut() {
327					*b = !*b;
328				}
329			}
330		}
331
332		unsafe {
333			ptr::copy_nonoverlapping(bytes.as_ptr(), key.make_mut().as_mut_ptr().add(field.offset), 4);
334		}
335	}
336
337	pub fn set_datetime(&self, key: &mut EncodedIndexKey, index: usize, value: DateTime) {
338		let field = &self.fields[index];
339		debug_assert_eq!(field.value, Type::DateTime);
340		key.set_valid(index, true);
341
342		let (seconds, nanos) = value.to_parts();
343		let mut sec_bytes = seconds.to_be_bytes();
344		let mut nano_bytes = nanos.to_be_bytes();
345
346		match field.direction {
347			SortDirection::Asc => {
348				sec_bytes[0] ^= 0x80;
349			}
350			SortDirection::Desc => {
351				sec_bytes[0] ^= 0x80;
352				for b in sec_bytes.iter_mut() {
353					*b = !*b;
354				}
355				for b in nano_bytes.iter_mut() {
356					*b = !*b;
357				}
358			}
359		}
360
361		unsafe {
362			ptr::copy_nonoverlapping(sec_bytes.as_ptr(), key.make_mut().as_mut_ptr().add(field.offset), 8);
363			ptr::copy_nonoverlapping(
364				nano_bytes.as_ptr(),
365				key.make_mut().as_mut_ptr().add(field.offset + 8),
366				4,
367			);
368		}
369	}
370
371	pub fn set_time(&self, key: &mut EncodedIndexKey, index: usize, value: Time) {
372		let field = &self.fields[index];
373		debug_assert_eq!(field.value, Type::Time);
374		key.set_valid(index, true);
375
376		let nanos = value.to_nanos_since_midnight();
377		let bytes = match field.direction {
378			SortDirection::Asc => nanos.to_be_bytes(),
379			SortDirection::Desc => (!nanos).to_be_bytes(),
380		};
381
382		unsafe {
383			ptr::copy_nonoverlapping(bytes.as_ptr(), key.make_mut().as_mut_ptr().add(field.offset), 8);
384		}
385	}
386
387	pub fn set_duration(&self, key: &mut EncodedIndexKey, index: usize, value: Duration) {
388		let field = &self.fields[index];
389		debug_assert_eq!(field.value, Type::Duration);
390		key.set_valid(index, true);
391
392		let mut months_bytes = value.get_months().to_be_bytes();
393		let mut days_bytes = value.get_days().to_be_bytes();
394		let mut nanos_bytes = value.get_nanos().to_be_bytes();
395
396		match field.direction {
397			SortDirection::Asc => {
398				months_bytes[0] ^= 0x80;
399				days_bytes[0] ^= 0x80;
400				nanos_bytes[0] ^= 0x80;
401			}
402			SortDirection::Desc => {
403				months_bytes[0] ^= 0x80;
404				days_bytes[0] ^= 0x80;
405				nanos_bytes[0] ^= 0x80;
406				for b in months_bytes.iter_mut() {
407					*b = !*b;
408				}
409				for b in days_bytes.iter_mut() {
410					*b = !*b;
411				}
412				for b in nanos_bytes.iter_mut() {
413					*b = !*b;
414				}
415			}
416		}
417
418		unsafe {
419			ptr::copy_nonoverlapping(
420				months_bytes.as_ptr(),
421				key.make_mut().as_mut_ptr().add(field.offset),
422				4,
423			);
424			ptr::copy_nonoverlapping(
425				days_bytes.as_ptr(),
426				key.make_mut().as_mut_ptr().add(field.offset + 4),
427				4,
428			);
429			ptr::copy_nonoverlapping(
430				nanos_bytes.as_ptr(),
431				key.make_mut().as_mut_ptr().add(field.offset + 8),
432				8,
433			);
434		}
435	}
436
437	pub fn set_uuid4(&self, key: &mut EncodedIndexKey, index: usize, value: Uuid4) {
438		let field = &self.fields[index];
439		debug_assert_eq!(field.value, Type::Uuid4);
440		key.set_valid(index, true);
441
442		let uuid: Uuid = value.into();
443		let uuid_bytes = uuid.as_bytes();
444		let mut bytes = [0u8; 16];
445		bytes.copy_from_slice(uuid_bytes);
446
447		if field.direction == SortDirection::Desc {
448			for b in bytes.iter_mut() {
449				*b = !*b;
450			}
451		}
452
453		unsafe {
454			ptr::copy_nonoverlapping(bytes.as_ptr(), key.make_mut().as_mut_ptr().add(field.offset), 16);
455		}
456	}
457
458	pub fn set_uuid7(&self, key: &mut EncodedIndexKey, index: usize, value: Uuid7) {
459		let field = &self.fields[index];
460		debug_assert_eq!(field.value, Type::Uuid7);
461		key.set_valid(index, true);
462
463		let uuid: Uuid = value.into();
464		let uuid_bytes = uuid.as_bytes();
465		let mut bytes = [0u8; 16];
466		bytes.copy_from_slice(uuid_bytes);
467
468		if field.direction == SortDirection::Desc {
469			for b in bytes.iter_mut() {
470				*b = !*b;
471			}
472		}
473
474		unsafe {
475			ptr::copy_nonoverlapping(bytes.as_ptr(), key.make_mut().as_mut_ptr().add(field.offset), 16);
476		}
477	}
478
479	pub fn set_identity_id(&self, key: &mut EncodedIndexKey, index: usize, value: IdentityId) {
480		let field = &self.fields[index];
481		debug_assert_eq!(field.value, Type::IdentityId);
482		key.set_valid(index, true);
483
484		// Direct conversion from inner Uuid7 to Uuid
485		let uuid: Uuid = value.0.into();
486		let uuid_bytes = uuid.as_bytes();
487		let mut bytes = [0u8; 16];
488		bytes.copy_from_slice(uuid_bytes);
489
490		if field.direction == SortDirection::Desc {
491			for b in bytes.iter_mut() {
492				*b = !*b;
493			}
494		}
495
496		unsafe {
497			ptr::copy_nonoverlapping(bytes.as_ptr(), key.make_mut().as_mut_ptr().add(field.offset), 16);
498		}
499	}
500
501	pub fn set_undefined(&self, key: &mut EncodedIndexKey, index: usize) {
502		let field = &self.fields[index];
503		key.set_valid(index, false);
504
505		let buf = key.make_mut();
506		let start = field.offset;
507		let end = start + field.size;
508		buf[start..end].fill(0);
509	}
510}
511
512#[cfg(test)]
513pub mod tests {
514	use crate::{sort::SortDirection, value::index::layout::EncodedIndexLayout};
515
516	mod bool {
517		use reifydb_type::value::r#type::Type;
518
519		use super::*;
520
521		#[test]
522		fn test_asc() {
523			let layout = EncodedIndexLayout::new(&[Type::Boolean], &[SortDirection::Asc]).unwrap();
524			let mut key_false = layout.allocate_key();
525			let mut key_true = layout.allocate_key();
526
527			layout.set_bool(&mut key_false, 0, false);
528			layout.set_bool(&mut key_true, 0, true);
529
530			// Check bitvec shows field is set
531			assert_eq!(key_false[0] & 0x01, 0x01);
532			assert_eq!(key_true[0] & 0x01, 0x01);
533
534			// Check values at field offset (after bitvec)
535			let offset = layout.fields[0].offset;
536			assert_eq!(key_false[offset], 0);
537			assert_eq!(key_true[offset], 1);
538
539			// Verify ordering
540			assert!(key_false.as_slice() < key_true.as_slice());
541		}
542
543		#[test]
544		fn test_desc() {
545			let layout = EncodedIndexLayout::new(&[Type::Boolean], &[SortDirection::Desc]).unwrap();
546			let mut key_false = layout.allocate_key();
547			let mut key_true = layout.allocate_key();
548
549			layout.set_bool(&mut key_false, 0, false);
550			layout.set_bool(&mut key_true, 0, true);
551
552			// Check values at field offset (inverted for DESC)
553			let offset = layout.fields[0].offset;
554			assert_eq!(key_false[offset], 1); // false becomes 1 in DESC
555			assert_eq!(key_true[offset], 0); // true becomes 0 in DESC
556
557			// Verify ordering (reversed)
558			assert!(key_false.as_slice() > key_true.as_slice());
559		}
560	}
561
562	mod i8 {
563		use reifydb_type::value::r#type::Type;
564
565		use crate::{sort::SortDirection, value::index::layout::EncodedIndexLayout};
566
567		#[test]
568		fn test_asc() {
569			let layout = EncodedIndexLayout::new(&[Type::Int1], &[SortDirection::Asc]).unwrap();
570			let mut key_neg = layout.allocate_key();
571			let mut key_zero = layout.allocate_key();
572			let mut key_pos = layout.allocate_key();
573
574			layout.set_i8(&mut key_neg, 0, -128i8);
575			layout.set_i8(&mut key_zero, 0, 0i8);
576			layout.set_i8(&mut key_pos, 0, 127i8);
577
578			let offset = layout.fields[0].offset;
579			// -128 with sign bit flipped: 0x80 -> 0x00
580			assert_eq!(key_neg[offset], 0x00);
581			// 0 with sign bit flipped: 0x00 -> 0x80
582			assert_eq!(key_zero[offset], 0x80);
583			// 127 with sign bit flipped: 0x7F -> 0xFF
584			assert_eq!(key_pos[offset], 0xFF);
585
586			// Verify ordering
587			assert!(key_neg.as_slice() < key_zero.as_slice());
588			assert!(key_zero.as_slice() < key_pos.as_slice());
589		}
590
591		#[test]
592		fn test_desc() {
593			let layout = EncodedIndexLayout::new(&[Type::Int1], &[SortDirection::Desc]).unwrap();
594			let mut key_neg = layout.allocate_key();
595			let mut key_zero = layout.allocate_key();
596			let mut key_pos = layout.allocate_key();
597
598			layout.set_i8(&mut key_neg, 0, -128i8);
599			layout.set_i8(&mut key_zero, 0, 0i8);
600			layout.set_i8(&mut key_pos, 0, 127i8);
601
602			let offset = layout.fields[0].offset;
603			// -128: 0x80 -> flip sign: 0x00 -> invert: 0xFF
604			assert_eq!(key_neg[offset], 0xFF);
605			// 0: 0x00 -> flip sign: 0x80 -> invert: 0x7F
606			assert_eq!(key_zero[offset], 0x7F);
607			// 127: 0x7F -> flip sign: 0xFF -> invert: 0x00
608			assert_eq!(key_pos[offset], 0x00);
609
610			// Verify ordering (reversed)
611			assert!(key_neg.as_slice() > key_zero.as_slice());
612			assert!(key_zero.as_slice() > key_pos.as_slice());
613		}
614	}
615
616	mod i32 {
617		use reifydb_type::value::r#type::Type;
618
619		use crate::{sort::SortDirection, value::index::layout::EncodedIndexLayout};
620
621		#[test]
622		fn test_asc() {
623			let layout = EncodedIndexLayout::new(&[Type::Int4], &[SortDirection::Asc]).unwrap();
624			let mut key_neg = layout.allocate_key();
625			let mut key_zero = layout.allocate_key();
626			let mut key_pos = layout.allocate_key();
627
628			layout.set_i32(&mut key_neg, 0, i32::MIN);
629			layout.set_i32(&mut key_zero, 0, 0i32);
630			layout.set_i32(&mut key_pos, 0, i32::MAX);
631
632			let offset = layout.fields[0].offset;
633			// i32::MIN in big-endian with sign bit flipped
634			assert_eq!(&key_neg[offset..offset + 4], &[0x00, 0x00, 0x00, 0x00]);
635			// 0 with sign bit flipped
636			assert_eq!(&key_zero[offset..offset + 4], &[0x80, 0x00, 0x00, 0x00]);
637			// i32::MAX with sign bit flipped
638			assert_eq!(&key_pos[offset..offset + 4], &[0xFF, 0xFF, 0xFF, 0xFF]);
639
640			// Verify ordering
641			assert!(key_neg.as_slice() < key_zero.as_slice());
642			assert!(key_zero.as_slice() < key_pos.as_slice());
643		}
644
645		#[test]
646		fn test_desc() {
647			let layout = EncodedIndexLayout::new(&[Type::Int4], &[SortDirection::Desc]).unwrap();
648			let mut key_neg = layout.allocate_key();
649			let mut key_zero = layout.allocate_key();
650			let mut key_pos = layout.allocate_key();
651
652			layout.set_i32(&mut key_neg, 0, i32::MIN);
653			layout.set_i32(&mut key_zero, 0, 0i32);
654			layout.set_i32(&mut key_pos, 0, i32::MAX);
655
656			let offset = layout.fields[0].offset;
657			// i32::MIN: flip sign then invert all
658			assert_eq!(&key_neg[offset..offset + 4], &[0xFF, 0xFF, 0xFF, 0xFF]);
659			// 0: flip sign then invert all
660			assert_eq!(&key_zero[offset..offset + 4], &[0x7F, 0xFF, 0xFF, 0xFF]);
661			// i32::MAX: flip sign then invert all
662			assert_eq!(&key_pos[offset..offset + 4], &[0x00, 0x00, 0x00, 0x00]);
663
664			// Verify ordering (reversed)
665			assert!(key_neg.as_slice() > key_zero.as_slice());
666			assert!(key_zero.as_slice() > key_pos.as_slice());
667		}
668	}
669
670	mod i64 {
671		use reifydb_type::value::r#type::Type;
672
673		use crate::{sort::SortDirection, value::index::layout::EncodedIndexLayout};
674
675		#[test]
676		fn test_asc() {
677			let layout = EncodedIndexLayout::new(&[Type::Int8], &[SortDirection::Asc]).unwrap();
678			let mut key = layout.allocate_key();
679
680			layout.set_i64(&mut key, 0, -1i64);
681
682			let offset = layout.fields[0].offset;
683			// -1 in two's complement is all 1s, with sign bit
684			// flipped becomes 0x7F...
685			assert_eq!(&key[offset..offset + 8], &[0x7F, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]);
686		}
687
688		#[test]
689		fn test_desc() {
690			let layout = EncodedIndexLayout::new(&[Type::Int8], &[SortDirection::Desc]).unwrap();
691			let mut key = layout.allocate_key();
692
693			layout.set_i64(&mut key, 0, -1i64);
694
695			let offset = layout.fields[0].offset;
696			// -1: flip sign then invert all
697			assert_eq!(&key[offset..offset + 8], &[0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
698		}
699	}
700
701	mod u8 {
702		use reifydb_type::value::r#type::Type;
703
704		use crate::{sort::SortDirection, value::index::layout::EncodedIndexLayout};
705
706		#[test]
707		fn test_asc() {
708			let layout = EncodedIndexLayout::new(&[Type::Uint1], &[SortDirection::Asc]).unwrap();
709			let mut key_min = layout.allocate_key();
710			let mut key_mid = layout.allocate_key();
711			let mut key_max = layout.allocate_key();
712
713			layout.set_u8(&mut key_min, 0, 0u8);
714			layout.set_u8(&mut key_mid, 0, 128u8);
715			layout.set_u8(&mut key_max, 0, 255u8);
716
717			let offset = layout.fields[0].offset;
718			assert_eq!(key_min[offset], 0x00);
719			assert_eq!(key_mid[offset], 0x80);
720			assert_eq!(key_max[offset], 0xFF);
721
722			// Verify ordering
723			assert!(key_min.as_slice() < key_mid.as_slice());
724			assert!(key_mid.as_slice() < key_max.as_slice());
725		}
726
727		#[test]
728		fn test_desc() {
729			let layout = EncodedIndexLayout::new(&[Type::Uint1], &[SortDirection::Desc]).unwrap();
730			let mut key_min = layout.allocate_key();
731			let mut key_mid = layout.allocate_key();
732			let mut key_max = layout.allocate_key();
733
734			layout.set_u8(&mut key_min, 0, 0u8);
735			layout.set_u8(&mut key_mid, 0, 128u8);
736			layout.set_u8(&mut key_max, 0, 255u8);
737
738			let offset = layout.fields[0].offset;
739			// Inverted for DESC
740			assert_eq!(key_min[offset], 0xFF);
741			assert_eq!(key_mid[offset], 0x7F);
742			assert_eq!(key_max[offset], 0x00);
743
744			// Verify ordering (reversed)
745			assert!(key_min.as_slice() > key_mid.as_slice());
746			assert!(key_mid.as_slice() > key_max.as_slice());
747		}
748	}
749
750	mod u32 {
751		use reifydb_type::value::r#type::Type;
752
753		use crate::{sort::SortDirection, value::index::layout::EncodedIndexLayout};
754
755		#[test]
756		fn test_asc() {
757			let layout = EncodedIndexLayout::new(&[Type::Uint4], &[SortDirection::Asc]).unwrap();
758			let mut key = layout.allocate_key();
759
760			layout.set_u32(&mut key, 0, 0x12345678u32);
761
762			let offset = layout.fields[0].offset;
763			// Big-endian representation
764			assert_eq!(&key[offset..offset + 4], &[0x12, 0x34, 0x56, 0x78]);
765		}
766
767		#[test]
768		fn test_desc() {
769			let layout = EncodedIndexLayout::new(&[Type::Uint4], &[SortDirection::Desc]).unwrap();
770			let mut key = layout.allocate_key();
771
772			layout.set_u32(&mut key, 0, 0x12345678u32);
773
774			let offset = layout.fields[0].offset;
775			// Inverted for DESC
776			assert_eq!(&key[offset..offset + 4], &[0xED, 0xCB, 0xA9, 0x87]);
777		}
778	}
779
780	mod u64 {
781		use reifydb_type::value::r#type::Type;
782
783		use crate::{sort::SortDirection, value::index::layout::EncodedIndexLayout};
784
785		#[test]
786		fn test_asc() {
787			let layout = EncodedIndexLayout::new(&[Type::Uint8], &[SortDirection::Asc]).unwrap();
788			let mut key = layout.allocate_key();
789
790			layout.set_u64(&mut key, 0, u64::MAX);
791
792			let offset = layout.fields[0].offset;
793			assert_eq!(&key[offset..offset + 8], &[0xFF; 8]);
794		}
795
796		#[test]
797		fn test_desc() {
798			let layout = EncodedIndexLayout::new(&[Type::Uint8], &[SortDirection::Desc]).unwrap();
799			let mut key = layout.allocate_key();
800
801			layout.set_u64(&mut key, 0, u64::MAX);
802
803			let offset = layout.fields[0].offset;
804			assert_eq!(&key[offset..offset + 8], &[0x00; 8]);
805		}
806	}
807
808	mod f32 {
809		use reifydb_type::value::r#type::Type;
810
811		use crate::{sort::SortDirection, value::index::layout::EncodedIndexLayout};
812
813		#[test]
814		fn test_asc() {
815			let layout = EncodedIndexLayout::new(&[Type::Float4], &[SortDirection::Asc]).unwrap();
816			let mut key_neg = layout.allocate_key();
817			let mut key_zero = layout.allocate_key();
818			let mut key_pos = layout.allocate_key();
819
820			layout.set_f32(&mut key_neg, 0, -1.0f32);
821			layout.set_f32(&mut key_zero, 0, 0.0f32);
822			layout.set_f32(&mut key_pos, 0, 1.0f32);
823
824			let offset = layout.fields[0].offset;
825
826			// -1.0f32: 0xBF800000 -> invert all: 0x407FFFFF
827			assert_eq!(&key_neg[offset..offset + 4], &[0x40, 0x7F, 0xFF, 0xFF]);
828			// 0.0f32: 0x00000000 -> flip sign: 0x80000000
829			assert_eq!(&key_zero[offset..offset + 4], &[0x80, 0x00, 0x00, 0x00]);
830			// 1.0f32: 0x3F800000 -> flip sign: 0xBF800000
831			assert_eq!(&key_pos[offset..offset + 4], &[0xBF, 0x80, 0x00, 0x00]);
832
833			// Verify ordering
834			assert!(key_neg.as_slice() < key_zero.as_slice());
835			assert!(key_zero.as_slice() < key_pos.as_slice());
836		}
837
838		#[test]
839		fn test_desc() {
840			let layout = EncodedIndexLayout::new(&[Type::Float4], &[SortDirection::Desc]).unwrap();
841			let mut key_neg = layout.allocate_key();
842			let mut key_pos = layout.allocate_key();
843
844			layout.set_f32(&mut key_neg, 0, -1.0f32);
845			layout.set_f32(&mut key_pos, 0, 1.0f32);
846
847			let offset = layout.fields[0].offset;
848
849			// -1.0f32: ASC encoding then invert for DESC
850			assert_eq!(&key_neg[offset..offset + 4], &[0xBF, 0x80, 0x00, 0x00]);
851			// 1.0f32: ASC encoding then invert for DESC
852			assert_eq!(&key_pos[offset..offset + 4], &[0x40, 0x7F, 0xFF, 0xFF]);
853
854			// Verify ordering (reversed)
855			assert!(key_neg.as_slice() > key_pos.as_slice());
856		}
857	}
858
859	mod f64 {
860		use reifydb_type::value::r#type::Type;
861
862		use crate::{sort::SortDirection, value::index::layout::EncodedIndexLayout};
863
864		#[test]
865		fn test_asc() {
866			let layout = EncodedIndexLayout::new(&[Type::Float8], &[SortDirection::Asc]).unwrap();
867			let mut key = layout.allocate_key();
868
869			layout.set_f64(&mut key, 0, std::f64::consts::PI);
870
871			let offset = layout.fields[0].offset;
872			// PI in IEEE 754: 0x400921FB54442D18 -> flip sign bit
873			assert_eq!(&key[offset..offset + 8], &[0xC0, 0x09, 0x21, 0xFB, 0x54, 0x44, 0x2D, 0x18]);
874		}
875
876		#[test]
877		fn test_desc() {
878			let layout = EncodedIndexLayout::new(&[Type::Float8], &[SortDirection::Desc]).unwrap();
879			let mut key = layout.allocate_key();
880
881			layout.set_f64(&mut key, 0, std::f64::consts::PI);
882
883			let offset = layout.fields[0].offset;
884			// PI: ASC encoding then invert for DESC
885			assert_eq!(&key[offset..offset + 8], &[0x3F, 0xF6, 0xDE, 0x04, 0xAB, 0xBB, 0xD2, 0xE7]);
886		}
887	}
888
889	mod row_number {
890		use reifydb_type::value::r#type::Type;
891
892		use crate::{sort::SortDirection, value::index::layout::EncodedIndexLayout};
893
894		#[test]
895		fn test_asc() {
896			let layout = EncodedIndexLayout::new(&[Type::Uint8], &[SortDirection::Asc]).unwrap();
897			let mut key = layout.allocate_key();
898
899			layout.set_row_number(&mut key, 0, 0x123456789ABCDEFu64);
900
901			let offset = layout.fields[0].offset;
902			assert_eq!(&key[offset..offset + 8], &[0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF]);
903		}
904
905		#[test]
906		fn test_desc() {
907			let layout = EncodedIndexLayout::new(&[Type::Uint8], &[SortDirection::Desc]).unwrap();
908			let mut key = layout.allocate_key();
909
910			layout.set_row_number(&mut key, 0, 0x123456789ABCDEFu64);
911
912			let offset = layout.fields[0].offset;
913			// Inverted for DESC
914			assert_eq!(&key[offset..offset + 8], &[0xFE, 0xDC, 0xBA, 0x98, 0x76, 0x54, 0x32, 0x10]);
915		}
916	}
917
918	mod date {
919		use reifydb_type::value::{date::Date, r#type::Type};
920
921		use crate::{sort::SortDirection, value::index::layout::EncodedIndexLayout};
922
923		#[test]
924		fn test_asc() {
925			let layout = EncodedIndexLayout::new(&[Type::Date], &[SortDirection::Asc]).unwrap();
926			let mut key = layout.allocate_key();
927
928			let date = Date::new(2025, 1, 1).unwrap();
929			layout.set_date(&mut key, 0, date);
930
931			let offset = layout.fields[0].offset;
932			// Date is stored as i32 days since epoch with sign bit
933			// flipped
934			let bytes = &key[offset..offset + 4];
935
936			// Verify it's properly encoded
937			let mut expected = date.to_days_since_epoch().to_be_bytes();
938			expected[0] ^= 0x80;
939			assert_eq!(bytes, expected);
940		}
941
942		#[test]
943		fn test_desc() {
944			let layout = EncodedIndexLayout::new(&[Type::Date], &[SortDirection::Desc]).unwrap();
945			let mut key = layout.allocate_key();
946
947			let date = Date::new(2025, 1, 1).unwrap();
948			layout.set_date(&mut key, 0, date);
949
950			let offset = layout.fields[0].offset;
951			let bytes = &key[offset..offset + 4];
952
953			// Date with sign bit flipped then all inverted for DESC
954			let mut expected = date.to_days_since_epoch().to_be_bytes();
955			expected[0] ^= 0x80;
956			for b in expected.iter_mut() {
957				*b = !*b;
958			}
959			assert_eq!(bytes, expected);
960		}
961	}
962
963	mod composite {
964		use reifydb_type::value::r#type::Type;
965
966		use crate::{sort::SortDirection, value::index::layout::EncodedIndexLayout};
967
968		#[test]
969		fn test_mixed_directions() {
970			let layout = EncodedIndexLayout::new(
971				&[Type::Int4, Type::Uint8],
972				&[SortDirection::Desc, SortDirection::Asc],
973			)
974			.unwrap();
975
976			let mut key = layout.allocate_key();
977			layout.set_i32(&mut key, 0, 100);
978			layout.set_u64(&mut key, 1, 200u64);
979
980			// Check first field (i32 DESC)
981			let offset1 = layout.fields[0].offset;
982			let mut expected_i32 = 100i32.to_be_bytes();
983			expected_i32[0] ^= 0x80;
984			for b in expected_i32.iter_mut() {
985				*b = !*b;
986			}
987			assert_eq!(&key[offset1..offset1 + 4], expected_i32);
988
989			// Check second field (u64 ASC)
990			let offset2 = layout.fields[1].offset;
991			let expected_u64 = 200u64.to_be_bytes();
992			assert_eq!(&key[offset2..offset2 + 8], expected_u64);
993		}
994	}
995
996	mod uuid4 {
997		use reifydb_type::value::{r#type::Type, uuid::Uuid4};
998
999		use crate::{sort::SortDirection, value::index::layout::EncodedIndexLayout};
1000
1001		#[test]
1002		fn test_asc() {
1003			let layout = EncodedIndexLayout::new(&[Type::Uuid4], &[SortDirection::Asc]).unwrap();
1004			let mut key1 = layout.allocate_key();
1005			let mut key2 = layout.allocate_key();
1006
1007			let uuid1 = Uuid4::generate();
1008			let uuid2 = Uuid4::generate();
1009
1010			layout.set_uuid4(&mut key1, 0, uuid1.clone());
1011			layout.set_uuid4(&mut key2, 0, uuid2.clone());
1012
1013			// Check bitvec shows field is set
1014			assert!(key1.is_defined(0));
1015			assert!(key2.is_defined(0));
1016
1017			// Check values are stored correctly (16 bytes)
1018			let offset = layout.fields[0].offset;
1019			let uuid1_bytes: Vec<u8> = uuid1.as_bytes().to_vec();
1020			let uuid2_bytes: Vec<u8> = uuid2.as_bytes().to_vec();
1021
1022			assert_eq!(&key1[offset..offset + 16], &uuid1_bytes[..]);
1023			assert_eq!(&key2[offset..offset + 16], &uuid2_bytes[..]);
1024		}
1025
1026		#[test]
1027		fn test_desc() {
1028			let layout = EncodedIndexLayout::new(&[Type::Uuid4], &[SortDirection::Desc]).unwrap();
1029			let mut key = layout.allocate_key();
1030
1031			let uuid = Uuid4::generate();
1032			layout.set_uuid4(&mut key, 0, uuid.clone());
1033
1034			// Check value is inverted for DESC
1035			let offset = layout.fields[0].offset;
1036			let mut expected_bytes = uuid.as_bytes().to_vec();
1037			for b in expected_bytes.iter_mut() {
1038				*b = !*b;
1039			}
1040
1041			assert_eq!(&key[offset..offset + 16], &expected_bytes[..]);
1042		}
1043	}
1044
1045	mod uuid7 {
1046		use std::{thread::sleep, time::Duration};
1047
1048		use reifydb_type::value::{r#type::Type, uuid::Uuid7};
1049
1050		use crate::{sort::SortDirection, value::index::layout::EncodedIndexLayout};
1051
1052		#[test]
1053		fn test_asc() {
1054			let layout = EncodedIndexLayout::new(&[Type::Uuid7], &[SortDirection::Asc]).unwrap();
1055			let mut key1 = layout.allocate_key();
1056			let mut key2 = layout.allocate_key();
1057
1058			let uuid1 = Uuid7::generate();
1059			// Sleep a bit to ensure different timestamps
1060			sleep(Duration::from_millis(10));
1061			let uuid2 = Uuid7::generate();
1062
1063			layout.set_uuid7(&mut key1, 0, uuid1.clone());
1064			layout.set_uuid7(&mut key2, 0, uuid2.clone());
1065
1066			// Check bitvec shows field is set
1067			assert!(key1.is_defined(0));
1068			assert!(key2.is_defined(0));
1069
1070			// Check values are stored correctly (16 bytes)
1071			let offset = layout.fields[0].offset;
1072			let uuid1_bytes: Vec<u8> = uuid1.as_bytes().to_vec();
1073			let uuid2_bytes: Vec<u8> = uuid2.as_bytes().to_vec();
1074
1075			assert_eq!(&key1[offset..offset + 16], &uuid1_bytes[..]);
1076			assert_eq!(&key2[offset..offset + 16], &uuid2_bytes[..]);
1077
1078			// UUID7 has timestamp prefix, so later should be
1079			// greater
1080			assert!(key1.as_slice() < key2.as_slice());
1081		}
1082
1083		#[test]
1084		fn test_desc() {
1085			let layout = EncodedIndexLayout::new(&[Type::Uuid7], &[SortDirection::Desc]).unwrap();
1086			let mut key1 = layout.allocate_key();
1087			let mut key2 = layout.allocate_key();
1088
1089			let uuid1 = Uuid7::generate();
1090			// Sleep a bit to ensure different timestamps
1091			sleep(Duration::from_millis(10));
1092			let uuid2 = Uuid7::generate();
1093
1094			layout.set_uuid7(&mut key1, 0, uuid1.clone());
1095			layout.set_uuid7(&mut key2, 0, uuid2.clone());
1096
1097			// Check values are inverted for DESC
1098			let offset = layout.fields[0].offset;
1099			let mut expected_bytes1 = uuid1.as_bytes().to_vec();
1100			let mut expected_bytes2 = uuid2.as_bytes().to_vec();
1101			for b in expected_bytes1.iter_mut() {
1102				*b = !*b;
1103			}
1104			for b in expected_bytes2.iter_mut() {
1105				*b = !*b;
1106			}
1107
1108			assert_eq!(&key1[offset..offset + 16], &expected_bytes1[..]);
1109			assert_eq!(&key2[offset..offset + 16], &expected_bytes2[..]);
1110
1111			// Verify ordering (reversed due to DESC)
1112			assert!(key1.as_slice() > key2.as_slice());
1113		}
1114	}
1115
1116	mod identity_id {
1117		use std::{thread::sleep, time::Duration};
1118
1119		use reifydb_type::value::{identity::IdentityId, r#type::Type, uuid::Uuid7};
1120
1121		use crate::{sort::SortDirection, value::index::layout::EncodedIndexLayout};
1122
1123		#[test]
1124		fn test_asc() {
1125			let layout = EncodedIndexLayout::new(&[Type::IdentityId], &[SortDirection::Asc]).unwrap();
1126			let mut key1 = layout.allocate_key();
1127			let mut key2 = layout.allocate_key();
1128
1129			let id1 = IdentityId::generate();
1130			// Sleep a bit to ensure different timestamps
1131			// (IdentityId wraps Uuid7)
1132			sleep(Duration::from_millis(10));
1133			let id2 = IdentityId::generate();
1134
1135			layout.set_identity_id(&mut key1, 0, id1.clone());
1136			layout.set_identity_id(&mut key2, 0, id2.clone());
1137
1138			// Check bitvec shows field is set
1139			assert!(key1.is_defined(0));
1140			assert!(key2.is_defined(0));
1141
1142			// Check values are stored correctly (16 bytes)
1143			let offset = layout.fields[0].offset;
1144			let uuid7_1: Uuid7 = id1.into();
1145			let uuid7_2: Uuid7 = id2.into();
1146			let id1_bytes: Vec<u8> = uuid7_1.as_bytes().to_vec();
1147			let id2_bytes: Vec<u8> = uuid7_2.as_bytes().to_vec();
1148
1149			assert_eq!(&key1[offset..offset + 16], &id1_bytes[..]);
1150			assert_eq!(&key2[offset..offset + 16], &id2_bytes[..]);
1151
1152			// IdentityId wraps Uuid7 which has timestamp prefix, so
1153			// later should be greater
1154			assert!(key1.as_slice() < key2.as_slice());
1155		}
1156
1157		#[test]
1158		fn test_desc() {
1159			let layout = EncodedIndexLayout::new(&[Type::IdentityId], &[SortDirection::Desc]).unwrap();
1160			let mut key1 = layout.allocate_key();
1161			let mut key2 = layout.allocate_key();
1162
1163			let id1 = IdentityId::generate();
1164			// Sleep a bit to ensure different timestamps
1165			sleep(Duration::from_millis(10));
1166			let id2 = IdentityId::generate();
1167
1168			layout.set_identity_id(&mut key1, 0, id1.clone());
1169			layout.set_identity_id(&mut key2, 0, id2.clone());
1170
1171			// Check values are inverted for DESC
1172			let offset = layout.fields[0].offset;
1173			let uuid7_1: Uuid7 = id1.into();
1174			let uuid7_2: Uuid7 = id2.into();
1175			let mut expected_bytes1 = uuid7_1.as_bytes().to_vec();
1176			let mut expected_bytes2 = uuid7_2.as_bytes().to_vec();
1177			for b in expected_bytes1.iter_mut() {
1178				*b = !*b;
1179			}
1180			for b in expected_bytes2.iter_mut() {
1181				*b = !*b;
1182			}
1183
1184			assert_eq!(&key1[offset..offset + 16], &expected_bytes1[..]);
1185			assert_eq!(&key2[offset..offset + 16], &expected_bytes2[..]);
1186
1187			// Verify ordering (reversed due to DESC)
1188			assert!(key1.as_slice() > key2.as_slice());
1189		}
1190	}
1191
1192	mod undefined {
1193		use reifydb_type::value::r#type::Type;
1194
1195		use crate::{sort::SortDirection, value::index::layout::EncodedIndexLayout};
1196
1197		#[test]
1198		fn test_undefined() {
1199			let layout = EncodedIndexLayout::new(&[Type::Int4], &[SortDirection::Asc]).unwrap();
1200			let mut key = layout.allocate_key();
1201
1202			// Set a value first
1203			layout.set_i32(&mut key, 0, 42);
1204			assert!(key.is_defined(0));
1205
1206			// Now set it to undefined
1207			layout.set_undefined(&mut key, 0);
1208			assert!(!key.is_defined(0));
1209
1210			// Check that the data is zeroed
1211			let offset = layout.fields[0].offset;
1212			assert_eq!(&key[offset..offset + 4], &[0, 0, 0, 0]);
1213		}
1214	}
1215}