use super::{Block, BlockHandle, GlobalTableId};
use crate::{
Cache, CompressionType, KeyRange, Table, encryption::EncryptionProvider,
file_accessor::FileAccessor, table::block::BlockType, version::run::Ranged,
};
use std::path::Path;
#[cfg(feature = "metrics")]
use crate::metrics::Metrics;
/// Computes the key range covered by a whole run of tables.
///
/// The lower bound is the minimum key of the first table and the upper bound
/// is the maximum key of the last table.
///
/// NOTE(review): this presumes `tables` is ordered by key range — the order is
/// not verified here.
///
/// # Panics
///
/// Panics if `tables` is empty.
#[must_use]
pub fn aggregate_run_key_range(tables: &[Table]) -> KeyRange {
    #[expect(clippy::expect_used, reason = "runs are never empty by definition")]
    let lower = tables
        .first()
        .expect("run should never be empty")
        .key_range()
        .min()
        .clone();

    #[expect(clippy::expect_used, reason = "runs are never empty by definition")]
    let upper = tables
        .last()
        .expect("run should never be empty")
        .key_range()
        .max()
        .clone();

    KeyRange::new((lower, upper))
}
/// A pair of `usize` indexes.
///
/// NOTE(review): presumably the start and end positions of a sub-slice —
/// confirm against the call sites, which are not visible in this file section.
#[derive(Debug)]
pub struct SliceIndexes(pub usize, pub usize);
/// Loads a block of a table, trying the block cache first and falling back to the table file.
///
/// On a cache hit the cached block is returned as-is. On a miss, the table file is
/// obtained through `file_accessor`, the block is read with `Block::from_file`
/// (using the given compression, encryption and — when compiled in — zstd dictionary),
/// and a clone of the block is inserted into `cache` before it is returned.
///
/// With the `metrics` feature enabled, per-block-type counters are updated for cached
/// loads, I/O loads and the requested handle size, as well as for table file opens.
///
/// # Errors
///
/// Returns `Error::InvalidTag` if the header of the block (cached or freshly read) does
/// not carry the expected `block_type`. Errors from opening the table file or from
/// reading the block are propagated.
#[expect(
clippy::too_many_arguments,
reason = "block loading requires table id, path, file accessor, cache, handle, block type, compression, and encryption context"
)]
pub fn load_block(
table_id: GlobalTableId,
path: &Path,
file_accessor: &FileAccessor,
cache: &Cache,
handle: &BlockHandle,
block_type: BlockType,
compression: CompressionType,
encryption: Option<&dyn EncryptionProvider>,
#[cfg(zstd_any)] zstd_dict: Option<&crate::compression::ZstdDictionary>,
#[cfg(feature = "metrics")] metrics: &Metrics,
) -> crate::Result<Block> {
#[cfg(feature = "metrics")]
use std::sync::atomic::Ordering::Relaxed;
log::trace!("load {block_type:?} block {handle:?}");
// Fast path: the block is already cached under (table id, block offset).
if let Some(block) = cache.get_block(table_id, handle.offset()) {
// A cached block of a different type than requested is reported as an invalid tag.
if block.header.block_type != block_type {
return Err(crate::Error::InvalidTag((
"BlockType",
block.header.block_type.into(),
)));
}
// Count the cache hit; data and meta blocks share the same counter.
#[cfg(feature = "metrics")]
match block_type {
BlockType::Filter => {
metrics.filter_block_load_cached.fetch_add(1, Relaxed);
}
BlockType::Index => {
metrics.index_block_load_cached.fetch_add(1, Relaxed);
}
BlockType::RangeTombstone => {
metrics
.range_tombstone_block_load_cached
.fetch_add(1, Relaxed);
}
BlockType::Data | BlockType::Meta => {
metrics.data_block_load_cached.fetch_add(1, Relaxed);
}
}
return Ok(block);
}
// Slow path: get a handle to the table file and read the block from it.
let (fd, cache_event) = file_accessor.get_or_open_table(&table_id, path)?;
// `Some(true)` is counted as a cached file open, `Some(false)` as an uncached one;
// `None` is not counted at all.
#[cfg(feature = "metrics")]
if let Some(hit) = cache_event {
if hit {
metrics.table_file_opened_cached.fetch_add(1, Relaxed);
} else {
metrics.table_file_opened_uncached.fetch_add(1, Relaxed);
}
}
// Without metrics the event is unused; consume it to avoid an unused-variable warning.
#[cfg(not(feature = "metrics"))]
let _ = cache_event;
let block = Block::from_file(
fd.as_ref(),
*handle,
compression,
encryption,
#[cfg(zstd_any)]
zstd_dict,
)?;
// Same type check as on the cache path; a mismatching block is never inserted into the cache.
if block.header.block_type != block_type {
return Err(crate::Error::InvalidTag((
"BlockType",
block.header.block_type.into(),
)));
}
// Count the I/O load and add the handle's size to the matching `*_io_requested` counter.
#[cfg(feature = "metrics")]
match block_type {
BlockType::Filter => {
metrics.filter_block_load_io.fetch_add(1, Relaxed);
metrics
.filter_block_io_requested
.fetch_add(handle.size().into(), Relaxed);
}
BlockType::Index => {
metrics.index_block_load_io.fetch_add(1, Relaxed);
metrics
.index_block_io_requested
.fetch_add(handle.size().into(), Relaxed);
}
BlockType::RangeTombstone => {
metrics.range_tombstone_block_load_io.fetch_add(1, Relaxed);
metrics
.range_tombstone_block_io_requested
.fetch_add(handle.size().into(), Relaxed);
}
BlockType::Data | BlockType::Meta => {
metrics.data_block_load_io.fetch_add(1, Relaxed);
metrics
.data_block_io_requested
.fetch_add(handle.size().into(), Relaxed);
}
}
// Populate the cache with a clone so the caller receives its own `Block` value.
cache.insert_block(table_id, handle.offset(), block.clone());
Ok(block)
}
/// Returns the number of leading bytes that `s1` and `s2` have in common.
///
/// Dispatches to an architecture-specific kernel:
/// - `x86_64`: `lsp_avx2` when AVX2 is detected at runtime, otherwise `lsp_sse2`
/// - little-endian `aarch64`: `lsp_neon`
/// - every other target: `lsp_scalar`
#[must_use]
pub fn longest_shared_prefix_length(s1: &[u8], s2: &[u8]) -> usize {
#[cfg(target_arch = "x86_64")]
{
if std::is_x86_feature_detected!("avx2") {
// SAFETY: AVX2 support was verified at runtime by the check directly above.
return unsafe { lsp_avx2(s1, s2) };
}
// SAFETY: `lsp_sse2` only needs SSE2, which is part of the x86_64 baseline.
return unsafe { lsp_sse2(s1, s2) };
}
#[cfg(all(target_arch = "aarch64", target_endian = "little"))]
{
// SAFETY: `lsp_neon` only needs NEON.
// NOTE(review): assumes NEON is available on every aarch64 target this crate is built for.
return unsafe { lsp_neon(s1, s2) };
}
// On the SIMD targets above every path has already returned, so the scalar
// call below is dead code there; the lint expectation documents exactly that.
#[cfg_attr(
any(
target_arch = "x86_64",
all(target_arch = "aarch64", target_endian = "little")
),
expect(
unreachable_code,
reason = "x86_64 SSE2 and LE aarch64 NEON arms above are unconditional; scalar tail only reached on other archs/endianness"
)
)]
lsp_scalar(s1, s2)
}
/// Portable (non-SIMD) computation of the longest shared prefix length.
///
/// Returns the number of leading bytes that are equal in `s1` and `s2`;
/// the result never exceeds `min(s1.len(), s2.len())`.
///
/// The inputs are compared eight bytes at a time as `u64` words, then the
/// remaining (fewer than eight) bytes are compared one by one. The whole
/// function is safe code: words are split off with `split_first_chunk`
/// instead of being read through raw pointers.
#[must_use]
pub(crate) fn lsp_scalar(s1: &[u8], s2: &[u8]) -> usize {
    const WORD: usize = 8;

    let mut lhs = s1;
    let mut rhs = s2;
    let mut matched = 0;

    // Word-at-a-time phase: runs for as long as *both* inputs still hold a full word.
    while let (Some((a, lhs_rest)), Some((b, rhs_rest))) = (
        lhs.split_first_chunk::<WORD>(),
        rhs.split_first_chunk::<WORD>(),
    ) {
        // Decoding as little-endian puts the byte at memory offset `k` into bits
        // `8k..8k+8` on every target, so the lowest set bit of the XOR identifies
        // the first differing byte without any endian-specific branch.
        let diff = u64::from_le_bytes(*a) ^ u64::from_le_bytes(*b);
        if diff != 0 {
            return matched + (diff.trailing_zeros() / 8) as usize;
        }

        matched += WORD;
        lhs = lhs_rest;
        rhs = rhs_rest;
    }

    // Byte-wise tail: `zip` stops at the end of the shorter remainder.
    matched + lhs.iter().zip(rhs).take_while(|(a, b)| a == b).count()
}
/// AVX2 kernel for the longest shared prefix length of `s1` and `s2`.
///
/// Compares 32 bytes per iteration and finishes the remaining bytes one by one.
///
/// # Safety
///
/// The executing CPU must support AVX2.
#[cfg(target_arch = "x86_64")]
#[target_feature(enable = "avx2")]
#[expect(unsafe_code, reason = "intrinsics require unsafe")]
#[must_use]
unsafe fn lsp_avx2(s1: &[u8], s2: &[u8]) -> usize {
    use std::arch::x86_64::{
        __m256i, _mm256_cmpeq_epi8, _mm256_loadu_si256, _mm256_movemask_epi8,
    };

    const LANES: usize = 32;

    let common = s1.len().min(s2.len());
    let (lhs, rhs) = (s1.as_ptr(), s2.as_ptr());
    let mut pos = 0;

    // Invariant: `pos <= common`, so the subtraction cannot underflow.
    while common - pos >= LANES {
        // SAFETY: `pos + LANES <= common`, so both 32-byte reads stay inside their slices.
        #[expect(
            clippy::cast_ptr_alignment,
            reason = "_mm256_loadu_si256 explicitly performs an unaligned 32-byte load"
        )]
        let (va, vb) = unsafe {
            (
                _mm256_loadu_si256(lhs.add(pos).cast::<__m256i>()),
                _mm256_loadu_si256(rhs.add(pos).cast::<__m256i>()),
            )
        };

        // One bit per byte lane; a set bit means the two bytes are equal.
        let equal_lanes = _mm256_movemask_epi8(_mm256_cmpeq_epi8(va, vb)).cast_unsigned();
        if equal_lanes != u32::MAX {
            // The run of low set bits is the number of equal leading bytes in this chunk.
            return pos + equal_lanes.trailing_ones() as usize;
        }

        pos += LANES;
    }

    for idx in pos..common {
        // SAFETY: `idx < common`, and `common` does not exceed either slice length.
        let differs = unsafe { s1.get_unchecked(idx) != s2.get_unchecked(idx) };
        if differs {
            return idx;
        }
    }

    common
}
/// SSE2 kernel for the longest shared prefix length of `s1` and `s2`.
///
/// Compares 16 bytes per iteration and finishes the remaining bytes one by one.
///
/// # Safety
///
/// The executing CPU must support SSE2.
#[cfg(target_arch = "x86_64")]
#[target_feature(enable = "sse2")]
#[expect(unsafe_code, reason = "intrinsics require unsafe")]
#[must_use]
unsafe fn lsp_sse2(s1: &[u8], s2: &[u8]) -> usize {
    use std::arch::x86_64::{__m128i, _mm_cmpeq_epi8, _mm_loadu_si128, _mm_movemask_epi8};

    const LANES: usize = 16;
    // `_mm_movemask_epi8` yields one bit per byte lane, i.e. 16 significant bits.
    const ALL_EQUAL: u32 = 0xFFFF;

    let common = s1.len().min(s2.len());
    let (lhs, rhs) = (s1.as_ptr(), s2.as_ptr());
    let mut pos = 0;

    // Invariant: `pos <= common`, so the subtraction cannot underflow.
    while common - pos >= LANES {
        // SAFETY: `pos + LANES <= common`, so both 16-byte reads stay inside their slices.
        #[expect(
            clippy::cast_ptr_alignment,
            reason = "_mm_loadu_si128 explicitly performs an unaligned 16-byte load"
        )]
        let (va, vb) = unsafe {
            (
                _mm_loadu_si128(lhs.add(pos).cast::<__m128i>()),
                _mm_loadu_si128(rhs.add(pos).cast::<__m128i>()),
            )
        };

        // A set bit means the two bytes of that lane are equal.
        let equal_lanes = _mm_movemask_epi8(_mm_cmpeq_epi8(va, vb)).cast_unsigned();
        if equal_lanes != ALL_EQUAL {
            // The run of low set bits is the number of equal leading bytes in this chunk.
            return pos + equal_lanes.trailing_ones() as usize;
        }

        pos += LANES;
    }

    for idx in pos..common {
        // SAFETY: `idx < common`, and `common` does not exceed either slice length.
        let differs = unsafe { s1.get_unchecked(idx) != s2.get_unchecked(idx) };
        if differs {
            return idx;
        }
    }

    common
}
/// NEON kernel for the longest shared prefix length of `s1` and `s2`.
///
/// Compares 16 bytes per iteration and finishes the remaining bytes one by one.
/// Only compiled for little-endian `aarch64`: the byte position is derived from
/// the bit position inside the two 64-bit halves of the comparison result.
///
/// # Safety
///
/// The executing CPU must support NEON.
#[cfg(all(target_arch = "aarch64", target_endian = "little"))]
#[target_feature(enable = "neon")]
#[expect(unsafe_code, reason = "intrinsics require unsafe")]
#[must_use]
unsafe fn lsp_neon(s1: &[u8], s2: &[u8]) -> usize {
    use std::arch::aarch64::{vceqq_u8, vgetq_lane_u64, vld1q_u8, vreinterpretq_u64_u8};

    let min_len = s1.len().min(s2.len());
    let mut i = 0;

    while i + 16 <= min_len {
        // SAFETY: `i + 16 <= min_len`, so both 16-byte reads stay inside their slices.
        let (va, vb) = unsafe { (vld1q_u8(s1.as_ptr().add(i)), vld1q_u8(s2.as_ptr().add(i))) };

        // `vceqq_u8` already yields 0xFF for equal lanes and 0x00 otherwise,
        // so the result can be inspected directly without any extra masking.
        let equal = vreinterpretq_u64_u8(vceqq_u8(va, vb));
        let lo = vgetq_lane_u64(equal, 0);
        let hi = vgetq_lane_u64(equal, 1);

        // Every byte lane owns 8 bits, hence the division by 8.
        if lo != u64::MAX {
            return i + (!lo).trailing_zeros() as usize / 8;
        }
        if hi != u64::MAX {
            return i + 8 + (!hi).trailing_zeros() as usize / 8;
        }

        i += 16;
    }

    while i < min_len {
        // SAFETY: `i < min_len`, and `min_len` does not exceed either slice length.
        let (a, b) = unsafe { (*s1.get_unchecked(i), *s2.get_unchecked(i)) };
        if a != b {
            return i;
        }
        i += 1;
    }

    min_len
}
/// Compares the key formed by `prefix` followed by `suffix` against `needle`.
///
/// If `cmp` reports itself as lexicographic, the comparison is done directly on
/// the two parts without joining them. Otherwise the key is materialized — in a
/// 256-byte stack buffer when it fits, on the heap when it does not — and handed
/// to `cmp.compare` as the left-hand operand.
#[must_use]
pub fn compare_prefixed_slice(
    prefix: &[u8],
    suffix: &[u8],
    needle: &[u8],
    cmp: &dyn crate::comparator::UserComparator,
) -> std::cmp::Ordering {
    if cmp.is_lexicographic() {
        return compare_prefixed_slice_lexicographic(prefix, suffix, needle);
    }

    let joined_len = prefix.len() + suffix.len();

    // Too large for the stack buffer: build the full key on the heap.
    if joined_len > 256 {
        let heap_key = [prefix, suffix].concat();
        return cmp.compare(&heap_key, needle);
    }

    let mut stack_buf = [0_u8; 256];

    #[expect(clippy::indexing_slicing, reason = "joined_len <= 256 checked above")]
    let joined = &mut stack_buf[..joined_len];

    // `prefix.len() <= joined_len`, so the split point is always in range.
    let (head, tail) = joined.split_at_mut(prefix.len());
    head.copy_from_slice(prefix);
    tail.copy_from_slice(suffix);

    cmp.compare(joined, needle)
}
/// Lexicographically compares the key formed by `prefix` followed by `suffix`
/// against `needle`, without materializing the joined key.
///
/// The result equals `[prefix, suffix].concat().as_slice().cmp(needle)`.
#[must_use]
fn compare_prefixed_slice_lexicographic(
    prefix: &[u8],
    suffix: &[u8],
    needle: &[u8],
) -> std::cmp::Ordering {
    use std::cmp::Ordering::Greater;

    if prefix.len() > needle.len() {
        // The needle ends inside the prefix: compare it against the prefix's head.
        // On a tie the key is strictly longer than the needle, hence greater.
        let (prefix_head, _) = prefix.split_at(needle.len());
        return prefix_head.cmp(needle).then(Greater);
    }

    // The needle is at least as long as the prefix: its head is matched against
    // the prefix and, only on a tie, its tail against the suffix. This also covers
    // an empty needle (which implies an empty prefix on this path).
    let (needle_head, needle_tail) = needle.split_at(prefix.len());
    prefix
        .cmp(needle_head)
        .then_with(|| suffix.cmp(needle_tail))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::comparator::DefaultUserComparator;
    use test_log::test;

    #[test]
    fn test_longest_shared_prefix_length() {
        assert_eq!(3, longest_shared_prefix_length(b"abc", b"abc"));
        assert_eq!(1, longest_shared_prefix_length(b"abc", b"a"));
        assert_eq!(1, longest_shared_prefix_length(b"a", b"abc"));
        assert_eq!(0, longest_shared_prefix_length(b"abc", b""));
        assert_eq!(0, longest_shared_prefix_length(b"", b"abc"));
        assert_eq!(0, longest_shared_prefix_length(b"", b""));
        assert_eq!(0, longest_shared_prefix_length(b"abc", b"def"));
        assert_eq!(1, longest_shared_prefix_length(b"abc", b"acc"));
    }

    /// Naive byte-by-byte oracle that every kernel is checked against.
    fn lsp_reference(s1: &[u8], s2: &[u8]) -> usize {
        s1.iter().zip(s2.iter()).take_while(|(a, b)| a == b).count()
    }

    /// Checks `kernel` against [`lsp_reference`] for lengths around the word and
    /// vector widths, with the first mismatch placed at every possible position
    /// (including "no mismatch"), for equal-length and truncated inputs.
    fn assert_kernel_matches_reference<F: Fn(&[u8], &[u8]) -> usize>(label: &str, kernel: F) {
        for total_len in [
            0_usize, 1, 7, 8, 9, 15, 16, 17, 31, 32, 33, 63, 64, 127, 128, 255, 256,
        ] {
            for mismatch_at in 0..=total_len {
                let mut a = vec![0xAA; total_len];
                let mut b = a.clone();
                if mismatch_at < total_len {
                    #[expect(
                        clippy::expect_used,
                        reason = "test: mismatch_at < total_len = b.len() guarantees in-bounds"
                    )]
                    {
                        *b.get_mut(mismatch_at).expect("in bounds") ^= 0xFF;
                    }
                }

                let want = lsp_reference(&a, &b);
                assert_eq!(
                    want,
                    kernel(&a, &b),
                    "{label} @ len={total_len} mismatch_at={mismatch_at}"
                );

                // Asymmetric lengths: one input ends exactly where the mismatch would be.
                a.truncate(mismatch_at);
                let want_short = lsp_reference(&a, &b);
                assert_eq!(
                    want_short,
                    kernel(&a, &b),
                    "{label} asym len={mismatch_at} vs {total_len}"
                );
            }
        }
    }

    #[test]
    fn lsp_scalar_matches_reference_on_boundaries() {
        assert_kernel_matches_reference("scalar", lsp_scalar);
    }

    #[test]
    fn longest_shared_prefix_length_matches_reference_on_boundaries() {
        assert_kernel_matches_reference("dispatch", longest_shared_prefix_length);
    }

    #[test]
    fn lsp_extreme_byte_patterns_match_reference() {
        for &(label, byte_a, byte_b) in &[
            ("all_zero_equal", 0x00_u8, 0x00_u8),
            ("all_ff_equal", 0xFF, 0xFF),
            ("zero_vs_ff", 0x00, 0xFF),
            ("alternating_match", 0x55, 0x55),
        ] {
            for len in [0_usize, 1, 8, 15, 16, 31, 32, 33, 63, 64, 128, 1023] {
                let a = vec![byte_a; len];
                let b = vec![byte_b; len];
                let want = lsp_reference(&a, &b);
                assert_eq!(want, lsp_scalar(&a, &b), "scalar {label} len={len}");
                assert_eq!(
                    want,
                    longest_shared_prefix_length(&a, &b),
                    "dispatch {label} len={len}"
                );
            }
        }

        // An empty operand always yields an empty shared prefix.
        for len in [0_usize, 1, 8, 32, 128, 1024] {
            let nonempty = vec![0x42_u8; len];
            assert_eq!(0, lsp_scalar(&nonempty, &[]));
            assert_eq!(0, lsp_scalar(&[], &nonempty));
            assert_eq!(0, longest_shared_prefix_length(&nonempty, &[]));
            assert_eq!(0, longest_shared_prefix_length(&[], &nonempty));
        }
    }

    #[cfg(target_arch = "x86_64")]
    #[test]
    fn lsp_sse2_matches_reference_on_boundaries() {
        assert_kernel_matches_reference("sse2", |a, b| unsafe { lsp_sse2(a, b) });
    }

    #[cfg(target_arch = "x86_64")]
    #[test]
    fn lsp_avx2_matches_reference_on_boundaries() {
        // The AVX2 kernel must not be executed on CPUs that lack the feature.
        if !std::is_x86_feature_detected!("avx2") {
            return;
        }
        assert_kernel_matches_reference("avx2", |a, b| unsafe { lsp_avx2(a, b) });
    }

    #[cfg(all(target_arch = "aarch64", target_endian = "little"))]
    #[test]
    fn lsp_neon_matches_reference_on_boundaries() {
        assert_kernel_matches_reference("neon", |a, b| unsafe { lsp_neon(a, b) });
    }

    proptest::proptest! {
        #[test]
        fn lsp_scalar_equals_reference(
            s1 in proptest::collection::vec(proptest::num::u8::ANY, 0..=1024),
            s2 in proptest::collection::vec(proptest::num::u8::ANY, 0..=1024),
        ) {
            proptest::prop_assert_eq!(lsp_scalar(&s1, &s2), lsp_reference(&s1, &s2));
        }

        #[test]
        fn longest_shared_prefix_length_equals_reference(
            s1 in proptest::collection::vec(proptest::num::u8::ANY, 0..=1024),
            s2 in proptest::collection::vec(proptest::num::u8::ANY, 0..=1024),
        ) {
            proptest::prop_assert_eq!(longest_shared_prefix_length(&s1, &s2), lsp_reference(&s1, &s2));
        }
    }

    #[test]
    fn test_compare_prefixed_slice() {
        use std::cmp::Ordering::{Equal, Greater, Less};

        assert_eq!(
            Greater,
            compare_prefixed_slice(&[0, 161], &[], &[0], &DefaultUserComparator)
        );

        assert_eq!(
            Equal,
            compare_prefixed_slice(b"abc", b"xyz", b"abcxyz", &DefaultUserComparator)
        );
        assert_eq!(
            Equal,
            compare_prefixed_slice(b"abc", b"", b"abc", &DefaultUserComparator)
        );
        assert_eq!(
            Equal,
            compare_prefixed_slice(b"abc", b"abc", b"abcabc", &DefaultUserComparator)
        );
        assert_eq!(
            Equal,
            compare_prefixed_slice(b"", b"", b"", &DefaultUserComparator)
        );

        assert_eq!(
            Less,
            compare_prefixed_slice(b"a", b"", b"y", &DefaultUserComparator)
        );
        assert_eq!(
            Less,
            compare_prefixed_slice(b"a", b"", b"yyy", &DefaultUserComparator)
        );
        assert_eq!(
            Less,
            compare_prefixed_slice(b"yyyy", b"a", b"yyyyb", &DefaultUserComparator)
        );
        assert_eq!(
            Less,
            compare_prefixed_slice(b"yyy", b"b", b"yyyyb", &DefaultUserComparator)
        );
        assert_eq!(
            Less,
            compare_prefixed_slice(b"abc", b"d", b"abce", &DefaultUserComparator)
        );
        assert_eq!(
            Less,
            compare_prefixed_slice(b"ab", b"", b"ac", &DefaultUserComparator)
        );

        assert_eq!(
            Greater,
            compare_prefixed_slice(b"a", b"", b"", &DefaultUserComparator)
        );
        assert_eq!(
            Greater,
            compare_prefixed_slice(b"", b"a", b"", &DefaultUserComparator)
        );
        assert_eq!(
            Greater,
            compare_prefixed_slice(b"a", b"a", b"", &DefaultUserComparator)
        );
        assert_eq!(
            Greater,
            compare_prefixed_slice(b"b", b"a", b"a", &DefaultUserComparator)
        );
        assert_eq!(
            Greater,
            compare_prefixed_slice(b"a", b"b", b"a", &DefaultUserComparator)
        );
        assert_eq!(
            Greater,
            compare_prefixed_slice(b"abc", b"xy", b"abcw", &DefaultUserComparator)
        );
        assert_eq!(
            Greater,
            compare_prefixed_slice(b"ab", b"cde", b"a", &DefaultUserComparator)
        );
        assert_eq!(
            Greater,
            compare_prefixed_slice(b"abcd", b"zz", b"abc", &DefaultUserComparator)
        );
        assert_eq!(
            Greater,
            compare_prefixed_slice(b"abc", b"d", b"abc", &DefaultUserComparator)
        );
        assert_eq!(
            Greater,
            compare_prefixed_slice(b"aaaa", b"aaab", b"aaaaaaaa", &DefaultUserComparator)
        );
        assert_eq!(
            Greater,
            compare_prefixed_slice(b"aaaa", b"aaba", b"aaaaaaaa", &DefaultUserComparator)
        );
        assert_eq!(
            Greater,
            compare_prefixed_slice(b"abcd", b"x", b"abc", &DefaultUserComparator)
        );

        // Bytes are compared as unsigned values.
        assert_eq!(
            Less,
            compare_prefixed_slice(&[0x7F], &[], &[0x80], &DefaultUserComparator)
        );
        assert_eq!(
            Greater,
            compare_prefixed_slice(&[0xFF], &[], &[0x10], &DefaultUserComparator)
        );
    }

    /// Comparator that orders keys in reverse byte order.
    struct ReverseComparator;

    impl crate::comparator::UserComparator for ReverseComparator {
        fn name(&self) -> &'static str {
            "test-reverse"
        }

        fn compare(&self, a: &[u8], b: &[u8]) -> std::cmp::Ordering {
            b.cmp(a)
        }
    }

    #[test]
    fn test_compare_prefixed_slice_custom_comparator() {
        use std::cmp::Ordering::{Equal, Greater, Less};
        use crate::comparator::UserComparator as _;

        assert_eq!(ReverseComparator.name(), "test-reverse");

        assert_eq!(
            Greater,
            compare_prefixed_slice(b"ab", b"c", b"xyz", &ReverseComparator)
        );
        assert_eq!(
            Less,
            compare_prefixed_slice(b"xy", b"z", b"abc", &ReverseComparator)
        );
        assert_eq!(
            Equal,
            compare_prefixed_slice(b"ab", b"c", b"abc", &ReverseComparator)
        );
        assert_eq!(
            Equal,
            compare_prefixed_slice(b"", b"", b"", &ReverseComparator)
        );
        assert_eq!(
            Less, compare_prefixed_slice(b"a", b"", b"", &ReverseComparator)
        );
    }
}