#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
use super::{Block, BlockHandle, GlobalTableId};
use crate::path::Path;
use crate::{
Cache, CompressionType, KeyRange, Table, encryption::EncryptionProvider,
file_accessor::FileAccessor, table::block::BlockType, version::run::Ranged,
};
#[cfg(feature = "metrics")]
use crate::metrics::Metrics;
/// Computes the key range covered by a whole run of tables.
///
/// The lower bound is the minimum key of the first table in `tables` and the
/// upper bound is the maximum key of the last table in `tables`.
///
/// # Panics
///
/// Panics if `tables` is empty.
#[must_use]
pub fn aggregate_run_key_range(tables: &[Table]) -> KeyRange {
    #[expect(clippy::expect_used, reason = "runs are never empty by definition")]
    let first_table = tables.first().expect("run should never be empty");
    #[expect(clippy::expect_used, reason = "runs are never empty by definition")]
    let last_table = tables.last().expect("run should never be empty");

    let lower_bound = first_table.key_range().min().clone();
    let upper_bound = last_table.key_range().max().clone();
    KeyRange::new((lower_bound, upper_bound))
}
/// A pair of `usize` indexes.
///
/// NOTE(review): presumably the two bounds of a window into a slice; the
/// meaning and inclusivity of each field is not visible in this file, so
/// confirm against the code that constructs and consumes this type.
#[derive(Debug)]
pub struct SliceIndexes(pub usize, pub usize);
/// Loads one block of a table, consulting the block cache before reading the file.
///
/// Steps, in order:
///
/// 1. `BlockType::Manifest` and `BlockType::ManifestFooter` are rejected with
///    `Error::InvalidTag`.
/// 2. If the cache holds a block for `(table_id, handle.offset())`, its header
///    block type is checked against `block_type` and the cached block is returned.
/// 3. Otherwise the table file is obtained from `file_accessor`, the block is read
///    through `Block::from_file_with_recovery`, its header block type is checked,
///    a persistent-heal check is run when the ECC status is `Corrected`, and the
///    block is inserted into the cache before being returned.
///
/// With the `metrics` feature enabled, the matching per-block-type counters are
/// bumped for cache hits, file-descriptor cache events, ECC recoveries and I/O loads.
///
/// # Errors
///
/// Returns `Error::InvalidTag` for a rejected or mismatching block type, and
/// propagates any error from opening the table file, building the block
/// transform, or reading the block.
#[expect(
clippy::too_many_arguments,
reason = "block loading requires table id, path, file accessor, cache, handle, block type, compression, and heal context"
)]
pub fn load_block(
table_id: GlobalTableId,
path: &Path,
file_accessor: &FileAccessor,
cache: &Cache,
handle: &BlockHandle,
block_type: BlockType,
compression: CompressionType,
encryption: Option<&dyn EncryptionProvider>,
ecc: Option<crate::table::block::EccParams>,
#[cfg(zstd_any)] zstd_dict: Option<&crate::compression::ZstdDictionary>,
heal_hints: Option<&crate::heal_hints::HealHints>,
#[cfg(feature = "metrics")] metrics: &Metrics,
) -> crate::Result<Block> {
#[cfg(feature = "metrics")]
use core::sync::atomic::Ordering::Relaxed;
log::trace!("load {block_type:?} block {handle:?}");
// Manifest block types are not served by this function.
if matches!(block_type, BlockType::Manifest | BlockType::ManifestFooter) {
return Err(crate::Error::InvalidTag(("BlockType", block_type.into())));
}
// Fast path: the block is already cached under (table id, file offset).
if let Some(block) = cache.get_block(table_id, handle.offset()) {
// A cached block of a different type than requested is reported as an invalid tag.
if block.header.block_type != block_type {
return Err(crate::Error::InvalidTag((
"BlockType",
block.header.block_type.into(),
)));
}
// Count the cache hit; Data, Meta and Columnar share the data-block counter,
// the remaining block types have no cache-hit counter.
#[cfg(feature = "metrics")]
match block_type {
BlockType::Filter => {
metrics.filter_block_load_cached.fetch_add(1, Relaxed);
}
BlockType::Index => {
metrics.index_block_load_cached.fetch_add(1, Relaxed);
}
BlockType::RangeTombstone => {
metrics
.range_tombstone_block_load_cached
.fetch_add(1, Relaxed);
}
BlockType::Data | BlockType::Meta | BlockType::Columnar => {
metrics.data_block_load_cached.fetch_add(1, Relaxed);
}
BlockType::Manifest
| BlockType::ManifestFooter
| BlockType::BlockLayout
| BlockType::Locator
| BlockType::SeqnoBounds
| BlockType::ZoneMap
| BlockType::DeleteBitmap => {}
}
return Ok(block);
}
// Slow path: obtain the table file handle; `cache_event` is `Some(true)` /
// `Some(false)` when the accessor reports a cached / uncached open.
let (fd, cache_event) = file_accessor.get_or_open_table(&table_id, path)?;
#[cfg(feature = "metrics")]
if let Some(hit) = cache_event {
if hit {
metrics.table_file_opened_cached.fetch_add(1, Relaxed);
} else {
metrics.table_file_opened_uncached.fetch_add(1, Relaxed);
}
}
// Without metrics the event is unused; consume it to avoid an unused-variable warning.
#[cfg(not(feature = "metrics"))]
let _ = cache_event;
// Assemble the decode pipeline (compression, encryption, optional ECC, optional dictionary).
let transform = build_block_transform(
compression,
encryption,
ecc,
#[cfg(zstd_any)]
zstd_dict,
)?;
// Read the block from the file; also yields the ECC status and an optional recovery kind.
let (block, ecc_status, recovery) = Block::from_file_with_recovery(
fd.as_ref(),
*handle,
crate::table::block::BlockIdentity {
table_id: table_id.table_id(),
block_type,
dict_id: compression.dict_id(),
window_log: 0,
},
&transform,
)?;
#[cfg(feature = "metrics")]
if let Some(kind) = recovery {
metrics.record_ecc_recovery(kind);
}
#[cfg(not(feature = "metrics"))]
let _ = recovery;
// Remember whether the read needed an ECC correction; acted upon after validation below.
let corrected = matches!(ecc_status, crate::table::block::EccStatus::Corrected);
// The block read from the file must carry the requested type.
if block.header.block_type != block_type {
return Err(crate::Error::InvalidTag((
"BlockType",
block.header.block_type.into(),
)));
}
// Count the I/O load and the requested byte size (`handle.size()`) per block type.
#[cfg(feature = "metrics")]
match block_type {
BlockType::Filter => {
metrics.filter_block_load_io.fetch_add(1, Relaxed);
metrics
.filter_block_io_requested
.fetch_add(handle.size().into(), Relaxed);
}
BlockType::Index => {
metrics.index_block_load_io.fetch_add(1, Relaxed);
metrics
.index_block_io_requested
.fetch_add(handle.size().into(), Relaxed);
}
BlockType::RangeTombstone => {
metrics.range_tombstone_block_load_io.fetch_add(1, Relaxed);
metrics
.range_tombstone_block_io_requested
.fetch_add(handle.size().into(), Relaxed);
}
BlockType::Data | BlockType::Meta | BlockType::Columnar => {
metrics.data_block_load_io.fetch_add(1, Relaxed);
metrics
.data_block_io_requested
.fetch_add(handle.size().into(), Relaxed);
}
BlockType::Manifest
| BlockType::ManifestFooter
| BlockType::BlockLayout
| BlockType::Locator
| BlockType::SeqnoBounds
| BlockType::ZoneMap
| BlockType::DeleteBitmap => {}
}
// An ECC-corrected read triggers the persistent-heal check; its boolean result
// is intentionally not used here.
if corrected {
maybe_record_persistent_heal(
table_id,
path,
file_accessor,
handle,
block_type,
compression,
encryption,
ecc,
#[cfg(zstd_any)]
zstd_dict,
heal_hints,
#[cfg(feature = "metrics")]
metrics,
);
}
// Populate the cache with a clone so the caller receives its own handle to the block.
cache.insert_block(table_id, handle.offset(), block.clone());
Ok(block)
}
/// Decodes pre-read block buffers and stores the valid ones in the block cache.
///
/// `handles` and `buffers` are walked pairwise; iteration stops at the shorter
/// of the two. Each buffer is decoded with `Block::from_reader` and cached under
/// `(table_id, handle.offset())` when decoding succeeds and the decoded header
/// carries `block_type`.
///
/// This is best-effort: a transform that cannot be built makes the function
/// return without doing anything, and buffers that fail to decode or carry a
/// different block type are skipped without reporting an error.
///
/// In debug builds this asserts that `ecc` is `None`.
#[expect(
    clippy::too_many_arguments,
    reason = "mirrors load_block's decode context (id, cache, type, compression, encryption, ecc, dict)"
)]
pub fn decode_prewarmed_blocks(
    table_id: GlobalTableId,
    cache: &Cache,
    handles: &[BlockHandle],
    buffers: &[Vec<u8>],
    block_type: BlockType,
    compression: CompressionType,
    encryption: Option<&dyn EncryptionProvider>,
    ecc: Option<crate::table::block::EccParams>,
    #[cfg(zstd_any)] zstd_dict: Option<&crate::compression::ZstdDictionary>,
) {
    let transform = match build_block_transform(
        compression,
        encryption,
        ecc,
        #[cfg(zstd_any)]
        zstd_dict,
    ) {
        Ok(transform) => transform,
        // Without a usable transform nothing can be decoded; give up quietly.
        Err(_) => return,
    };

    let identity = crate::table::block::BlockIdentity {
        table_id: table_id.table_id(),
        block_type,
        dict_id: compression.dict_id(),
        window_log: 0,
    };

    debug_assert!(
        ecc.is_none(),
        "prewarm must not cache ECC blocks: plan_prewarm gates ECC tables out, and \
         from_reader would silently cache an ECC-corrected block as if it were clean"
    );

    for (handle, raw) in handles.iter().zip(buffers) {
        let mut cursor = crate::io::Cursor::new(raw.as_slice());
        match Block::from_reader(&mut cursor, identity, &transform) {
            Ok(block) if block.header.block_type == block_type => {
                cache.insert_block(table_id, handle.offset(), block);
            }
            // Undecodable or mistyped buffers are simply not cached.
            _ => {}
        }
    }
}
/// Confirms an ECC correction with a second read and, if it repeats, records a heal hint.
///
/// Returns `false` immediately when `heal_hints` is `None` or reports itself as
/// disabled. Otherwise the block is re-read via `reread_block_is_corrected`:
///
/// - re-read corrected again: `hints.record(table_id)` is called; when it
///   returns `true` the scheduling metric is bumped (with the `metrics`
///   feature), a warning is logged and `true` is returned;
/// - re-read clean: a debug message is logged and `false` is returned;
/// - re-read failed: the error is logged at debug level and `false` is returned.
#[expect(
    clippy::too_many_arguments,
    reason = "mirrors the block read context needed for the confirming re-read"
)]
pub(crate) fn maybe_record_persistent_heal(
    table_id: GlobalTableId,
    path: &Path,
    file_accessor: &FileAccessor,
    handle: &BlockHandle,
    block_type: BlockType,
    compression: CompressionType,
    encryption: Option<&dyn EncryptionProvider>,
    ecc: Option<crate::table::block::EccParams>,
    #[cfg(zstd_any)] zstd_dict: Option<&crate::compression::ZstdDictionary>,
    heal_hints: Option<&crate::heal_hints::HealHints>,
    #[cfg(feature = "metrics")] metrics: &Metrics,
) -> bool {
    // Nothing to do without an enabled hint sink.
    let hints = match heal_hints {
        Some(hints) if hints.is_enabled() => hints,
        _ => return false,
    };

    let confirmation = reread_block_is_corrected(
        table_id,
        path,
        file_accessor,
        handle,
        block_type,
        compression,
        encryption,
        ecc,
        #[cfg(zstd_any)]
        zstd_dict,
    );

    match confirmation {
        Ok(true) => {
            let recorded = hints.record(table_id);
            if recorded {
                #[cfg(feature = "metrics")]
                metrics
                    .ecc_auto_heal_scheduled
                    .fetch_add(1, core::sync::atomic::Ordering::Relaxed);
                log::warn!(
                    "Persistent ECC correction on table {table_id:?} block {handle:?}; \
                     queued for healing recompaction"
                );
            }
            recorded
        }
        Ok(false) => {
            log::debug!(
                "Transient ECC correction on table {table_id:?} block {handle:?}; \
                 re-read clean, not scheduling"
            );
            false
        }
        Err(e) => {
            log::debug!(
                "ECC re-read confirmation for table {table_id:?} block {handle:?} failed: {e:?}"
            );
            false
        }
    }
}
/// Result of scrubbing a single block with `scrub_block`.
#[cfg(feature = "std")]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub(crate) enum BlockScrubOutcome {
// The read reported an ECC status other than `Corrected`.
Clean,
// The read reported an ECC status of `Corrected`.
Corrected {
// Value returned by `maybe_record_persistent_heal` for this block.
scheduled: bool,
},
}
/// Reads one block straight from the table file and reports its ECC outcome.
///
/// The block cache is neither consulted nor populated. When the read reports
/// `EccStatus::Corrected`, the recovery kind (if any) is recorded in the
/// metrics (with the `metrics` feature) and `maybe_record_persistent_heal` is
/// invoked; its return value becomes `BlockScrubOutcome::Corrected::scheduled`.
/// Any other ECC status yields `BlockScrubOutcome::Clean`.
///
/// # Errors
///
/// Propagates errors from opening the table file, building the block
/// transform, or reading the block.
#[cfg(feature = "std")]
#[expect(
    clippy::too_many_arguments,
    reason = "mirrors load_block's read context minus the cache"
)]
pub(crate) fn scrub_block(
    table_id: GlobalTableId,
    path: &Path,
    file_accessor: &FileAccessor,
    handle: &BlockHandle,
    block_type: BlockType,
    compression: CompressionType,
    encryption: Option<&dyn EncryptionProvider>,
    ecc: Option<crate::table::block::EccParams>,
    #[cfg(zstd_any)] zstd_dict: Option<&crate::compression::ZstdDictionary>,
    heal_hints: Option<&crate::heal_hints::HealHints>,
    #[cfg(feature = "metrics")] metrics: &Metrics,
) -> crate::Result<BlockScrubOutcome> {
    use crate::table::block::{BlockIdentity, EccStatus};

    let (fd, _open_event) = file_accessor.get_or_open_table(&table_id, path)?;
    let transform = build_block_transform(
        compression,
        encryption,
        ecc,
        #[cfg(zstd_any)]
        zstd_dict,
    )?;

    let identity = BlockIdentity {
        table_id: table_id.table_id(),
        block_type,
        dict_id: compression.dict_id(),
        window_log: 0,
    };
    // The decoded block itself is not needed; only the ECC verdict matters here.
    let (_block, ecc_status, recovery) =
        Block::from_file_with_recovery(fd.as_ref(), *handle, identity, &transform)?;

    let outcome = match ecc_status {
        EccStatus::Ok | EccStatus::Unrecognized => BlockScrubOutcome::Clean,
        EccStatus::Corrected => {
            #[cfg(feature = "metrics")]
            if let Some(kind) = recovery {
                metrics.record_ecc_recovery(kind);
            }
            #[cfg(not(feature = "metrics"))]
            let _ = recovery;

            let scheduled = maybe_record_persistent_heal(
                table_id,
                path,
                file_accessor,
                handle,
                block_type,
                compression,
                encryption,
                ecc,
                #[cfg(zstd_any)]
                zstd_dict,
                heal_hints,
                #[cfg(feature = "metrics")]
                metrics,
            );
            BlockScrubOutcome::Corrected { scheduled }
        }
    };
    Ok(outcome)
}
/// Builds the block transform used to decode blocks of a table.
///
/// The transform is created from the compression type, the optional
/// encryption provider and (when compiled with zstd support) the optional
/// dictionary. If `ecc` is `Some`, the ECC parameters are attached with
/// `with_ecc`; otherwise the base transform is returned unchanged.
///
/// # Errors
///
/// Propagates any error returned by `BlockTransform::from_parts`.
pub(crate) fn build_block_transform<'a>(
    compression: CompressionType,
    encryption: Option<&'a dyn EncryptionProvider>,
    ecc: Option<crate::table::block::EccParams>,
    #[cfg(zstd_any)] zstd_dict: Option<&'a crate::compression::ZstdDictionary>,
) -> crate::Result<crate::table::block::BlockTransform<'a>> {
    let base = crate::table::block::BlockTransform::from_parts(
        compression,
        encryption,
        #[cfg(zstd_any)]
        zstd_dict,
    )?;

    match ecc {
        Some(params) => Ok(base.with_ecc(params)),
        None => Ok(base),
    }
}
/// Reads the block from the table file again and reports whether ECC had to correct it.
///
/// Returns `Ok(true)` when the read reports `EccStatus::Corrected`, and
/// `Ok(false)` for any other status. The block cache is not involved.
///
/// # Errors
///
/// Propagates errors from opening the table file, building the block
/// transform, or reading the block.
#[expect(
    clippy::too_many_arguments,
    reason = "mirrors load_block's read context minus the cache"
)]
fn reread_block_is_corrected(
    table_id: GlobalTableId,
    path: &Path,
    file_accessor: &FileAccessor,
    handle: &BlockHandle,
    block_type: BlockType,
    compression: CompressionType,
    encryption: Option<&dyn EncryptionProvider>,
    ecc: Option<crate::table::block::EccParams>,
    #[cfg(zstd_any)] zstd_dict: Option<&crate::compression::ZstdDictionary>,
) -> crate::Result<bool> {
    use crate::table::block::{BlockIdentity, EccStatus};

    let (fd, _open_event) = file_accessor.get_or_open_table(&table_id, path)?;
    let transform = build_block_transform(
        compression,
        encryption,
        ecc,
        #[cfg(zstd_any)]
        zstd_dict,
    )?;

    let identity = BlockIdentity {
        table_id: table_id.table_id(),
        block_type,
        dict_id: compression.dict_id(),
        window_log: 0,
    };
    // Only the ECC verdict of this second read is of interest.
    let (_block, ecc_status) =
        Block::from_file_with_status(fd.as_ref(), *handle, identity, &transform)?;

    let still_corrected = matches!(ecc_status, EccStatus::Corrected);
    Ok(still_corrected)
}
// CPU feature probes consumed by `resolve_lsp_kernel`. Each `cpufeatures::new!`
// invocation is named by its first argument and queried there as `<name>::get()`
// for the feature string given as second argument.
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
cpufeatures::new!(cpu_avx512bw, "avx512bw");
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
cpufeatures::new!(cpu_avx2, "avx2");
// SSE2 is only probed on 32-bit x86; on x86_64 `resolve_lsp_kernel` hard-codes it to `true`.
#[cfg(target_arch = "x86")]
cpufeatures::new!(cpu_sse2, "sse2");
/// Returns the number of leading bytes that `s1` and `s2` have in common.
///
/// The work is delegated to the kernel chosen by [`resolve_lsp_kernel`] on
/// every call.
#[must_use]
pub fn longest_shared_prefix_length(s1: &[u8], s2: &[u8]) -> usize {
    let kernel = resolve_lsp_kernel();
    // SAFETY: the kernel comes from `resolve_lsp_kernel`, which picks it from the
    // CPU features it detected (or from the compile-time target).
    unsafe { kernel(s1, s2) }
}
/// Function-pointer type shared by all longest-shared-prefix kernels.
///
/// It takes two byte slices and returns a `usize`. The pointer is `unsafe`
/// because the SIMD kernels in this file are `unsafe fn`s compiled with
/// `#[target_feature]`.
///
/// NOTE(review): callers presumably must only invoke a kernel on a CPU that
/// supports the features it was compiled for; `resolve_lsp_kernel` is the
/// selection point in this file.
pub type LspKernel = unsafe fn(&[u8], &[u8]) -> usize;
/// Picks the longest-shared-prefix kernel for the current target.
///
/// - x86 / x86_64: probes AVX-512BW, AVX2 and SSE2 (SSE2 is taken as present
///   on x86_64) and lets `select_lsp_kernel` choose.
/// - little-endian aarch64: always the NEON kernel.
/// - every other target: the portable scalar kernel.
#[must_use]
pub fn resolve_lsp_kernel() -> LspKernel {
    #[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
    {
        let avx512bw = cpu_avx512bw::get();
        let avx2 = cpu_avx2::get();
        #[cfg(target_arch = "x86_64")]
        let sse2 = true;
        #[cfg(target_arch = "x86")]
        let sse2 = cpu_sse2::get();

        select_lsp_kernel(LspCpuFeatures {
            avx512bw,
            avx2,
            sse2,
        })
    }
    #[cfg(all(target_arch = "aarch64", target_endian = "little"))]
    {
        lsp_neon
    }
    #[cfg(not(any(
        target_arch = "x86_64",
        target_arch = "x86",
        all(target_arch = "aarch64", target_endian = "little")
    )))]
    {
        lsp_scalar
    }
}
/// CPU feature flags that `select_lsp_kernel` uses to choose an x86 kernel.
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
#[derive(Clone, Copy)]
struct LspCpuFeatures {
// Selects `lsp_avx512` when set (checked first).
avx512bw: bool,
// Selects `lsp_avx2` when set and `avx512bw` is not.
avx2: bool,
// Selects `lsp_sse2` when set and neither wider feature is.
sse2: bool,
}
/// Chooses the widest x86 kernel permitted by the feature flags in `f`.
///
/// Preference order: AVX-512BW, then AVX2, then SSE2, and finally the portable
/// scalar kernel when none of the flags is set.
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
fn select_lsp_kernel(f: LspCpuFeatures) -> LspKernel {
    if f.avx512bw {
        return lsp_avx512;
    }
    if f.avx2 {
        return lsp_avx2;
    }
    if f.sse2 {
        return lsp_sse2;
    }
    lsp_scalar
}
/// Portable longest-shared-prefix kernel.
///
/// Compares the inputs eight bytes at a time using unaligned `u64` loads and
/// finishes the remaining (fewer than eight) bytes one by one. Returns the
/// index of the first differing byte, or the length of the shorter input when
/// one is a prefix of the other.
#[cfg_attr(
    all(target_arch = "aarch64", target_endian = "little", not(test)),
    expect(
        dead_code,
        reason = "portable LSP fallback; aarch64-LE always selects the NEON kernel \
                  (unit tests still call it directly, hence not(test))"
    )
)]
#[must_use]
pub(crate) fn lsp_scalar(s1: &[u8], s2: &[u8]) -> usize {
    let common = s1.len().min(s2.len());
    // End of the region that can be covered by whole 8-byte words.
    let word_end = common - common % 8;

    let mut pos = 0;
    while pos < word_end {
        #[expect(
            unsafe_code,
            reason = "pos + 8 <= word_end <= common, so both 8-byte reads are in bounds"
        )]
        let diff = unsafe {
            let lhs = s1.as_ptr().add(pos).cast::<u64>().read_unaligned();
            let rhs = s2.as_ptr().add(pos).cast::<u64>().read_unaligned();
            lhs ^ rhs
        };
        if diff != 0 {
            // The first differing byte in memory order sits at the low end of
            // the word on little-endian targets and at the high end otherwise.
            let zero_bits = if cfg!(target_endian = "little") {
                diff.trailing_zeros()
            } else {
                diff.leading_zeros()
            };
            return pos + (zero_bits / 8) as usize;
        }
        pos += 8;
    }

    for idx in word_end..common {
        #[expect(unsafe_code, reason = "idx < common <= both slice lengths")]
        let (a, b) = unsafe { (*s1.get_unchecked(idx), *s2.get_unchecked(idx)) };
        if a != b {
            return idx;
        }
    }
    common
}
/// AVX2 longest-shared-prefix kernel: compares 32 bytes per iteration and
/// finishes the remaining (fewer than 32) bytes one by one.
///
/// Returns the index of the first differing byte, or the length of the shorter
/// input when one is a prefix of the other.
///
/// # Safety
///
/// The function is compiled with `#[target_feature(enable = "avx2")]`; the
/// caller must ensure the running CPU supports AVX2.
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
#[target_feature(enable = "avx2")]
#[expect(unsafe_code, reason = "intrinsics require unsafe")]
#[must_use]
unsafe fn lsp_avx2(s1: &[u8], s2: &[u8]) -> usize {
    #[cfg(target_arch = "x86")]
    use core::arch::x86::{__m256i, _mm256_cmpeq_epi8, _mm256_loadu_si256, _mm256_movemask_epi8};
    #[cfg(target_arch = "x86_64")]
    use core::arch::x86_64::{
        __m256i, _mm256_cmpeq_epi8, _mm256_loadu_si256, _mm256_movemask_epi8,
    };

    let common = s1.len().min(s2.len());
    let mut offset = 0;

    // `offset` never exceeds `common`, so the subtraction cannot underflow.
    while common - offset >= 32 {
        // SAFETY: at least 32 bytes remain in both slices starting at `offset`.
        #[expect(
            clippy::cast_ptr_alignment,
            reason = "_mm256_loadu_si256 explicitly performs an unaligned 32-byte load"
        )]
        let lhs = unsafe { _mm256_loadu_si256(s1.as_ptr().add(offset).cast::<__m256i>()) };
        #[expect(
            clippy::cast_ptr_alignment,
            reason = "_mm256_loadu_si256 explicitly performs an unaligned 32-byte load"
        )]
        let rhs = unsafe { _mm256_loadu_si256(s2.as_ptr().add(offset).cast::<__m256i>()) };

        // One bit per byte lane: set where the two inputs are equal.
        let equal_lanes = _mm256_movemask_epi8(_mm256_cmpeq_epi8(lhs, rhs)).cast_unsigned();
        let differing_lanes = !equal_lanes;
        if differing_lanes != 0 {
            return offset + differing_lanes.trailing_zeros() as usize;
        }
        offset += 32;
    }

    for idx in offset..common {
        // SAFETY: idx < common <= both slice lengths.
        let (a, b) = unsafe { (*s1.get_unchecked(idx), *s2.get_unchecked(idx)) };
        if a != b {
            return idx;
        }
    }
    common
}
/// AVX-512 longest-shared-prefix kernel: compares 64 bytes per iteration and
/// finishes the remaining (fewer than 64) bytes one by one.
///
/// Returns the index of the first differing byte, or the length of the shorter
/// input when one is a prefix of the other.
///
/// # Safety
///
/// The function is compiled with
/// `#[target_feature(enable = "avx512bw,avx512f")]`; the caller must ensure the
/// running CPU supports those features.
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
#[target_feature(enable = "avx512bw,avx512f")]
#[expect(unsafe_code, reason = "intrinsics require unsafe")]
#[must_use]
unsafe fn lsp_avx512(s1: &[u8], s2: &[u8]) -> usize {
    #[cfg(target_arch = "x86")]
    use core::arch::x86::{__m512i, _mm512_cmpeq_epi8_mask, _mm512_loadu_si512};
    #[cfg(target_arch = "x86_64")]
    use core::arch::x86_64::{__m512i, _mm512_cmpeq_epi8_mask, _mm512_loadu_si512};

    let common = s1.len().min(s2.len());
    let mut offset = 0;

    // `offset` never exceeds `common`, so the subtraction cannot underflow.
    while common - offset >= 64 {
        // SAFETY: at least 64 bytes remain in both slices starting at `offset`.
        #[expect(
            clippy::cast_ptr_alignment,
            reason = "_mm512_loadu_si512 explicitly performs an unaligned 64-byte load"
        )]
        let lhs = unsafe { _mm512_loadu_si512(s1.as_ptr().add(offset).cast::<__m512i>()) };
        #[expect(
            clippy::cast_ptr_alignment,
            reason = "_mm512_loadu_si512 explicitly performs an unaligned 64-byte load"
        )]
        let rhs = unsafe { _mm512_loadu_si512(s2.as_ptr().add(offset).cast::<__m512i>()) };

        // One bit per byte lane: set where the two inputs are equal.
        let equal_lanes = _mm512_cmpeq_epi8_mask(lhs, rhs);
        let differing_lanes = !equal_lanes;
        if differing_lanes != 0 {
            return offset + differing_lanes.trailing_zeros() as usize;
        }
        offset += 64;
    }

    for idx in offset..common {
        // SAFETY: idx < common <= both slice lengths.
        let (a, b) = unsafe { (*s1.get_unchecked(idx), *s2.get_unchecked(idx)) };
        if a != b {
            return idx;
        }
    }
    common
}
/// SSE2 longest-shared-prefix kernel: compares 16 bytes per iteration and
/// finishes the remaining (fewer than 16) bytes one by one.
///
/// Returns the index of the first differing byte, or the length of the shorter
/// input when one is a prefix of the other.
///
/// # Safety
///
/// The function is compiled with `#[target_feature(enable = "sse2")]`; the
/// caller must ensure the running CPU supports SSE2.
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
#[target_feature(enable = "sse2")]
#[expect(unsafe_code, reason = "intrinsics require unsafe")]
#[must_use]
unsafe fn lsp_sse2(s1: &[u8], s2: &[u8]) -> usize {
    #[cfg(target_arch = "x86")]
    use core::arch::x86::{__m128i, _mm_cmpeq_epi8, _mm_loadu_si128, _mm_movemask_epi8};
    #[cfg(target_arch = "x86_64")]
    use core::arch::x86_64::{__m128i, _mm_cmpeq_epi8, _mm_loadu_si128, _mm_movemask_epi8};

    let common = s1.len().min(s2.len());
    let mut offset = 0;

    // `offset` never exceeds `common`, so the subtraction cannot underflow.
    while common - offset >= 16 {
        // SAFETY: at least 16 bytes remain in both slices starting at `offset`.
        #[expect(
            clippy::cast_ptr_alignment,
            reason = "_mm_loadu_si128 explicitly performs an unaligned 16-byte load"
        )]
        let lhs = unsafe { _mm_loadu_si128(s1.as_ptr().add(offset).cast::<__m128i>()) };
        #[expect(
            clippy::cast_ptr_alignment,
            reason = "_mm_loadu_si128 explicitly performs an unaligned 16-byte load"
        )]
        let rhs = unsafe { _mm_loadu_si128(s2.as_ptr().add(offset).cast::<__m128i>()) };

        // Only the low 16 bits are populated: one bit per byte lane, set where equal.
        let equal_lanes = _mm_movemask_epi8(_mm_cmpeq_epi8(lhs, rhs)).cast_unsigned();
        let differing_lanes = equal_lanes ^ 0xFFFF;
        if differing_lanes != 0 {
            return offset + differing_lanes.trailing_zeros() as usize;
        }
        offset += 16;
    }

    for idx in offset..common {
        // SAFETY: idx < common <= both slice lengths.
        let (a, b) = unsafe { (*s1.get_unchecked(idx), *s2.get_unchecked(idx)) };
        if a != b {
            return idx;
        }
    }
    common
}
/// NEON longest-shared-prefix kernel: compares 16 bytes per iteration and
/// finishes the remaining (fewer than 16) bytes one by one.
///
/// Returns the index of the first differing byte, or the length of the shorter
/// input when one is a prefix of the other.
///
/// # Safety
///
/// The function is compiled with `#[target_feature(enable = "neon")]`; the
/// caller must ensure the running CPU supports NEON.
#[cfg(all(target_arch = "aarch64", target_endian = "little"))]
#[target_feature(enable = "neon")]
#[expect(unsafe_code, reason = "intrinsics require unsafe")]
#[must_use]
unsafe fn lsp_neon(s1: &[u8], s2: &[u8]) -> usize {
    use core::arch::aarch64::{vceqq_u8, vgetq_lane_u64, vld1q_u8, vreinterpretq_u64_u8};

    let min_len = s1.len().min(s2.len());
    let mut i = 0;
    while i + 16 <= min_len {
        // SAFETY: `i + 16 <= min_len`, so 16 bytes are readable from both slices at `i`.
        let (va, vb) = unsafe { (vld1q_u8(s1.as_ptr().add(i)), vld1q_u8(s2.as_ptr().add(i))) };
        // `vceqq_u8` yields 0xFF for equal byte lanes and 0x00 for differing ones,
        // so the comparison result is used directly; no extra masking is needed.
        let as_u64 = vreinterpretq_u64_u8(vceqq_u8(va, vb));
        let lo = vgetq_lane_u64(as_u64, 0);
        let hi = vgetq_lane_u64(as_u64, 1);
        // On little-endian the lowest-addressed byte is the least significant,
        // so the first zero byte is found via the trailing zeros of the inverse.
        if lo != u64::MAX {
            return i + (!lo).trailing_zeros() as usize / 8;
        }
        if hi != u64::MAX {
            return i + 8 + (!hi).trailing_zeros() as usize / 8;
        }
        i += 16;
    }
    while i < min_len {
        // SAFETY: `i < min_len`, which does not exceed either slice length.
        let (a, b) = unsafe { (*s1.get_unchecked(i), *s2.get_unchecked(i)) };
        if a != b {
            return i;
        }
        i += 1;
    }
    min_len
}
/// Compares the key `prefix ++ suffix` against `needle` using `cmp`.
///
/// When the comparator reports itself as lexicographic, the comparison is done
/// without materializing the concatenated key. Otherwise the key is assembled
/// in a 256-byte stack buffer when it fits, or in a heap-allocated `Vec` when
/// it does not, and handed to `cmp.compare`.
#[must_use]
pub fn compare_prefixed_slice(
    prefix: &[u8],
    suffix: &[u8],
    needle: &[u8],
    cmp: &dyn crate::comparator::UserComparator,
) -> core::cmp::Ordering {
    if cmp.is_lexicographic() {
        return compare_prefixed_slice_lexicographic(prefix, suffix, needle);
    }

    let total_len = prefix.len() + suffix.len();

    // Keys too large for the stack buffer are assembled on the heap.
    if total_len > 256 {
        let mut heap_key = Vec::with_capacity(total_len);
        heap_key.extend_from_slice(prefix);
        heap_key.extend_from_slice(suffix);
        return cmp.compare(&heap_key, needle);
    }

    let mut stack_buf = [0_u8; 256];
    #[expect(clippy::indexing_slicing, reason = "total_len <= 256 checked above")]
    let key = &mut stack_buf[..total_len];
    // `prefix.len() <= total_len`, so the split point is always in range and
    // the tail is exactly `suffix.len()` bytes long.
    let (head, tail) = key.split_at_mut(prefix.len());
    head.copy_from_slice(prefix);
    tail.copy_from_slice(suffix);
    cmp.compare(key, needle)
}
/// Lexicographically compares the key `prefix ++ suffix` against `needle`
/// without building the concatenated key.
///
/// The part of `prefix` that overlaps `needle` is compared first; if that is
/// equal and `prefix` is longer than `needle`, the key is greater; otherwise
/// `suffix` is compared against the part of `needle` that follows the prefix.
#[must_use]
fn compare_prefixed_slice_lexicographic(
    prefix: &[u8],
    suffix: &[u8],
    needle: &[u8],
) -> core::cmp::Ordering {
    use core::cmp::Ordering::{Equal, Greater};

    // An empty needle is only equal to an empty key.
    if needle.is_empty() {
        return if prefix.is_empty() && suffix.is_empty() {
            Equal
        } else {
            Greater
        };
    }

    let overlap = prefix.len().min(needle.len());

    #[expect(unsafe_code, reason = "overlap <= prefix.len() by construction")]
    let prefix_head = unsafe { prefix.get_unchecked(0..overlap) };
    #[expect(unsafe_code, reason = "overlap <= needle.len() by construction")]
    let needle_head = unsafe { needle.get_unchecked(0..overlap) };

    let head_order = prefix_head.cmp(needle_head);
    if head_order != Equal {
        return head_order;
    }

    // Heads are equal: a prefix that extends past the needle makes the key larger.
    if prefix.len() > needle.len() {
        return Greater;
    }

    #[expect(
        unsafe_code,
        reason = "overlap == prefix.len() <= needle.len() after the length guard above"
    )]
    let needle_tail = unsafe { needle.get_unchecked(overlap..) };
    suffix.cmp(needle_tail)
}
#[cfg(test)]
mod tests;