use super::{KEY_KIND_TAG_SIZE, KEY_PREFIX_SIZE, SEGMENT_LEN_SIZE};
use crate::{
MAX_INDEX_FIELDS,
db::index::{
IndexId, IndexKey, IndexKeyKind, PrimaryKeyEquivalenceError, RawIndexKey,
primary_key_matches_value,
},
traits::Storable,
types::{Decimal, EntityTag, Float32, Float64, Int, Principal},
value::{StorageKey, StorageKeyDecodeError, StorageKeyEncodeError, Value, ValueEnum},
};
use std::{borrow::Cow, cmp::Ordering, ops::Bound as RangeBound};
/// Index identity shared by most fixtures in this file: entity tag `1`, second argument `0`.
fn index_id() -> IndexId {
    let entity = EntityTag::new(1);
    IndexId::new(entity, 0)
}
/// A second index identity on the same entity tag (`1`) that differs from [`index_id`] only in
/// the second argument (`1` instead of `0`).
fn other_index_id() -> IndexId {
    let entity = EntityTag::new(1);
    IndexId::new(entity, 1)
}
/// Encodes `value` through the canonical index-component encoder.
///
/// # Panics
/// Panics when the encoder returns an error; fixtures only pass values expected to encode.
fn encode_component(value: &Value) -> Vec<u8> {
    let encoded = super::super::ordered::encode_canonical_index_component(value);
    encoded.expect("component should encode")
}
/// Assembles an [`IndexKey`] directly from its four fields, without any validation.
fn key_with(kind: IndexKeyKind, id: IndexId, components: Vec<Vec<u8>>, pk: Vec<u8>) -> IndexKey {
    let (key_kind, index_id, primary_key) = (kind, id, pk);
    IndexKey {
        key_kind,
        index_id,
        components,
        primary_key,
    }
}
/// Byte image the golden snapshots expect for the index id: `1u64` big-endian followed by
/// `0u16` big-endian (ten bytes in total).
fn expected_index_id_entity_email_bytes() -> Vec<u8> {
    let leading = 1u64.to_be_bytes();
    let trailing = 0u16.to_be_bytes();
    [leading.as_slice(), trailing.as_slice()].concat()
}
/// Asserts that decoding `bytes` fails and that the error text contains `"corrupted"`.
///
/// `label` is used as the panic message when decoding unexpectedly succeeds.
fn decode_must_fail_corrupted(bytes: Vec<u8>, label: &str) {
    let raw = RawIndexKey::from_bytes(Cow::Owned(bytes));
    let decode_result = IndexKey::try_from_raw(&raw);
    let err = decode_result.expect_err(label);

    let classified_as_corruption = err.contains("corrupted");
    assert!(
        classified_as_corruption,
        "decode error should classify corruption, got: {err}"
    );
}
/// Builds a three-byte component payload filled with `byte`.
fn make_component(byte: u8) -> Vec<u8> {
    [byte; 3].to_vec()
}
/// Builds a primary-key payload of `IndexKey::MAX_PK_SIZE` bytes, each set to `byte`.
fn make_pk(byte: u8) -> Vec<u8> {
    let mut pk = Vec::new();
    pk.resize(IndexKey::MAX_PK_SIZE, byte);
    pk
}
/// Advances `state` by one xorshift step (shifts 13 left, 7 right, 17 left) and returns the new
/// state.
///
/// The generator is deterministic, so fixtures built from it are reproducible. A zero state
/// stays zero.
fn next_random_u64(state: &mut u64) -> u64 {
    *state ^= *state << 13;
    *state ^= *state >> 7;
    *state ^= *state << 17;
    *state
}
/// Picks one of four fixed values for component `slot`, selected by `seed % 4`.
///
/// Slots 0 and 2 yield `Value::Int`, slots 1 and 3 yield `Value::Text`.
///
/// # Panics
/// Panics when `slot` is greater than 3.
fn mixed_component_value(seed: u64, slot: u8) -> Value {
    let text = |raw: &str| Value::Text(raw.to_string());

    match (slot, seed % 4) {
        (0, 0) => Value::Int(-7),
        (0, 1) => Value::Int(-2),
        (0, 2) => Value::Int(0),
        (0, _) => Value::Int(7),

        (1, 0) => text("aa"),
        (1, 1) => text("ab"),
        (1, 2) => text("mm"),
        (1, _) => text("zz"),

        (2, 0) => Value::Int(-9),
        (2, 1) => Value::Int(-1),
        (2, 2) => Value::Int(1),
        (2, _) => Value::Int(9),

        (3, 0) => text("ka"),
        (3, 1) => text("kb"),
        (3, 2) => text("mb"),
        (3, _) => text("zz"),

        _ => unreachable!("randomized mixed-composite fixture uses exactly four slots"),
    }
}
/// Smallest value the prefix-property test uses for `slot`: `i64::MIN` for the integer slots
/// (0 and 2) and the empty string for the text slot (1).
///
/// # Panics
/// Panics when `slot` is greater than 2.
fn slot_min_value(slot: usize) -> Value {
    match slot {
        0 | 2 => Value::Int(i64::MIN),
        1 => Value::Text(String::new()),
        _ => unreachable!("prefix property slot must be in [0, 2]"),
    }
}
/// Largest value the prefix-property test uses for `slot`: `i64::MAX` for the integer slots
/// (0 and 2) and `"zz"` for the text slot (1).
///
/// # Panics
/// Panics when `slot` is greater than 2.
fn slot_max_value(slot: usize) -> Value {
    match slot {
        0 | 2 => Value::Int(i64::MAX),
        1 => Value::Text("zz".to_string()),
        _ => unreachable!("prefix property slot must be in [0, 2]"),
    }
}
/// Returns `true` when `candidate` begins with every value of `prefix`, compared with `==`.
///
/// An empty `prefix` matches every candidate. A `prefix` longer than `candidate` never
/// matches: the previous `zip`-based comparison silently stopped at the shorter slice and
/// reported such inputs as matching.
fn prefix_matches(candidate: &[Value], prefix: &[Value]) -> bool {
    candidate.starts_with(prefix)
}
/// Offset of the byte that directly follows the kind tag and the stored index id.
///
/// `index_key_rejects_len_over_max` overwrites this byte with a count above `MAX_INDEX_FIELDS`.
fn len_offset() -> usize {
    let kind_tag_width = KEY_KIND_TAG_SIZE;
    let index_id_width = IndexId::STORED_SIZE_USIZE;
    kind_tag_width + index_id_width
}
/// Offset of the first component's length segment, which starts right after the fixed key
/// prefix (`KEY_PREFIX_SIZE`).
fn first_component_len_offset() -> usize {
    KEY_PREFIX_SIZE
}
/// A buffer one byte shorter than the minimum stored size must be rejected as corrupted.
#[test]
fn index_key_rejects_undersized_bytes() {
    let undersized_len = IndexKey::MIN_STORED_SIZE_USIZE.saturating_sub(1);
    let bytes = vec![0u8; undersized_len];
    let raw = RawIndexKey::from_bytes(Cow::Borrowed(&bytes));

    let err = IndexKey::try_from_raw(&raw).expect_err("undersized key should fail");

    assert!(err.contains("corrupted"));
}
/// A buffer one byte longer than the maximum stored size must be rejected as corrupted.
#[test]
fn index_key_rejects_oversized_bytes() {
    let oversized_len = IndexKey::STORED_SIZE_USIZE + 1;
    let bytes = vec![0u8; oversized_len];
    let raw = RawIndexKey::from_bytes(Cow::Borrowed(&bytes));

    let err = IndexKey::try_from_raw(&raw).expect_err("oversized key should fail");

    assert!(err.contains("corrupted"));
}
/// Overwriting the leading byte of an otherwise valid key with `0xFF` must fail decoding
/// as corrupted.
#[test]
fn index_key_rejects_unknown_kind_tag() {
    let valid = IndexKey::empty(&index_id()).to_raw();
    let mut bytes = valid.as_bytes().to_vec();
    // The first byte carries the key-kind tag (see `index_key_kind_is_explicit`).
    bytes[0] = 0xFF;

    let raw = RawIndexKey::from_bytes(Cow::Owned(bytes));
    let err = IndexKey::try_from_raw(&raw).expect_err("unknown kind tag should fail");

    assert!(err.contains("corrupted"));
}
/// Writing `MAX_INDEX_FIELDS + 1` into the byte at `len_offset()` must fail decoding as
/// corrupted.
#[test]
#[expect(clippy::cast_possible_truncation)]
fn index_key_rejects_len_over_max() {
    let key = key_with(
        IndexKeyKind::User,
        index_id(),
        vec![make_component(1)],
        make_pk(2),
    );
    let over_max = (MAX_INDEX_FIELDS as u8) + 1;

    let mut bytes = key.to_raw().as_bytes().to_vec();
    bytes[len_offset()] = over_max;

    let raw = RawIndexKey::from_bytes(Cow::Owned(bytes));
    let err = IndexKey::try_from_raw(&raw).expect_err("oversized length should fail");

    assert!(err.contains("corrupted"));
}
/// An empty key built for `IndexId::max_storable()` must survive an encode/decode roundtrip.
#[test]
fn index_key_accepts_max_storable_index_id() {
    let max_id = IndexId::max_storable();
    let key = IndexKey::empty(&max_id);

    let encoded = key.to_raw();
    let decoded = IndexKey::try_from_raw(&encoded).expect("fixed-width index id should decode");

    assert_eq!(decoded, key);
}
/// Dropping the final byte of an encoded key must produce a decode error mentioning
/// `"truncated"`.
#[test]
fn index_key_rejects_truncated_key() {
    let key = key_with(
        IndexKeyKind::User,
        index_id(),
        vec![make_component(1)],
        make_pk(2),
    );

    let mut bytes = key.to_raw().as_bytes().to_vec();
    bytes.pop();

    let raw = RawIndexKey::from_bytes(Cow::Owned(bytes));
    let err = IndexKey::try_from_raw(&raw).expect_err("truncated payload should fail");

    assert!(err.contains("truncated"));
}
/// Declaring a first-component length of `MAX_COMPONENT_SIZE + 1` must produce a decode error
/// mentioning `"overlong"`.
#[test]
fn index_key_rejects_overlong_key_segments() {
    let key = key_with(
        IndexKeyKind::User,
        index_id(),
        vec![make_component(3)],
        make_pk(4),
    );
    let mut bytes = key.to_raw().as_bytes().to_vec();

    #[expect(clippy::cast_possible_truncation)]
    let overlong = (IndexKey::MAX_COMPONENT_SIZE + 1) as u16;

    // Patch the big-endian length segment of the first component in place.
    let len_start = first_component_len_offset();
    let len_field = len_start..len_start + SEGMENT_LEN_SIZE;
    bytes[len_field].copy_from_slice(&overlong.to_be_bytes());

    let raw = RawIndexKey::from_bytes(Cow::Owned(bytes));
    let err = IndexKey::try_from_raw(&raw).expect_err("overlong payload should fail");

    assert!(err.contains("overlong"));
}
/// Appending one extra byte to an encoded key must produce a decode error mentioning
/// `"trailing"`.
#[test]
fn index_key_rejects_trailing_bytes() {
    let key = key_with(
        IndexKeyKind::User,
        index_id(),
        vec![make_component(3)],
        make_pk(4),
    );

    let mut bytes = key.to_raw().as_bytes().to_vec();
    bytes.push(42);

    let raw = RawIndexKey::from_bytes(Cow::Owned(bytes));
    let err = IndexKey::try_from_raw(&raw).expect_err("trailing bytes should fail");

    assert!(err.contains("trailing"));
}
/// `validated_component(slot)` must hand back exactly the bytes stored for that slot.
#[test]
fn raw_index_key_validated_component_reads_requested_segment() {
    let first = encode_component(&Value::Text("alpha".to_string()));
    let second = encode_component(&Value::Uint(7));
    let raw = key_with(
        IndexKeyKind::User,
        index_id(),
        vec![first.clone(), second.clone()],
        make_pk(9),
    )
    .to_raw();

    let expectations = [
        (0, &first, "first component should validate"),
        (1, &second, "second component should validate"),
    ];
    for (slot, expected, label) in expectations {
        let actual = raw.validated_component(slot).expect(label);
        assert_eq!(actual, Some(expected.as_slice()));
    }
}
/// Asking for a slot past the last component must succeed and yield `None`.
#[test]
fn raw_index_key_validated_component_returns_none_for_out_of_range_slot() {
    let single_component = vec![encode_component(&Value::Text("alpha".to_string()))];
    let raw = key_with(IndexKeyKind::User, index_id(), single_component, make_pk(9)).to_raw();

    let component = raw
        .validated_component(1)
        .expect("out-of-range component should still validate the key");

    assert_eq!(component, None);
}
/// Component extraction must fail with a `"trailing"` error when the raw key carries an
/// extra byte after the encoded payload.
#[test]
fn raw_index_key_validated_component_rejects_trailing_bytes() {
    let single_component = vec![encode_component(&Value::Text("alpha".to_string()))];
    let key = key_with(IndexKeyKind::User, index_id(), single_component, make_pk(9));

    let mut bytes = key.to_raw().as_bytes().to_vec();
    bytes.push(42);
    let raw = RawIndexKey::from_bytes(Cow::Owned(bytes));

    let outcome = raw.validated_component(0);
    let err = outcome.expect_err("component extraction should reject trailing bytes");

    assert!(err.contains("trailing"));
}
/// For the same index and prefix, the whole user-kind envelope must sort strictly before the
/// system-kind envelope.
#[test]
fn index_key_prefix_bounds_are_isolated_between_user_and_system_kinds() {
    let prefix = vec![vec![0x33u8, 0x44, 0x55]];
    let raw_bounds = |kind: IndexKeyKind| {
        let (start, end) = IndexKey::bounds_for_prefix_with_kind(&index_id(), kind, 2, &prefix);
        (start.to_raw(), end.to_raw())
    };

    let (user_start_raw, user_end_raw) = raw_bounds(IndexKeyKind::User);
    let (system_start_raw, system_end_raw) = raw_bounds(IndexKeyKind::System);

    assert!(user_start_raw <= user_end_raw);
    assert!(system_start_raw <= system_end_raw);
    assert!(user_end_raw < system_start_raw);
}
/// `raw_bounds_for_all_components` must equal the raw encodings of the same key with its
/// primary key swapped for the low and high wildcard primary keys.
#[test]
fn raw_bounds_for_all_components_match_canonical_wildcard_primary_key_bounds() {
    let components = || vec![make_component(0x11), make_component(0x22)];
    let key = key_with(
        IndexKeyKind::User,
        index_id(),
        components(),
        make_pk(0x33),
    );

    let (lower, upper) = key.raw_bounds_for_all_components();

    let expected_with_pk =
        |pk: Vec<u8>| key_with(IndexKeyKind::User, index_id(), components(), pk).to_raw();
    let expected_lower = expected_with_pk(IndexKey::wildcard_low_pk());
    let expected_upper = expected_with_pk(IndexKey::wildcard_high_pk());

    assert_eq!(lower.as_bytes(), expected_lower.as_bytes());
    assert_eq!(upper.as_bytes(), expected_upper.as_bytes());
}
/// The lower bound for a one-component prefix must sort before the lower bound for a
/// two-component prefix sharing the same first component, both as `IndexKey` and as raw key.
#[test]
fn index_key_ordering_is_stable_across_cardinality_transitions() {
    let one_component = [make_component(7)];
    let two_components = [make_component(7), make_component(0)];

    let len_one_key = IndexKey::bounds_for_prefix(&index_id(), 1, &one_component).0;
    let len_two_key = IndexKey::bounds_for_prefix(&index_id(), 2, &two_components).0;

    assert!(len_one_key < len_two_key);
    assert!(len_one_key.to_raw() < len_two_key.to_raw());
}
/// A key carrying `MAX_INDEX_FIELDS` components must decode to an equal key and re-encode to
/// the same bytes.
#[test]
fn index_key_roundtrip_supports_max_cardinality() {
    let prefix = (0..MAX_INDEX_FIELDS)
        .map(|i| {
            #[expect(clippy::cast_possible_truncation)]
            let byte = i as u8;
            vec![byte, byte.wrapping_add(1), byte.wrapping_add(2)]
        })
        .collect::<Vec<_>>();

    let key = IndexKey::bounds_for_prefix(&index_id(), MAX_INDEX_FIELDS, &prefix).0;
    let raw = key.to_raw();
    let decoded = IndexKey::try_from_raw(&raw).expect("max-cardinality key should decode");

    assert_eq!(decoded, key);
    assert_eq!(decoded.to_raw().as_bytes(), raw.as_bytes());
}
/// Sorting keys by their `Ord` implementation must give the same sequence as sorting them by
/// their raw encoding, across differing components, key kinds and index ids.
#[test]
fn index_key_ordering_matches_raw_key_semantics() {
    let (idx_a, idx_b) = (index_id(), other_index_id());
    let user_key = |id: IndexId, components: Vec<Vec<u8>>| {
        key_with(IndexKeyKind::User, id, components, make_pk(1))
    };

    let keys = vec![
        user_key(idx_a, vec![vec![1u8, 1u8], vec![2u8, 2u8]]),
        user_key(idx_a, vec![vec![1u8, 1u8], vec![3u8, 3u8]]),
        user_key(idx_a, vec![vec![2u8, 2u8]]),
        key_with(
            IndexKeyKind::System,
            idx_a,
            vec![vec![0u8, 0u8]],
            make_pk(1),
        ),
        user_key(idx_b, vec![vec![0u8, 0u8]]),
    ];

    let mut sorted_by_raw = keys.clone();
    sorted_by_raw.sort_by_key(IndexKey::to_raw);

    let mut sorted_by_ord = keys;
    sorted_by_ord.sort();

    assert_eq!(sorted_by_ord, sorted_by_raw);
}
/// `RawIndexKey` ordering must follow `IndexKey` ordering ("alex" before "bob") even though
/// the plain byte-slice comparison of the two encodings goes the other way.
#[test]
fn raw_index_key_ordering_ignores_tuple_length_prefix_bytes() {
    let single_text_key = |text: &str, pk: u8| {
        key_with(
            IndexKeyKind::User,
            index_id(),
            vec![encode_component(&Value::Text(text.to_string()))],
            vec![pk],
        )
    };
    let alex = single_text_key("alex", 0x01);
    let bob = single_text_key("bob", 0x02);
    let (alex_raw, bob_raw) = (alex.to_raw(), bob.to_raw());

    assert!(alex < bob);
    assert!(alex_raw < bob_raw);
    assert!(alex_raw.as_bytes() > bob_raw.as_bytes());
}
/// Feeds 1,000 deterministic pseudo-random buffers to the decoder; every buffer that decodes
/// must re-encode to exactly the same bytes.
#[test]
#[expect(clippy::cast_possible_truncation)]
fn index_key_decode_fuzz_roundtrip_is_canonical() {
    const RUNS: u64 = 1_000;
    const LCG_MULTIPLIER: u64 = 6_364_136_223_846_793_005;

    // Linear congruential generator; one step is drawn for the length and one per byte.
    let mut seed = 0xBADC_0FFE_u64;
    let mut next_seed = || {
        seed = seed.wrapping_mul(LCG_MULTIPLIER).wrapping_add(1);
        seed
    };
    let size_span = IndexKey::STORED_SIZE_USIZE - IndexKey::MIN_STORED_SIZE_USIZE + 1;

    for _ in 0..RUNS {
        let len = IndexKey::MIN_STORED_SIZE_USIZE + ((next_seed() as usize) % size_span);
        let bytes: Vec<u8> = (0..len).map(|_| (next_seed() >> 24) as u8).collect();

        let raw = RawIndexKey::from_bytes(Cow::Borrowed(&bytes));
        let Ok(decoded) = IndexKey::try_from_raw(&raw) else {
            continue;
        };
        let reencoded = decoded.to_raw();
        assert_eq!(raw.as_bytes(), reencoded.as_bytes());
    }
}
/// The first encoded byte must equal the key kind's discriminant, and
/// `uses_system_namespace` must be true only for system-kind keys, regardless of index id.
#[test]
fn index_key_kind_is_explicit() {
    // (index id, kind, expected leading byte, expected `uses_system_namespace`)
    let cases = [
        (
            index_id(),
            IndexKeyKind::User,
            IndexKeyKind::User as u8,
            false,
        ),
        (
            index_id(),
            IndexKeyKind::System,
            IndexKeyKind::System as u8,
            true,
        ),
        (
            other_index_id(),
            IndexKeyKind::User,
            IndexKeyKind::User as u8,
            false,
        ),
    ];

    for (id, kind, expected_tag, expected_system) in cases {
        let key = IndexKey::empty_with_kind(&id, kind);
        assert_eq!(key.to_raw().as_bytes()[0], expected_tag);
        assert_eq!(key.uses_system_namespace(), expected_system);
    }
}
/// Golden byte snapshot: a user-kind key with one decimal component (`Decimal::new(10, 1)`)
/// and a three-byte primary key.
#[test]
fn index_key_golden_snapshot_user_single_decimal_normalized() {
    let key = key_with(
        IndexKeyKind::User,
        index_id(),
        vec![encode_component(&Value::Decimal(Decimal::new(10, 1)))],
        vec![0xAA, 0xBB, 0xCC],
    );

    let kind_tag = [IndexKeyKind::User as u8];
    let id_bytes = expected_index_id_entity_email_bytes();
    let segments: &[&[u8]] = &[
        &kind_tag,
        &id_bytes,
        // Component count.
        &[1],
        // Component length prefix, then the component payload.
        &[0x00, 0x08],
        &[0x05, 0x02, 0x80, 0x00, 0x00, 0x00, 0x31, 0x00],
        // Primary-key length prefix followed by the primary-key bytes.
        &[0x00, 0x03, 0xAA, 0xBB, 0xCC],
    ];
    let expected = segments.concat();

    assert_eq!(key.to_raw().as_bytes(), expected.as_slice());
}
/// Golden byte snapshot: a system-kind key with a text component and a principal component,
/// plus a one-byte primary key.
#[test]
fn index_key_golden_snapshot_system_two_component_text_and_identifier() {
    let key = key_with(
        IndexKeyKind::System,
        index_id(),
        vec![
            encode_component(&Value::Text("alpha".to_string())),
            encode_component(&Value::Principal(Principal::from_slice(&[1u8, 2, 3]))),
        ],
        vec![0x10],
    );

    let kind_tag = [IndexKeyKind::System as u8];
    let id_bytes = expected_index_id_entity_email_bytes();
    let segments: &[&[u8]] = &[
        &kind_tag,
        &id_bytes,
        // Component count.
        &[2],
        // First component: length prefix, then payload.
        &[0x00, 0x08],
        &[0x12, b'a', b'l', b'p', b'h', b'a', 0x00, 0x00],
        // Second component: length prefix, then payload.
        &[0x00, 0x06],
        &[0x10, 0x01, 0x02, 0x03, 0x00, 0x00],
        // Primary-key length prefix followed by the primary-key byte.
        &[0x00, 0x01, 0x10],
    ];
    let expected = segments.concat();

    assert_eq!(key.to_raw().as_bytes(), expected.as_slice());
}
/// Golden byte snapshot: a user-kind key with four components (float, two big integers and an
/// enum with payload) and a two-byte primary key. The expected bytes are frozen literals; any
/// change to the encoding shows up as a diff here.
#[test]
fn index_key_golden_snapshot_user_max_cardinality_mixed_components() {
// The snapshot hard-codes four components, so it is only valid while the limit is 4.
assert_eq!(
MAX_INDEX_FIELDS, 4,
"golden snapshot freezes the v0.10 max-cardinality contract"
);
let key = key_with(
IndexKeyKind::User,
index_id(),
vec![
encode_component(&Value::Float64(
Float64::try_new(0.0).expect("finite float should construct"),
)),
encode_component(&Value::IntBig(Int::from(999i32))),
encode_component(&Value::IntBig(Int::from(-7i32))),
encode_component(&Value::Enum(
ValueEnum::new("State", Some("MyPath")).with_payload(Value::Int(7)),
)),
],
vec![0x42, 0x43],
);
// Header: kind tag, index id bytes, component count.
let mut expected = vec![IndexKeyKind::User as u8];
expected.extend_from_slice(&expected_index_id_entity_email_bytes());
expected.push(0x04);
// Component 1 (Float64 0.0): two-byte length prefix, then payload.
expected.extend_from_slice(&[0x00, 0x09]);
expected.extend_from_slice(&[0x09, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
// Component 2 (IntBig 999): two-byte length prefix, then payload.
expected.extend_from_slice(&[0x00, 0x07]);
expected.extend_from_slice(&[0x0C, 0x02, 0x00, 0x03, b'9', b'9', b'9']);
// Component 3 (IntBig -7): two-byte length prefix, then payload.
expected.extend_from_slice(&[0x00, 0x05]);
expected.extend_from_slice(&[0x0C, 0x00, 0xFF, 0xFE, 0xC8]);
// Component 4 (enum "State" at "MyPath" with Int(7) payload): length prefix, then payload.
expected.extend_from_slice(&[0x00, 0x1D]);
expected.extend_from_slice(&[
0x07, b'S', b't', b'a', b't', b'e', 0x00, 0x00, 0x01, b'M', b'y', b'P', b'a', b't', b'h',
0x00, 0x00, 0x01, 0x00, 0x09, 0x0A, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07,
]);
// Primary key: two-byte length prefix, then the two primary-key bytes.
expected.extend_from_slice(&[0x00, 0x02, 0x42, 0x43]);
assert_eq!(key.to_raw().as_bytes(), expected.as_slice());
}
/// Corrupts segment boundaries of a two-component key in five different ways; each variant
/// must fail decoding with a corruption-classified error.
#[test]
fn index_key_component_boundary_corruption_is_rejected() {
    const FIRST_COMPONENT_LEN: usize = 4;
    const SECOND_COMPONENT_LEN: usize = 3;

    let key = key_with(
        IndexKeyKind::User,
        index_id(),
        vec![vec![0xA1, 0xA2, 0xA3, 0xA4], vec![0xB1, 0xB2, 0xB3]],
        vec![0xCC, 0xDD, 0xEE],
    );
    let base = key.to_raw().as_bytes().to_vec();

    // Layout after the fixed prefix: [len][first payload][len][second payload][pk len][pk].
    let first_len_field = KEY_PREFIX_SIZE..KEY_PREFIX_SIZE + SEGMENT_LEN_SIZE;
    let first_payload_offset = first_len_field.end;
    let pk_len_offset =
        first_payload_offset + FIRST_COMPONENT_LEN + SEGMENT_LEN_SIZE + SECOND_COMPONENT_LEN;

    let with_first_len = |declared: u16| {
        let mut bytes = base.clone();
        bytes[first_len_field.clone()].copy_from_slice(&declared.to_be_bytes());
        bytes
    };
    let without_byte_at = |offset: usize| {
        let mut bytes = base.clone();
        bytes.remove(offset);
        bytes
    };

    decode_must_fail_corrupted(
        with_first_len(3),
        "shortened component length should reject as corrupted",
    );
    decode_must_fail_corrupted(
        with_first_len(5),
        "lengthened component length should reject as corrupted",
    );
    decode_must_fail_corrupted(
        without_byte_at(first_payload_offset + 1),
        "missing middle component byte should reject as corrupted",
    );
    decode_must_fail_corrupted(
        without_byte_at(pk_len_offset),
        "truncated pk length segment should reject as corrupted",
    );

    let mut trailing = base;
    trailing.push(0x99);
    decode_must_fail_corrupted(trailing, "trailing bytes should reject as corrupted");
}
/// Builds the cartesian product of two values per slot (int, text, decimal, enum) and checks
/// that semantic ordering, `IndexKey` ordering and raw-key ordering all agree.
#[test]
fn index_key_ordering_cartesian_semantic_vs_raw_key_order() {
    #[derive(Clone)]
    struct Fixture {
        key: IndexKey,
        values: Vec<Value>,
        pk: Vec<u8>,
    }

    /// Compares component values slot by slot, falling back to the primary key on a tie.
    fn semantic_cmp(left: &Fixture, right: &Fixture) -> Ordering {
        left.values
            .iter()
            .zip(&right.values)
            .map(|(l, r)| super::super::ordered::compare_index_component_values(l, r))
            .find(|cmp| cmp.is_ne())
            .unwrap_or_else(|| left.pk.cmp(&right.pk))
    }

    /// Raw encoded bytes of every fixture, in slice order.
    fn raw_bytes(fixtures: &[Fixture]) -> Vec<Vec<u8>> {
        fixtures
            .iter()
            .map(|fixture| fixture.key.to_raw().as_bytes().to_vec())
            .collect()
    }

    let numerics = [Value::Int(-2), Value::Int(7)];
    let texts = [Value::Text("aa".to_string()), Value::Text("zz".to_string())];
    let decimals = [
        Value::Decimal(Decimal::new(10, 1)),
        Value::Decimal(Decimal::new(11, 1)),
    ];
    let enums = [
        Value::Enum(ValueEnum::new("A", Some("EnumPath"))),
        Value::Enum(ValueEnum::new("B", Some("EnumPath")).with_payload(Value::Int(1))),
    ];

    let mut fixtures = Vec::new();
    let mut ordinal = 0u8;
    for numeric in &numerics {
        for text in &texts {
            for decimal in &decimals {
                for enum_value in &enums {
                    let values = vec![
                        numeric.clone(),
                        text.clone(),
                        decimal.clone(),
                        enum_value.clone(),
                    ];
                    let components: Vec<Vec<u8>> = values.iter().map(encode_component).collect();
                    // Each fixture gets a distinct one-byte primary key.
                    let pk = vec![ordinal];
                    ordinal = ordinal.wrapping_add(1);
                    let key = key_with(IndexKeyKind::User, index_id(), components, pk.clone());
                    fixtures.push(Fixture { key, values, pk });
                }
            }
        }
    }

    let mut semantic_sorted = fixtures.clone();
    semantic_sorted.sort_by(semantic_cmp);

    let mut sorted_by_ord = fixtures.clone();
    sorted_by_ord.sort_by(|left, right| left.key.cmp(&right.key));

    let mut sorted_by_raw = fixtures;
    sorted_by_raw.sort_by(|left, right| left.key.to_raw().cmp(&right.key.to_raw()));

    let raw_ord_bytes = raw_bytes(&sorted_by_raw);
    assert_eq!(raw_bytes(&sorted_by_ord), raw_ord_bytes);
    assert_eq!(raw_bytes(&semantic_sorted), raw_ord_bytes);
}
/// Generates 256 deterministic pseudo-random four-component keys and checks that semantic
/// ordering of the values agrees with raw-key ordering.
#[test]
fn index_key_ordering_randomized_mixed_composite_semantic_vs_raw_key_order() {
    #[derive(Clone)]
    struct Fixture {
        key: IndexKey,
        values: Vec<Value>,
        pk: Vec<u8>,
    }
    const SAMPLE_COUNT: usize = 256;
    const COMPONENT_COUNT: usize = 4;

    /// Compares component values slot by slot, falling back to the primary key on a tie.
    fn semantic_cmp(left: &Fixture, right: &Fixture) -> Ordering {
        left.values
            .iter()
            .zip(&right.values)
            .map(|(l, r)| super::super::ordered::compare_index_component_values(l, r))
            .find(|cmp| cmp.is_ne())
            .unwrap_or_else(|| left.pk.cmp(&right.pk))
    }

    /// Raw encoded bytes of every fixture, in slice order.
    fn raw_bytes(fixtures: &[Fixture]) -> Vec<Vec<u8>> {
        fixtures
            .iter()
            .map(|fixture| fixture.key.to_raw().as_bytes().to_vec())
            .collect()
    }

    let mut state = 0xA11C_E5ED_0BAD_5EEDu64;
    let fixtures: Vec<Fixture> = (0..SAMPLE_COUNT)
        .map(|ordinal| {
            let values: Vec<Value> = (0..COMPONENT_COUNT)
                .map(|slot| {
                    let seed = next_random_u64(&mut state);
                    let slot = u8::try_from(slot).expect("component slot should fit u8");
                    mixed_component_value(seed, slot)
                })
                .collect();
            let components: Vec<Vec<u8>> = values.iter().map(encode_component).collect();
            // The sample ordinal doubles as a unique two-byte big-endian primary key.
            let pk = u16::try_from(ordinal)
                .expect("sample ordinal should fit u16")
                .to_be_bytes()
                .to_vec();
            let key = key_with(IndexKeyKind::User, index_id(), components, pk.clone());
            Fixture { key, values, pk }
        })
        .collect();

    let mut semantic_sorted = fixtures.clone();
    semantic_sorted.sort_by(semantic_cmp);

    let mut raw_sorted = fixtures;
    raw_sorted.sort_by(|left, right| left.key.to_raw().cmp(&right.key.to_raw()));

    assert_eq!(raw_bytes(&semantic_sorted), raw_bytes(&raw_sorted));
}
/// Simulates a prefix scan by filtering six keys against the bounds for prefix `"alpha"`:
/// only the three user-kind keys on the same index with that first component may match.
#[test]
fn index_key_prefix_scan_simulation_matches_expected_and_is_isolated() {
    let alpha = encode_component(&Value::Text("alpha".to_string()));
    let beta = encode_component(&Value::Text("beta".to_string()));
    let one = encode_component(&Value::Int(1));
    let two = encode_component(&Value::Int(2));

    let two_component_key =
        |kind: IndexKeyKind, id: IndexId, first: &[u8], second: &[u8], pk: u8| {
            key_with(kind, id, vec![first.to_vec(), second.to_vec()], vec![pk])
        };

    // Keys that share the scanned prefix.
    let user_a = two_component_key(IndexKeyKind::User, index_id(), &alpha, &one, 0x01);
    let user_b = two_component_key(IndexKeyKind::User, index_id(), &alpha, &one, 0x02);
    let user_c = two_component_key(IndexKeyKind::User, index_id(), &alpha, &two, 0x03);
    // Keys that differ in first component, key kind, or index id.
    let user_other_component =
        two_component_key(IndexKeyKind::User, index_id(), &beta, &one, 0x04);
    let system_same_prefix =
        two_component_key(IndexKeyKind::System, index_id(), &alpha, &one, 0x05);
    let user_other_index =
        two_component_key(IndexKeyKind::User, other_index_id(), &alpha, &one, 0x06);

    let all_raw = [
        &user_a,
        &user_b,
        &user_c,
        &user_other_component,
        &system_same_prefix,
        &user_other_index,
    ]
    .map(|key| key.to_raw());

    let (start, end) = IndexKey::bounds_for_prefix(&index_id(), 2, std::slice::from_ref(&alpha));
    let (start_raw, end_raw) = (start.to_raw(), end.to_raw());

    let mut matched: Vec<Vec<u8>> = all_raw
        .iter()
        .filter(|raw| **raw >= start_raw && **raw <= end_raw)
        .map(|raw| raw.as_bytes().to_vec())
        .collect();
    matched.sort();

    let mut expected: Vec<Vec<u8>> = [&user_a, &user_b, &user_c]
        .into_iter()
        .map(|key| key.to_raw().as_bytes().to_vec())
        .collect();
    expected.sort();

    assert_eq!(matched, expected);
}
/// Two keys with identical components must be ordered by primary key, and both must fall
/// inside the bounds computed for their full component prefix.
#[test]
fn index_key_pk_terminal_tie_break_and_prefix_visibility() {
    let components = vec![
        encode_component(&Value::Text("dup".to_string())),
        encode_component(&Value::Int(9)),
    ];
    let lower_pk = key_with(
        IndexKeyKind::User,
        index_id(),
        components.clone(),
        vec![0x00, 0x01],
    );
    let higher_pk = key_with(
        IndexKeyKind::User,
        index_id(),
        components.clone(),
        vec![0x00, 0xFF],
    );
    let (lower_raw, higher_raw) = (lower_pk.to_raw(), higher_pk.to_raw());

    assert!(lower_pk < higher_pk);
    assert!(lower_raw < higher_raw);

    let (start, end) = IndexKey::bounds_for_prefix(&index_id(), 2, &components);
    let (start_raw, end_raw) = (start.to_raw(), end.to_raw());

    let mut hits: Vec<Vec<u8>> = [lower_raw, higher_raw]
        .into_iter()
        .filter(|raw| *raw >= start_raw && *raw <= end_raw)
        .map(|raw| raw.as_bytes().to_vec())
        .collect();
    hits.sort();

    assert_eq!(hits.len(), 2);
}
/// Checks the shape of the prefix-scan envelope for a one-component prefix on a two-component
/// index, then verifies that matching keys fall inside it and non-matching keys fall outside.
#[test]
fn index_prefix_scan_bounds_are_consistent_with_key_encoding() {
let prefix_component = encode_component(&Value::Text("alpha".to_string()));
let (start, end) =
IndexKey::bounds_for_prefix(&index_id(), 2, std::slice::from_ref(&prefix_component));
// Both bounds must carry a second (wildcard) component even though only one was supplied.
let Some(start_second_component) = start.component(1) else {
panic!("prefix envelope start key should contain wildcard suffix component");
};
let Some(end_second_component) = end.component(1) else {
panic!("prefix envelope end key should contain wildcard suffix component");
};
// Lower wildcard: the single byte 0.
assert_eq!(
start_second_component,
&[0],
"prefix envelope lower wildcard component must stay low sentinel",
);
// Upper wildcard: maximum component width, every byte 0xFF.
assert_eq!(
end_second_component.len(),
IndexKey::MAX_COMPONENT_SIZE,
"prefix envelope upper wildcard component must stay max-width sentinel",
);
assert!(
end_second_component.iter().all(|byte| *byte == 0xFF),
"prefix envelope upper wildcard component must stay 0xFF-filled sentinel",
);
let start_raw = start.to_raw();
let end_raw = end.to_raw();
assert!(start_raw < end_raw, "prefix envelope must remain non-empty");
// Keys sharing the "alpha" prefix, with storage-key-encoded primary keys.
let matching_keys = [
key_with(
IndexKeyKind::User,
index_id(),
vec![prefix_component.clone(), encode_component(&Value::Uint(1))],
StorageKey::Uint(1)
.to_bytes()
.expect("storage key encoding should succeed")
.to_vec(),
),
key_with(
IndexKeyKind::User,
index_id(),
vec![prefix_component, encode_component(&Value::Uint(2))],
StorageKey::Uint(2)
.to_bytes()
.expect("storage key encoding should succeed")
.to_vec(),
),
];
for matching in matching_keys {
let raw = matching.to_raw();
assert!(
raw >= start_raw,
"matching prefix key must be >= prefix start"
);
assert!(
raw < end_raw,
"matching prefix key must be < prefix end sentinel"
);
}
// A key whose first component ("aalph") is expected to sort before the envelope.
let before_prefix = key_with(
IndexKeyKind::User,
index_id(),
vec![
encode_component(&Value::Text("aalph".to_string())),
encode_component(&Value::Uint(1)),
],
StorageKey::Uint(3)
.to_bytes()
.expect("storage key encoding should succeed")
.to_vec(),
)
.to_raw();
// A key whose first component ("zebra") is expected to sort at or after the envelope end.
let after_prefix = key_with(
IndexKeyKind::User,
index_id(),
vec![
encode_component(&Value::Text("zebra".to_string())),
encode_component(&Value::Uint(1)),
],
StorageKey::Uint(4)
.to_bytes()
.expect("storage key encoding should succeed")
.to_vec(),
)
.to_raw();
assert!(
before_prefix < start_raw,
"non-matching lower prefix key must sort before prefix start",
);
assert!(
after_prefix >= end_raw,
"non-matching higher prefix key must sort at-or-after prefix end sentinel",
);
}
/// `primary_key_matches_value` must report `true` for the value encoded in the key's primary
/// key (`Int(-7)`) and `false` for a different value (`Int(-8)`).
#[test]
fn index_key_primary_key_equivalence_accepts_matching_and_rejects_mismatched_boundaries() {
    let key = key_with(
        IndexKeyKind::User,
        index_id(),
        vec![encode_component(&Value::Int(9))],
        StorageKey::Int(-7)
            .to_bytes()
            .expect("storage key encoding should succeed")
            .to_vec(),
    );

    for (candidate, expected) in [(-7, true), (-8, false)] {
        let outcome = primary_key_matches_value(&key, &Value::Int(candidate))
            .expect("row-identity equivalence check should decode valid primary key");
        assert_eq!(outcome, expected);
    }
}
/// A primary-key payload whose first byte is `0xFF` must make the equivalence check fail
/// with `AnchorDecode { source: InvalidTag }` rather than return a boolean.
#[test]
fn index_key_primary_key_equivalence_fails_closed_on_corrupted_anchor_payload() {
    let mut corrupted_pk = vec![0u8; StorageKey::STORED_SIZE_USIZE];
    corrupted_pk[0] = 0xFF;
    let components = vec![encode_component(&Value::Int(9))];
    let key = key_with(IndexKeyKind::User, index_id(), components, corrupted_pk);

    let outcome = primary_key_matches_value(&key, &Value::Int(9));
    let err = outcome.expect_err("corrupted index-key primary-key payload should fail closed");

    let is_invalid_tag = matches!(
        err,
        PrimaryKeyEquivalenceError::AnchorDecode {
            source: StorageKeyDecodeError::InvalidTag
        }
    );
    assert!(is_invalid_tag);
}
/// Comparing against a `Text` value must fail with
/// `BoundaryEncode { source: UnsupportedValueKind { kind: "Text" } }`.
#[test]
fn index_key_primary_key_equivalence_rejects_non_storage_key_boundary_values() {
    let key = key_with(
        IndexKeyKind::User,
        index_id(),
        vec![encode_component(&Value::Int(9))],
        StorageKey::Uint(11)
            .to_bytes()
            .expect("storage key encoding should succeed")
            .to_vec(),
    );
    let boundary = Value::Text("not-keyable".to_string());

    let outcome = primary_key_matches_value(&key, &boundary);
    let err = outcome.expect_err("non-storage-key boundary values must fail closed");

    let is_unsupported_text = matches!(
        err,
        PrimaryKeyEquivalenceError::BoundaryEncode {
            source: StorageKeyEncodeError::UnsupportedValueKind { kind: "Text" }
        }
    );
    assert!(is_unsupported_text);
}
/// Three truncation scenarios must all fail closed: a three-byte primary-key payload in the
/// equivalence check, a raw key missing one byte right after the kind tag, and a raw key
/// missing its final byte.
#[test]
fn index_key_partial_truncation_cases_fail_closed() {
    let int_nine_component = || vec![encode_component(&Value::Int(9))];

    // Scenario 1: primary-key payload too short for the equivalence check to decode.
    let key_with_truncated_pk = key_with(
        IndexKeyKind::User,
        index_id(),
        int_nine_component(),
        vec![0x01, 0x02, 0x03],
    );
    let missing_pk_err = primary_key_matches_value(&key_with_truncated_pk, &Value::Int(9))
        .expect_err("missing PK bytes must fail closed in primary-key equivalence");
    let is_invalid_size = matches!(
        missing_pk_err,
        PrimaryKeyEquivalenceError::AnchorDecode {
            source: StorageKeyDecodeError::InvalidSize
        }
    );
    assert!(is_invalid_size);

    // Scenarios 2 and 3 start from the bytes of a well-formed key.
    let valid_pk = StorageKey::Uint(1)
        .to_bytes()
        .expect("storage key encoding should succeed")
        .to_vec();
    let baseline = key_with(
        IndexKeyKind::User,
        index_id(),
        int_nine_component(),
        valid_pk,
    )
    .to_raw()
    .as_bytes()
    .to_vec();

    let mut missing_namespace_prefix = baseline.clone();
    missing_namespace_prefix.remove(KEY_KIND_TAG_SIZE);
    decode_must_fail_corrupted(
        missing_namespace_prefix,
        "missing namespace-prefix byte must fail closed",
    );

    let mut truncated_key = baseline;
    truncated_key.pop();
    decode_must_fail_corrupted(truncated_key, "truncated index key must fail closed");
}
/// Property test over 512 deterministic pseudo-random cases on a three-component index:
/// membership in the raw bounds from `bounds_for_prefix` must agree with direct
/// value-by-value prefix comparison. The bounds must also include the minimum and maximum
/// suffix edges and exclude a key whose first prefix component was changed.
#[test]
#[expect(clippy::too_many_lines)]
fn index_key_prefix_lowering_matches_direct_prefix_membership_property() {
let mut seed = 0x0D15_EA5E_BAAD_F00D_u64;
let index_len = 3usize;
for case_idx in 0..512 {
// Prefix length is drawn from 0..=3, so it never exceeds `index_len`.
let prefix_len = usize::try_from(next_random_u64(&mut seed) % 4).unwrap_or(0);
let prefix_values = (0..prefix_len)
.map(|slot| {
mixed_component_value(
next_random_u64(&mut seed),
u8::try_from(slot).expect("prefix slot index must fit in u8"),
)
})
.collect::<Vec<_>>();
// The candidate always has all `index_len` components.
let candidate_values = (0..index_len)
.map(|slot| {
mixed_component_value(
next_random_u64(&mut seed),
u8::try_from(slot).expect("candidate slot index must fit in u8"),
)
})
.collect::<Vec<_>>();
let prefix_components = prefix_values
.iter()
.map(encode_component)
.collect::<Vec<_>>();
let (lower, upper) =
IndexKey::bounds_for_prefix(&index_id(), index_len, &prefix_components);
let lower_raw = lower.to_raw();
let upper_raw = upper.to_raw();
// Two pseudo-random bytes form the candidate's primary key.
let pk_lo = u8::try_from(next_random_u64(&mut seed) & 0xFF).unwrap_or(0);
let pk_hi = u8::try_from((next_random_u64(&mut seed) >> 8) & 0xFF).unwrap_or(0);
let candidate_key = key_with(
IndexKeyKind::User,
index_id(),
candidate_values
.iter()
.map(encode_component)
.collect::<Vec<_>>(),
vec![pk_lo, pk_hi],
);
let candidate_raw = candidate_key.to_raw();
// Core property: range membership must equal direct prefix membership.
let lowered_contains = candidate_raw >= lower_raw && candidate_raw <= upper_raw;
let direct_contains = prefix_matches(&candidate_values, &prefix_values);
assert_eq!(
lowered_contains, direct_contains,
"prefix lowering must match direct prefix membership (case={case_idx}, prefix_len={prefix_len})"
);
// Edge keys: the prefix followed by the smallest / largest value for each remaining slot.
let mut min_edge_values = prefix_values.clone();
let mut max_edge_values = prefix_values.clone();
for slot in prefix_len..index_len {
min_edge_values.push(slot_min_value(slot));
max_edge_values.push(slot_max_value(slot));
}
let min_edge_key = key_with(
IndexKeyKind::User,
index_id(),
min_edge_values
.iter()
.map(encode_component)
.collect::<Vec<_>>(),
vec![0x00],
);
let max_edge_key = key_with(
IndexKeyKind::User,
index_id(),
max_edge_values
.iter()
.map(encode_component)
.collect::<Vec<_>>(),
vec![0xFF],
);
let min_edge_raw = min_edge_key.to_raw();
let max_edge_raw = max_edge_key.to_raw();
assert!(
min_edge_raw >= lower_raw && min_edge_raw <= upper_raw,
"lowered bounds must include minimum suffix edge (case={case_idx}, prefix_len={prefix_len})",
);
assert!(
max_edge_raw >= lower_raw && max_edge_raw <= upper_raw,
"lowered bounds must include maximum suffix edge (case={case_idx}, prefix_len={prefix_len})",
);
// With a non-empty prefix, swap the first component for a slot extreme to build a key
// that must fall outside the bounds.
if prefix_len > 0 {
let mut outside_values = min_edge_values.clone();
outside_values[0] = if outside_values[0] == slot_min_value(0) {
slot_max_value(0)
} else {
slot_min_value(0)
};
assert_ne!(
outside_values[0], prefix_values[0],
"outside sample must mutate first prefix component",
);
let outside_key = key_with(
IndexKeyKind::User,
index_id(),
outside_values
.iter()
.map(encode_component)
.collect::<Vec<_>>(),
vec![0x7F],
);
let outside_raw = outside_key.to_raw();
let lowered_contains_outside = outside_raw >= lower_raw && outside_raw <= upper_raw;
assert!(
!lowered_contains_outside,
"lowered bounds must exclude keys outside the prefix domain (case={case_idx}, prefix_len={prefix_len})",
);
}
}
}
/// Returns `true` when `raw` satisfies both the `lower` and the `upper` bound.
///
/// `Unbounded` always passes; `Included` admits equality; `Excluded` does not.
fn in_range(
    raw: &RawIndexKey,
    lower: &RangeBound<RawIndexKey>,
    upper: &RangeBound<RawIndexKey>,
) -> bool {
    let satisfies_lower = match lower {
        RangeBound::Included(bound) => raw >= bound,
        RangeBound::Excluded(bound) => raw > bound,
        RangeBound::Unbounded => true,
    };
    if !satisfies_lower {
        return false;
    }

    match upper {
        RangeBound::Included(bound) => raw <= bound,
        RangeBound::Excluded(bound) => raw < bound,
        RangeBound::Unbounded => true,
    }
}
/// An excluded upper bound on the ranged component must reject every key in
/// the bound's own value group while still admitting the groups below it.
///
/// Membership is asserted per key so that a bound admitting the wrong key
/// cannot hide behind a coincidentally correct hit total.
#[test]
fn index_key_component_range_excluded_upper_skips_entire_upper_value_group() {
    // Sample keys share the leading component 7; `b` is the ranged component,
    // `c` the trailing component, and `pk` the single primary-key byte.
    let sample = |b: u64, c: u64, pk: u8| {
        key_with(
            IndexKeyKind::User,
            index_id(),
            vec![
                encode_component(&Value::Uint(7)),
                encode_component(&Value::Uint(b)),
                encode_component(&Value::Uint(c)),
            ],
            vec![pk],
        )
    };
    let k_b10_lo = sample(10, 1, 0x01);
    let k_b10_hi = sample(10, 9, 0xFF);
    let k_b11 = sample(11, 1, 0x44);
    let k_b20 = sample(20, 9, 0x99);

    // Range over the second component: [10, 20).
    let (lower, upper) = IndexKey::bounds_for_prefix_component_range(
        &index_id(),
        3,
        &[encode_component(&Value::Uint(7))],
        &RangeBound::Included(encode_component(&Value::Uint(10))),
        &RangeBound::Excluded(encode_component(&Value::Uint(20))),
    );
    // Convert the bounds once; every membership check below shares them.
    let lower_raw = raw_index_key_bound(lower);
    let upper_raw = raw_index_key_bound(upper);

    // Order: b=10 (low suffix), b=10 (high suffix), b=11, b=20.
    let membership = [&k_b10_lo, &k_b10_hi, &k_b11, &k_b20]
        .map(|key| in_range(&key.to_raw(), &lower_raw, &upper_raw));
    assert_eq!(
        membership,
        [true, true, true, false],
        "b=10 and b=11 should match; b=20 should be excluded"
    );
}
/// An excluded lower bound on the ranged component must reject every key in
/// the bound's own value group while still admitting the groups above it,
/// including the group named by the inclusive upper bound.
///
/// Membership is asserted per key so that a bound admitting the wrong key
/// cannot hide behind a coincidentally correct hit total.
#[test]
fn index_key_component_range_excluded_lower_skips_entire_lower_value_group() {
    // Sample keys share the components (7, b, 1) and differ in `b` and in the
    // single primary-key byte `pk`.
    let sample = |b: u64, pk: u8| {
        key_with(
            IndexKeyKind::User,
            index_id(),
            vec![
                encode_component(&Value::Uint(7)),
                encode_component(&Value::Uint(b)),
                encode_component(&Value::Uint(1)),
            ],
            vec![pk],
        )
    };
    let k_b10 = sample(10, 0x01);
    let k_b11 = sample(11, 0x02);
    let k_b20 = sample(20, 0x03);

    // Range over the second component: (10, 20].
    let (lower, upper) = IndexKey::bounds_for_prefix_component_range(
        &index_id(),
        3,
        &[encode_component(&Value::Uint(7))],
        &RangeBound::Excluded(encode_component(&Value::Uint(10))),
        &RangeBound::Included(encode_component(&Value::Uint(20))),
    );
    // Convert the bounds once; every membership check below shares them.
    let lower_raw = raw_index_key_bound(lower);
    let upper_raw = raw_index_key_bound(upper);

    // Order: b=10, b=11, b=20.
    let membership =
        [&k_b10, &k_b11, &k_b20].map(|key| in_range(&key.to_raw(), &lower_raw, &upper_raw));
    assert_eq!(
        membership,
        [false, true, true],
        "b=10 should be excluded; b=11 and b=20 should match"
    );
}
/// An inclusive `[0, u32::MAX]` range on the ranged component must admit the
/// minimum and maximum value groups under the selected prefix and reject a
/// key that carries a different prefix.
///
/// Membership is asserted per key so the rejected key is proven to be the
/// foreign-prefix one, not merely "one of the four".
#[test]
fn index_key_component_range_inclusive_extremes_cover_min_and_max_groups() {
    let b_max_value = u64::from(u32::MAX);
    // Sample keys have components (prefix, b, 1) and a single primary-key byte.
    let sample = |prefix: u64, b: u64, pk: u8| {
        key_with(
            IndexKeyKind::User,
            index_id(),
            vec![
                encode_component(&Value::Uint(prefix)),
                encode_component(&Value::Uint(b)),
                encode_component(&Value::Uint(1)),
            ],
            vec![pk],
        )
    };
    let k_b0 = sample(7, 0, 0x11);
    let k_b1 = sample(7, 1, 0x12);
    let k_b_max = sample(7, b_max_value, 0x13);
    let k_other_prefix = sample(8, 0, 0x14);

    let (lower, upper) = IndexKey::bounds_for_prefix_component_range(
        &index_id(),
        3,
        &[encode_component(&Value::Uint(7))],
        &RangeBound::Included(encode_component(&Value::Uint(0))),
        &RangeBound::Included(encode_component(&Value::Uint(b_max_value))),
    );
    // Convert the bounds once; every membership check below shares them.
    let lower_raw = raw_index_key_bound(lower);
    let upper_raw = raw_index_key_bound(upper);

    // Order: b=0, b=1, b=u32::MAX, then the key under prefix 8.
    let membership = [&k_b0, &k_b1, &k_b_max, &k_other_prefix]
        .map(|key| in_range(&key.to_raw(), &lower_raw, &upper_raw));
    assert_eq!(
        membership,
        [true, true, true, false],
        "inclusive [0, u32::MAX] range should include min/max groups for the selected prefix"
    );
}
/// An exclusive `(0, u32::MAX)` range on the ranged component must reject the
/// value groups sitting exactly on either bound and admit the groups
/// immediately inside them.
///
/// Membership is asserted per key so that admitting the edge groups while
/// dropping the interior ones cannot pass on hit total alone.
#[test]
fn index_key_component_range_exclusive_extremes_skip_min_and_max_groups() {
    let b_max_value = u64::from(u32::MAX);
    // Sample keys share the components (7, b, 1) and differ in `b` and in the
    // single primary-key byte `pk`.
    let sample = |b: u64, pk: u8| {
        key_with(
            IndexKeyKind::User,
            index_id(),
            vec![
                encode_component(&Value::Uint(7)),
                encode_component(&Value::Uint(b)),
                encode_component(&Value::Uint(1)),
            ],
            vec![pk],
        )
    };
    let k_b0 = sample(0, 0x21);
    let k_b1 = sample(1, 0x22);
    let k_b_max_minus_1 = sample(b_max_value - 1, 0x23);
    let k_b_max = sample(b_max_value, 0x24);

    let (lower, upper) = IndexKey::bounds_for_prefix_component_range(
        &index_id(),
        3,
        &[encode_component(&Value::Uint(7))],
        &RangeBound::Excluded(encode_component(&Value::Uint(0))),
        &RangeBound::Excluded(encode_component(&Value::Uint(b_max_value))),
    );
    // Convert the bounds once; every membership check below shares them.
    let lower_raw = raw_index_key_bound(lower);
    let upper_raw = raw_index_key_bound(upper);

    // Order: b=0, b=1, b=u32::MAX-1, b=u32::MAX.
    let membership = [&k_b0, &k_b1, &k_b_max_minus_1, &k_b_max]
        .map(|key| in_range(&key.to_raw(), &lower_raw, &upper_raw));
    assert_eq!(
        membership,
        [false, true, true, false],
        "exclusive (0, u32::MAX) range should skip both edge groups"
    );
}
/// Converts a bound over `IndexKey` into the same kind of bound over the
/// key's raw form, leaving `Unbounded` untouched.
fn raw_index_key_bound(bound: RangeBound<IndexKey>) -> RangeBound<RawIndexKey> {
    // Split the bound into its key and inclusivity; there is nothing to
    // convert when no key is present.
    let (key, inclusive) = match bound {
        RangeBound::Unbounded => return RangeBound::Unbounded,
        RangeBound::Included(key) => (key, true),
        RangeBound::Excluded(key) => (key, false),
    };
    let raw = key.to_raw();
    if inclusive {
        RangeBound::Included(raw)
    } else {
        RangeBound::Excluded(raw)
    }
}
/// Pins the float policy: NaN is refused at construction for both float
/// widths, and `0.0` / `-0.0` encode to one identical, fixed byte sequence.
#[test]
fn index_key_float_nan_policy_and_zero_canonicalization_are_frozen() {
    // NaN must not be constructible for either width.
    assert!(Float64::try_new(f64::NAN).is_none());
    assert!(Float32::try_new(f32::NAN).is_none());

    // Builds the float value and returns its encoded index component.
    let encode_f64 = |raw: f64| {
        let float = Float64::try_new(raw).expect("finite float should construct");
        encode_component(&Value::Float64(float))
    };
    let plus_zero = encode_f64(0.0);
    let minus_zero = encode_f64(-0.0);

    // Both signs of zero collapse to one encoding...
    assert_eq!(plus_zero, minus_zero);
    // ...and that encoding is pinned byte-for-byte.
    let frozen_zero = vec![0x09, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00];
    assert_eq!(plus_zero, frozen_zero);
}
/// A key filled to every declared maximum must serialize to exactly
/// `IndexKey::STORED_SIZE_USIZE` bytes and round-trip through decoding, while
/// the same bytes with one extra byte appended must be rejected as corrupted.
#[test]
fn index_key_size_bound_enforcement_accepts_max_and_rejects_over_max() {
    // Fill every component slot with a maximum-length component.
    let mut components = Vec::new();
    for _ in 0..MAX_INDEX_FIELDS {
        components.push(vec![0xAB; IndexKey::MAX_COMPONENT_SIZE]);
    }
    let primary_key = vec![0xCD; IndexKey::MAX_PK_SIZE];
    let largest = key_with(IndexKeyKind::User, index_id(), components, primary_key).to_raw();
    assert_eq!(largest.as_bytes().len(), IndexKey::STORED_SIZE_USIZE);

    // Decoding and re-encoding must reproduce the same bytes.
    let round_tripped = IndexKey::try_from_raw(&largest).expect("max-sized key should decode");
    assert_eq!(round_tripped.to_raw().as_bytes(), largest.as_bytes());

    // A single trailing byte pushes the buffer past the maximum size.
    let mut oversized = largest.as_bytes().to_vec();
    oversized.push(0x00);
    decode_must_fail_corrupted(oversized, "over-max key bytes should be rejected");
}
/// Keys from two indexes that share the same component and primary-key bytes
/// must occupy disjoint, ordered byte ranges, and a prefix scan over one
/// index must return only that index's keys.
#[test]
fn index_key_cross_index_isolation_keeps_ranges_separate() {
    let idx_a = index_id();
    let idx_b = other_index_id();
    let shared = encode_component(&Value::Text("same".to_string()));

    // Every key uses the same single component; only index id and pk vary.
    let entry =
        |id: IndexId, pk: u8| key_with(IndexKeyKind::User, id, vec![shared.clone()], vec![pk]);
    let (a1, a2) = (entry(idx_a, 0x01), entry(idx_a, 0x02));
    let (b1, b2) = (entry(idx_b, 0x01), entry(idx_b, 0x02));

    let mut raws = vec![a1.to_raw(), a2.to_raw(), b1.to_raw(), b2.to_raw()];
    raws.sort();

    // All raw bytes of the lower-ordered index must sort strictly before all
    // raw bytes of the higher-ordered index.
    let bytes_of = |key: &IndexKey| key.to_raw().as_bytes().to_vec();
    let a_bytes = [bytes_of(&a1), bytes_of(&a2)];
    let b_bytes = [bytes_of(&b1), bytes_of(&b2)];
    let (first, second) = if idx_a < idx_b {
        (&a_bytes, &b_bytes)
    } else {
        (&b_bytes, &a_bytes)
    };
    assert!(first.iter().all(|lo| second.iter().all(|hi| lo < hi)));

    // A prefix scan over index A must match exactly A's two keys.
    let (start, end) = IndexKey::bounds_for_prefix(
        &idx_a,
        1,
        &[encode_component(&Value::Text("same".to_string()))],
    );
    let start_raw = start.to_raw();
    let end_raw = end.to_raw();
    raws.retain(|raw| *raw >= start_raw && *raw <= end_raw);
    assert_eq!(raws.len(), 2);
    for raw in &raws {
        let decoded = IndexKey::try_from_raw(raw).expect("matched key should decode");
        assert_eq!(decoded.index_id, idx_a);
    }
}