use crate::CompressionType;
use crate::fs::{Fs, FsOpenOptions, StdFs};
use crate::table::meta::ParsedMeta;
use crate::table::regions::ParsedRegions;
use std::path::Path;
/// Table-level properties of a single table file.
///
/// Built by `read_table_properties`, which copies every field verbatim from
/// the parsed metadata of the file; no value is derived or re-computed here.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct TableProperties {
/// Table id (`meta.id`).
pub id: u64,
/// File size as recorded in the metadata (`meta.file_size`).
pub file_size: u64,
/// Lower bound of the table's key range (`meta.key_range.min()`), copied into an owned buffer.
pub min_key: Vec<u8>,
/// Upper bound of the table's key range (`meta.key_range.max()`), copied into an owned buffer.
pub max_key: Vec<u8>,
/// Item count recorded in the metadata.
pub item_count: u64,
/// Tombstone count recorded in the metadata.
pub tombstone_count: u64,
/// Weak-tombstone count recorded in the metadata.
pub weak_tombstone_count: u64,
/// Copied from `meta.weak_tombstone_reclaimable`.
// NOTE(review): presumably how many weak tombstones can be reclaimed — confirm against the writer.
pub weak_tombstone_reclaimable: u64,
/// Number of data blocks recorded in the metadata.
pub data_block_count: u64,
/// Number of index blocks recorded in the metadata.
pub index_block_count: u64,
/// Compression type recorded for data blocks.
pub data_block_compression: CompressionType,
/// Compression type recorded for index blocks.
pub index_block_compression: CompressionType,
/// Creation timestamp (dereferenced `meta.created_at`).
// NOTE(review): the name implies nanoseconds; the epoch is not visible from this file.
pub created_at_nanos: u128,
/// Copied from `meta.page_ecc`.
// NOTE(review): presumably "per-page ECC is enabled" — confirm.
pub page_ecc: bool,
/// Copied from `meta.ecc_unrecognized`.
// NOTE(review): presumably set when the stored ECC descriptor could not be decoded — confirm.
pub ecc_unrecognized: bool,
/// ECC scheme of the table, if the metadata carries ECC parameters.
pub ecc_scheme: Option<EccSchemeInfo>,
}
/// Public, copyable description of a table's ECC scheme.
///
/// Mirrors the variants of the internal ECC parameter type so callers can
/// inspect the scheme without depending on block internals.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum EccSchemeInfo {
    /// Shard-based scheme with a fixed number of data and parity shards.
    Shard {
        /// Number of data shards the payload is split across.
        data_shards: u8,
        /// Number of parity shards.
        parity_shards: u8,
    },
    /// SECDED scheme; carries no parameters.
    Secded,
}

impl EccSchemeInfo {
    /// Returns the parity trailer length, in bytes, for a payload of
    /// `payload_len` bytes.
    ///
    /// * `Shard`: the payload is divided across `data_shards` (rounding up),
    ///   the per-shard length is rounded up to an even number, and the result
    ///   is multiplied by `parity_shards`. A scheme with zero data shards or
    ///   zero parity shards yields `0`.
    /// * `Secded`: one trailer byte per 8 payload bytes, rounded up.
    ///
    /// The arithmetic saturates at `usize::MAX` instead of overflowing, so
    /// absurdly large `payload_len` values can neither panic (debug builds)
    /// nor silently wrap (release builds).
    #[must_use]
    pub fn parity_trailer_len(self, payload_len: usize) -> usize {
        match self {
            Self::Shard {
                data_shards,
                parity_shards,
            } => {
                let ds = usize::from(data_shards);
                let ps = usize::from(parity_shards);
                if ds == 0 || ps == 0 {
                    return 0;
                }
                // Per-shard length, rounded up to the next even number.
                let shard = payload_len.div_ceil(ds).div_ceil(2).saturating_mul(2);
                shard.saturating_mul(ps)
            }
            Self::Secded => payload_len.div_ceil(8),
        }
    }
}
/// Opens the table file at `path` and returns its metadata as a
/// [`TableProperties`].
///
/// The metadata is first loaded through `regions.metadata`. If that fails and
/// the file also has a `metadata_mid` region, that copy is tried as a
/// fallback.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, its table of contents or
/// regions cannot be parsed, or no metadata copy can be loaded. When the
/// fallback copy fails as well, the error of the *first* attempt is returned.
#[cfg(feature = "std")]
pub fn read_table_properties(path: &Path) -> crate::Result<TableProperties> {
    let fs = StdFs;
    let mut file = fs.open(path, &FsOpenOptions::new().read(true))?;
    let sfa_reader = crate::sfa::Reader::from_reader(&mut file)?;
    let toc = sfa_reader.toc();
    let regions = ParsedRegions::parse_from_toc(toc)?;

    let meta = match ParsedMeta::load_with_handle(&*file, &regions.metadata, None, None) {
        Ok(m) => m,
        Err(tail_err) => {
            // No second copy available: surface the original failure.
            let Some(mid_handle) = regions.metadata_mid else {
                return Err(tail_err);
            };
            match ParsedMeta::load_with_handle(&*file, &mid_handle, None, None) {
                Ok(mid) => mid,
                // Both copies failed; the first error is the one reported.
                Err(_) => return Err(tail_err),
            }
        }
    };

    Ok(TableProperties {
        id: meta.id,
        file_size: meta.file_size,
        min_key: meta.key_range.min().to_vec(),
        max_key: meta.key_range.max().to_vec(),
        item_count: meta.item_count,
        tombstone_count: meta.tombstone_count,
        weak_tombstone_count: meta.weak_tombstone_count,
        weak_tombstone_reclaimable: meta.weak_tombstone_reclaimable,
        data_block_count: meta.data_block_count,
        index_block_count: meta.index_block_count,
        data_block_compression: meta.data_block_compression,
        index_block_compression: meta.index_block_compression,
        created_at_nanos: *meta.created_at,
        page_ecc: meta.page_ecc,
        ecc_unrecognized: meta.ecc_unrecognized,
        ecc_scheme: meta.ecc_params.map(ecc_scheme_info),
    })
}
/// Converts the internal ECC parameters into their public
/// [`EccSchemeInfo`] counterpart, variant for variant.
fn ecc_scheme_info(params: crate::table::block::EccParams) -> EccSchemeInfo {
    use crate::table::block::EccParams;

    match params {
        EccParams::Secded => EccSchemeInfo::Secded,
        EccParams::Shard {
            data_shards,
            parity_shards,
        } => EccSchemeInfo::Shard {
            data_shards,
            parity_shards,
        },
    }
}
/// One entry of a table's top-level index.
///
/// Produced by `read_top_level_index_entries`, which fills each field from the
/// accessor of the same name on the keyed block handle yielded by the index
/// block iterator.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct IndexEntry {
/// The handle's end key, copied into an owned buffer.
pub end_key: Vec<u8>,
/// Sequence number reported by the handle.
pub seqno: u64,
/// Offset of the referenced block (the dereferenced handle offset).
pub offset: u64,
/// Size of the referenced block as reported by the handle.
pub size: u32,
}
/// Opens the table file at `path` and returns every entry of its top-level
/// index (TLI), in the order the index block iterator yields them.
///
/// Metadata is loaded from `regions.metadata`, falling back to
/// `regions.metadata_mid` when present. The index block is loaded from
/// `regions.tli_tail` when present, falling back to `regions.tli`; without a
/// tail copy, `regions.tli` is used directly.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or parsed, if no metadata
/// copy can be loaded, if no index block copy can be loaded, or if the index
/// block iterator cannot be constructed. Whenever a fallback fails too, the
/// error of the *first* attempt is returned.
#[cfg(feature = "std")]
pub fn read_top_level_index_entries(path: &Path) -> crate::Result<Vec<IndexEntry>> {
    use crate::table::block_index::iter::OwnedIndexBlockIter;
    use crate::table::{IndexBlock, KeyedBlockHandle};

    let fs = StdFs;
    let mut file = fs.open(path, &FsOpenOptions::new().read(true))?;
    let sfa_reader = crate::sfa::Reader::from_reader(&mut file)?;
    let toc = sfa_reader.toc();
    let regions = ParsedRegions::parse_from_toc(toc)?;

    let meta = match ParsedMeta::load_with_handle(&*file, &regions.metadata, None, None) {
        Ok(m) => m,
        Err(tail_err) => {
            // No second copy available: surface the original failure.
            let Some(mid_handle) = regions.metadata_mid else {
                return Err(tail_err);
            };
            match ParsedMeta::load_with_handle(&*file, &mid_handle, None, None) {
                Ok(mid) => mid,
                // Both copies failed; the first error is the one reported.
                Err(_) => return Err(tail_err),
            }
        }
    };
    let index_compression = meta.index_block_compression;
    let ecc = meta.ecc_params;
    let table_id = meta.id;

    let tli_block = if let Some(tail_handle) = regions.tli_tail {
        match load_index_block(&*file, tail_handle, table_id, index_compression, ecc) {
            Ok(b) => b,
            Err(tail_err) => {
                // The tail copy is unreadable; try the other copy but keep the first error.
                match load_index_block(&*file, regions.tli, table_id, index_compression, ecc) {
                    Ok(b) => b,
                    Err(_) => return Err(tail_err),
                }
            }
        }
    } else {
        load_index_block(&*file, regions.tli, table_id, index_compression, ecc)?
    };

    let block = IndexBlock::new(tli_block);
    let iter = OwnedIndexBlockIter::from_block(block, crate::comparator::default_comparator())?;
    let entries = iter
        .map(|h: KeyedBlockHandle| IndexEntry {
            end_key: h.end_key().to_vec(),
            seqno: h.seqno(),
            offset: *h.offset(),
            size: h.size(),
        })
        .collect();
    Ok(entries)
}
/// Loads the block at `handle` from `file` and checks that it is an index
/// block.
///
/// The block is read with a transform built from `compression` (no further
/// optional parts), extended with `ecc` when ECC parameters are supplied, and
/// an identity of `table_id` / `BlockType::Index` with `dict_id` and
/// `window_log` both set to zero.
///
/// # Errors
///
/// Propagates failures from building the transform or reading the block, and
/// returns `Error::InvalidTag` when the loaded block's header does not
/// declare `BlockType::Index`.
#[cfg(feature = "std")]
fn load_index_block(
    file: &dyn crate::fs::FsFile,
    handle: crate::table::BlockHandle,
    table_id: crate::table::TableId,
    compression: CompressionType,
    ecc: Option<crate::table::block::EccParams>,
) -> crate::Result<crate::table::Block> {
    use crate::table::block::{Block, BlockIdentity, BlockTransform, BlockType};

    let identity = BlockIdentity {
        table_id,
        block_type: BlockType::Index,
        dict_id: 0,
        window_log: 0,
    };

    let base = BlockTransform::from_parts(
        compression,
        None,
        #[cfg(zstd_any)]
        None,
    )?;
    let transform = match ecc {
        Some(params) => base.with_ecc(params),
        None => base,
    };

    let block = Block::from_file(file, handle, identity, &transform)?;

    if block.header.block_type == BlockType::Index {
        return Ok(block);
    }
    Err(crate::Error::InvalidTag((
        "BlockType",
        block.header.block_type.into(),
    )))
}
/// One key/value entry read from a data block.
///
/// Yielded by `DataBlockEntryIter`, which copies the fields out of the
/// internal value produced by the underlying data block iterator.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct DataEntry {
/// The user key of the entry.
pub key: Vec<u8>,
/// The value bytes; left empty when the producing iterator runs in keys-only mode.
pub value: Vec<u8>,
/// Sequence number of the entry.
pub seqno: u64,
/// Value type of the entry.
pub value_type: crate::ValueType,
}
impl DataEntry {
/// Returns whether this entry is a tombstone, as decided by
/// `ValueType::is_tombstone` on `value_type`.
#[must_use]
pub fn is_tombstone(&self) -> bool {
self.value_type.is_tombstone()
}
}
/// Iterator over the entries of a table's data blocks, created by
/// `iter_data_block_entries`.
///
/// Blocks are loaded one at a time: a block is read from `file` only once the
/// previous one has been exhausted.
#[cfg(feature = "std")]
pub struct DataBlockEntryIter {
// Open table file that data blocks are read from.
file: Box<dyn crate::fs::FsFile>,
// Table id passed along as part of the block identity on every block load.
table_id: crate::table::TableId,
// Compression type used to build the block transform for data blocks.
data_block_compression: CompressionType,
// Optional ECC parameters added to the block transform.
ecc: Option<crate::table::block::EccParams>,
// Forwarded to `DataBlock::from_loaded`; set when the metadata records a KV checksum algorithm.
has_kv_footer: bool,
// Handles of the blocks not loaded yet, stored in reverse so `pop()` yields them in index order.
remaining_handles: Vec<crate::table::BlockHandle>,
// Iterator over the block currently being drained, if any.
current: Option<crate::table::iter::OwnedDataBlockIter>,
// When set, yielded entries carry an empty `value`.
keys_only: bool,
}
#[cfg(feature = "std")]
impl DataBlockEntryIter {
/// Switches the iterator to keys-only mode and returns it (builder style).
///
/// In this mode every yielded `DataEntry` has an empty `value`; the value
/// bytes are not copied out of the block.
#[must_use]
pub const fn keys_only(mut self) -> Self {
self.keys_only = true;
self
}
}
#[cfg(feature = "std")]
impl Iterator for DataBlockEntryIter {
    type Item = crate::Result<DataEntry>;

    /// Yields the next entry, loading the next data block whenever the
    /// current one is exhausted.
    ///
    /// If a block fails to load, the error is yielded once and all remaining
    /// handles are dropped, so iteration ends afterwards.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Serve from the block that is currently open, if there is one.
            if let Some(open_block) = self.current.as_mut() {
                match open_block.next() {
                    Some(item) => {
                        // Keys-only mode skips copying the value bytes.
                        let value = if self.keys_only {
                            Vec::new()
                        } else {
                            item.value.to_vec()
                        };
                        return Some(Ok(DataEntry {
                            key: item.key.user_key.to_vec(),
                            value,
                            seqno: item.key.seqno,
                            value_type: item.key.value_type,
                        }));
                    }
                    None => self.current = None,
                }
            }

            // No open block: move on to the next handle, or finish if none is left.
            let next_handle = self.remaining_handles.pop()?;
            let loaded = load_data_block_iter(
                &*self.file,
                &next_handle,
                self.table_id,
                self.data_block_compression,
                self.ecc,
                self.has_kv_footer,
            );
            match loaded {
                Ok(block_iter) => self.current = Some(block_iter),
                Err(load_err) => {
                    self.remaining_handles.clear();
                    return Some(Err(load_err));
                }
            }
        }
    }
}
/// Opens the table file at `path` and returns an iterator over the entries of
/// all data blocks referenced by its top-level index.
///
/// The top-level index is read eagerly to collect the data block handles; the
/// data blocks themselves are loaded lazily by the returned
/// [`DataBlockEntryIter`], in index order.
///
/// Metadata is loaded from `regions.metadata`, falling back to
/// `regions.metadata_mid` when present. The index block is loaded from
/// `regions.tli_tail` when present, falling back to `regions.tli`.
///
/// # Errors
///
/// Returns an `Unsupported` I/O error when the file has a separate `index`
/// section (partitioned index). Also returns an error if the file cannot be
/// opened or parsed, if no metadata copy or index block copy can be loaded,
/// or if the index block iterator cannot be constructed. Whenever a fallback
/// fails too, the error of the *first* attempt is returned.
#[cfg(feature = "std")]
pub fn iter_data_block_entries(path: &Path) -> crate::Result<DataBlockEntryIter> {
    use crate::table::IndexBlock;
    use crate::table::block_index::iter::OwnedIndexBlockIter;

    let fs = StdFs;
    let mut file = fs.open(path, &FsOpenOptions::new().read(true))?;
    let sfa_reader = crate::sfa::Reader::from_reader(&mut file)?;
    let toc = sfa_reader.toc();
    let regions = ParsedRegions::parse_from_toc(toc)?;

    if regions.index.is_some() {
        return Err(crate::Error::Io(std::io::Error::new(
            std::io::ErrorKind::Unsupported,
            "partitioned-index SST (separate `index` section present) is not yet supported \
by iter_data_block_entries; walking sub-index leaves to enumerate data blocks \
is a follow-up surface",
        )));
    }

    let meta = match ParsedMeta::load_with_handle(&*file, &regions.metadata, None, None) {
        Ok(m) => m,
        Err(tail_err) => {
            // No second copy available: surface the original failure.
            let Some(mid_handle) = regions.metadata_mid else {
                return Err(tail_err);
            };
            match ParsedMeta::load_with_handle(&*file, &mid_handle, None, None) {
                Ok(mid) => mid,
                // Both copies failed; the first error is the one reported.
                Err(_) => return Err(tail_err),
            }
        }
    };
    let table_id = meta.id;
    let data_block_compression = meta.data_block_compression;
    let index_compression = meta.index_block_compression;
    let ecc = meta.ecc_params;

    let tli_block = if let Some(tail_handle) = regions.tli_tail {
        match load_index_block(&*file, tail_handle, table_id, index_compression, ecc) {
            Ok(b) => b,
            Err(tail_err) => {
                // The tail copy is unreadable; try the other copy but keep the first error.
                match load_index_block(&*file, regions.tli, table_id, index_compression, ecc) {
                    Ok(b) => b,
                    Err(_) => return Err(tail_err),
                }
            }
        }
    } else {
        load_index_block(&*file, regions.tli, table_id, index_compression, ecc)?
    };

    let block = IndexBlock::new(tli_block);
    let iter = OwnedIndexBlockIter::from_block(block, crate::comparator::default_comparator())?;
    let mut handles: Vec<crate::table::BlockHandle> = iter
        .map(crate::table::KeyedBlockHandle::into_inner)
        .collect();
    // The iterator pops handles from the back, so store them reversed to keep index order.
    handles.reverse();

    Ok(DataBlockEntryIter {
        file,
        table_id,
        data_block_compression,
        ecc,
        has_kv_footer: meta.kv_checksum_algo.is_some(),
        remaining_handles: handles,
        current: None,
        keys_only: false,
    })
}
/// Loads the data block at `handle` from `file` and returns an owning
/// iterator over its entries.
///
/// The block is read with a transform built from `compression` (no further
/// optional parts), extended with `ecc` when ECC parameters are supplied, and
/// an identity of `table_id` / `BlockType::Data` with `dict_id` and
/// `window_log` both set to zero. `has_kv_footer` is forwarded to
/// `DataBlock::from_loaded`.
///
/// # Errors
///
/// Propagates failures from building the transform, reading the block,
/// wrapping it as a data block, or creating the iterator, and returns
/// `Error::InvalidTag` when the loaded block's header does not declare
/// `BlockType::Data`.
#[cfg(feature = "std")]
fn load_data_block_iter(
    file: &dyn crate::fs::FsFile,
    handle: &crate::table::BlockHandle,
    table_id: crate::table::TableId,
    compression: CompressionType,
    ecc: Option<crate::table::block::EccParams>,
    has_kv_footer: bool,
) -> crate::Result<crate::table::iter::OwnedDataBlockIter> {
    use crate::table::DataBlock;
    use crate::table::block::{Block, BlockIdentity, BlockTransform, BlockType};
    use crate::table::iter::OwnedDataBlockIter;

    let identity = BlockIdentity {
        table_id,
        block_type: BlockType::Data,
        dict_id: 0,
        window_log: 0,
    };

    let base = BlockTransform::from_parts(
        compression,
        None,
        #[cfg(zstd_any)]
        None,
    )?;
    let transform = match ecc {
        Some(params) => base.with_ecc(params),
        None => base,
    };

    let raw_block = Block::from_file(file, *handle, identity, &transform)?;
    if raw_block.header.block_type != BlockType::Data {
        return Err(crate::Error::InvalidTag((
            "BlockType",
            raw_block.header.block_type.into(),
        )));
    }

    let data_block = DataBlock::from_loaded(raw_block, has_kv_footer)?;
    OwnedDataBlockIter::try_new(data_block, |owned| {
        owned.try_iter(crate::comparator::default_comparator())
    })
}
/// Diagnostic statistics about a table's filter section, as computed by
/// `read_filter_stats`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FilterStats {
/// Size of the filter section in bytes (the size of the filter region's handle).
pub filter_section_bytes: u64,
/// Layer count reported by the filter reader for the loaded filter block.
pub layer_count: u64,
/// Item count of the table, taken from its metadata.
pub item_count: u64,
/// `filter_section_bytes * 8` divided by `item_count`, with an item count of zero treated as one.
pub bits_per_key: f64,
}
/// Opens the table file at `path` and computes [`FilterStats`] for its filter
/// section.
///
/// Returns `Ok(None)` when the file has no filter region, or when the loaded
/// filter block carries no data.
///
/// Metadata is loaded from `regions.metadata`, falling back to
/// `regions.metadata_mid` when present. The filter block is read with the
/// plain transform, extended with the table's ECC parameters when the
/// metadata carries any.
///
/// # Errors
///
/// Returns `Error::FeatureUnsupported("filter_tli")` when the file has a
/// `filter_tli` region, and `Error::InvalidTag` when the loaded block's header
/// does not declare `BlockType::Filter`. Also returns an error if the file
/// cannot be opened or parsed, if no metadata copy can be loaded (the error of
/// the first attempt is reported), if the filter block cannot be read, or if
/// the filter reader rejects the block data.
#[cfg(feature = "std")]
pub fn read_filter_stats(path: &Path) -> crate::Result<Option<FilterStats>> {
    use crate::table::block::{Block, BlockIdentity, BlockType};
    use crate::table::filter::ribbon::burr::BurrFilterReader;

    let fs = StdFs;
    let mut file = fs.open(path, &FsOpenOptions::new().read(true))?;
    let sfa_reader = crate::sfa::Reader::from_reader(&mut file)?;
    let toc = sfa_reader.toc();
    let regions = ParsedRegions::parse_from_toc(toc)?;

    if regions.filter_tli.is_some() {
        return Err(crate::Error::FeatureUnsupported("filter_tli"));
    }
    let Some(filter_handle) = regions.filter else {
        return Ok(None);
    };
    let filter_section_bytes = u64::from(filter_handle.size());

    let meta = match ParsedMeta::load_with_handle(&*file, &regions.metadata, None, None) {
        Ok(m) => m,
        Err(tail_err) => {
            // No second copy available: surface the original failure.
            let Some(mid_handle) = regions.metadata_mid else {
                return Err(tail_err);
            };
            match ParsedMeta::load_with_handle(&*file, &mid_handle, None, None) {
                Ok(mid) => mid,
                // Both copies failed; the first error is the one reported.
                Err(_) => return Err(tail_err),
            }
        }
    };
    let item_count = meta.item_count;
    let table_id = meta.id;

    let block = Block::from_file(
        &*file,
        filter_handle,
        BlockIdentity {
            table_id,
            block_type: BlockType::Filter,
            dict_id: 0,
            window_log: 0,
        },
        &{
            let t = crate::table::block::BlockTransform::PLAIN;
            if let Some(ecc) = meta.ecc_params {
                t.with_ecc(ecc)
            } else {
                t
            }
        },
    )?;
    if block.header.block_type != BlockType::Filter {
        return Err(crate::Error::InvalidTag((
            "BlockType",
            block.header.block_type.into(),
        )));
    }
    if block.data.is_empty() {
        return Ok(None);
    }

    let layer_count: u64 = BurrFilterReader::new(&block.data)?.layer_count() as u64;

    // Guard against division by zero for tables that record no items.
    #[expect(
        clippy::cast_precision_loss,
        reason = "filter stats are diagnostic; precision loss above 2^53 keys is irrelevant"
    )]
    let denom = item_count.max(1) as f64;
    #[expect(
        clippy::cast_precision_loss,
        reason = "filter stats are diagnostic; precision loss above 2^53 bytes is irrelevant"
    )]
    let bits = (filter_section_bytes * 8) as f64;
    let bits_per_key = bits / denom;

    Ok(Some(FilterStats {
        filter_section_bytes,
        layer_count,
        item_count,
        bits_per_key,
    }))
}
/// The four bytes a payload must start with to be treated as a zstd frame by
/// `is_zstd_frame` and `census_zstd_frame`.
const ZSTD_FRAME_MAGIC: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];
/// Type of a block inside a zstd frame, decoded by `census_zstd_frame` from
/// the two-bit type field of the block header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZstdBlockType {
/// Type value 0: content length and decompressed length both equal the header's block size.
Raw,
/// Type value 1: a single content byte; the header's block size is the decompressed length.
Rle,
/// Type value 2: content length equals the header's block size; decompressed length is unknown.
Compressed,
}
/// Description of one block inside a zstd frame, as recorded by
/// `census_zstd_frame`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct InnerZstdBlock {
/// Zero-based position of the block within the frame.
pub index: u32,
/// Block type decoded from the block header.
pub block_type: ZstdBlockType,
/// Whether the header's last-block bit is set.
pub last: bool,
/// Byte offset of the 3-byte block header from the start of the payload.
pub header_offset: u64,
/// Number of content bytes that follow the block header in the payload.
pub content_len: u32,
/// Decompressed length when the header states it (raw and RLE blocks), otherwise `None`.
pub decompressed_len: Option<u32>,
}
/// Result of `census_zstd_frame`: the decoded frame header fields plus one
/// record per block in the frame.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ZstdFrameCensus {
/// Length of the frame header in bytes, magic included (the payload offset of the first block header).
pub frame_header_len: u8,
/// Window size from the window descriptor; for single-segment frames, the frame content size instead.
pub window_size: Option<u64>,
/// Dictionary id from the frame header; `None` when absent or stored as zero.
pub dictionary_id: Option<u32>,
/// Frame content size from the frame header, if the header carries one.
pub frame_content_size: Option<u64>,
/// Whether the frame descriptor's content-checksum bit is set.
pub content_checksum: bool,
/// The frame's blocks in payload order, ending with the block marked as last.
pub blocks: Vec<InnerZstdBlock>,
}
#[must_use]
pub fn is_zstd_frame(payload: &[u8]) -> bool {
payload.starts_with(&ZSTD_FRAME_MAGIC)
}
/// Decodes the header of a single zstd frame and walks its block headers
/// without decompressing anything.
///
/// `payload` must start with the zstd frame magic. The frame descriptor byte
/// decides which optional header fields follow (window descriptor, dictionary
/// id, frame content size). After the header, blocks are walked one 3-byte
/// header at a time until a block with the last-block bit set has been
/// recorded. Bytes after that block are not inspected.
///
/// # Errors
///
/// Returns `Error::InvalidHeader` when the magic is missing, when any header
/// field or block header is truncated, when a block uses the reserved block
/// type, or when a block's content would extend past the end of `payload`.
pub fn census_zstd_frame(payload: &[u8]) -> crate::Result<ZstdFrameCensus> {
    use crate::Error::InvalidHeader;

    if !payload.starts_with(&ZSTD_FRAME_MAGIC) {
        return Err(InvalidHeader("zstd frame magic"));
    }

    // Frame descriptor: FCS flag (bits 7-6), single-segment (bit 5),
    // content checksum (bit 2), dictionary id flag (bits 1-0).
    let descriptor = *payload
        .get(4)
        .ok_or(InvalidHeader("zstd frame descriptor"))?;
    let single_segment = descriptor & 0b0010_0000 != 0;
    let content_checksum = descriptor & 0b0000_0100 != 0;
    let dict_id_size = match descriptor & 0b11 {
        3 => 4usize,
        n => usize::from(n),
    };
    let fcs_size = match descriptor >> 6 {
        // With flag 0 the size field exists only for single-segment frames.
        0 => usize::from(single_segment),
        flag => 1usize << flag,
    };

    let mut cursor = 5usize;

    // The window descriptor byte is absent in single-segment frames.
    let window_from_descriptor = if single_segment {
        None
    } else {
        let wd = *payload
            .get(cursor)
            .ok_or(InvalidHeader("zstd window descriptor"))?;
        cursor += 1;
        let base = 1u64 << (10 + u64::from(wd >> 3));
        Some(base + (base / 8) * u64::from(wd & 0x7))
    };

    let dictionary_id = if dict_id_size == 0 {
        None
    } else {
        let raw = payload
            .get(cursor..cursor + dict_id_size)
            .ok_or(InvalidHeader("zstd dictionary id"))?;
        cursor += dict_id_size;
        // Little-endian: fold from the most significant (last) byte down.
        let id = raw
            .iter()
            .rev()
            .fold(0u32, |acc, &b| (acc << 8) | u32::from(b));
        (id != 0).then_some(id)
    };

    let frame_content_size = if fcs_size == 0 {
        None
    } else {
        let raw = payload
            .get(cursor..cursor + fcs_size)
            .ok_or(InvalidHeader("zstd frame content size"))?;
        cursor += fcs_size;
        let stored = raw
            .iter()
            .rev()
            .fold(0u64, |acc, &b| (acc << 8) | u64::from(b));
        // The two-byte encoding stores the size minus 256.
        Some(if fcs_size == 2 { stored + 256 } else { stored })
    };

    let frame_header_len =
        u8::try_from(cursor).map_err(|_| InvalidHeader("zstd frame header too long"))?;
    let window_size = window_from_descriptor.or(frame_content_size);

    let mut blocks = Vec::new();
    let mut index = 0u32;
    loop {
        let header_offset = cursor as u64;
        let &[b0, b1, b2] = payload
            .get(cursor..cursor + 3)
            .ok_or(InvalidHeader("zstd block header"))?
        else {
            return Err(InvalidHeader("zstd block header"));
        };

        // 24-bit little-endian header: last (bit 0), type (bits 2-1), size (bits 23-3).
        let raw = u32::from_le_bytes([b0, b1, b2, 0]);
        let last = raw & 1 != 0;
        let block_size = raw >> 3;
        let (block_type, content_len, decompressed_len) = match (raw >> 1) & 0b11 {
            0 => (ZstdBlockType::Raw, block_size, Some(block_size)),
            // An RLE block stores one byte; its size field is the repeat count.
            1 => (ZstdBlockType::Rle, 1u32, Some(block_size)),
            2 => (ZstdBlockType::Compressed, block_size, None),
            _ => return Err(InvalidHeader("zstd reserved block type")),
        };

        let content_end = (cursor + 3)
            .checked_add(content_len as usize)
            .ok_or(InvalidHeader("zstd block content overflow"))?;
        if content_end > payload.len() {
            return Err(InvalidHeader("zstd block content truncated"));
        }
        cursor = content_end;

        blocks.push(InnerZstdBlock {
            index,
            block_type,
            last,
            header_offset,
            content_len,
            decompressed_len,
        });
        index += 1;
        if last {
            break;
        }
    }

    Ok(ZstdFrameCensus {
        frame_header_len,
        window_size,
        dictionary_id,
        frame_content_size,
        content_checksum,
        blocks,
    })
}
// Unit tests for the zstd frame census helpers and for
// `EccSchemeInfo::parity_trailer_len`.
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::indexing_slicing,
clippy::expect_used,
reason = "test code"
)]
mod census_tests {
use super::*;
// Hand-built frame: magic (4 bytes), descriptor 0x20 (single-segment, one-byte
// content size), content size 5, then one block header 0x2B 0x00 0x00
// (last = 1, type = 1 / RLE, size = 5) followed by the repeated byte 0x41.
const RLE_FRAME: [u8; 10] = [0x28, 0xB5, 0x2F, 0xFD, 0x20, 0x05, 0x2B, 0x00, 0x00, 0x41];
// Detection looks at the magic only; a payload shorter than the magic is rejected.
#[test]
fn is_zstd_frame_matches_only_on_magic() {
assert!(is_zstd_frame(&RLE_FRAME));
assert!(!is_zstd_frame(b"not a frame"));
assert!(!is_zstd_frame(&[0x28, 0xB5, 0x2F])); }
// The single-segment frame has no window descriptor, so the window size
// falls back to the frame content size.
#[test]
fn census_zstd_frame_decodes_single_rle_block() {
let census = census_zstd_frame(&RLE_FRAME).unwrap();
assert_eq!(census.frame_header_len, 6);
assert_eq!(census.window_size, Some(5));
assert_eq!(census.frame_content_size, Some(5));
assert!(!census.content_checksum);
assert_eq!(census.blocks.len(), 1);
let b = &census.blocks[0];
assert_eq!(b.index, 0);
assert_eq!(b.block_type, ZstdBlockType::Rle);
assert!(b.last);
assert_eq!(b.header_offset, 6);
assert_eq!(b.content_len, 1);
assert_eq!(b.decompressed_len, Some(5));
}
#[test]
fn census_zstd_frame_rejects_non_zstd_payload() {
let err = census_zstd_frame(b"definitely not zstd").unwrap_err();
assert!(matches!(err, crate::Error::InvalidHeader(_)));
}
// Dropping the final byte removes the RLE block's single content byte.
#[test]
fn census_zstd_frame_rejects_truncated_frame() {
let err = census_zstd_frame(&RLE_FRAME[..RLE_FRAME.len() - 1]).unwrap_err();
assert!(matches!(err, crate::Error::InvalidHeader(_)));
}
// 0x2F keeps last = 1 and size = 5 but sets the type bits to 3 (reserved).
#[test]
fn census_zstd_frame_rejects_reserved_block_type() {
let mut frame = RLE_FRAME;
frame[6] = 0x2F;
let err = census_zstd_frame(&frame).unwrap_err();
assert!(matches!(err, crate::Error::InvalidHeader(_)));
}
#[test]
fn ecc_scheme_parity_trailer_len() {
let rs = EccSchemeInfo::Shard {
data_shards: 4,
parity_shards: 2,
};
assert!(rs.parity_trailer_len(4096) > 0);
assert_eq!(EccSchemeInfo::Secded.parity_trailer_len(64), 8);
assert_eq!(EccSchemeInfo::Secded.parity_trailer_len(65), 9);
}
// Tests that need a real zstd encoder; compiled only when a zstd backend is configured.
#[cfg(zstd_any)]
mod with_zstd {
use super::*;
use crate::compression::CompressionProvider;
// Walks a frame produced by the real encoder and checks the structural
// invariants: exactly one last block, at the end; sequential indices;
// non-overlapping blocks that stay within the frame.
#[test]
fn census_real_frame_walks_to_last_block() {
let frame = crate::compression::ZstdBackend::compress(&vec![0x5Au8; 8192], 3).unwrap();
assert!(is_zstd_frame(&frame));
let census = census_zstd_frame(&frame).unwrap();
assert!(!census.blocks.is_empty());
assert!(census.frame_header_len >= 5);
assert_eq!(census.blocks.iter().filter(|b| b.last).count(), 1);
assert!(census.blocks.last().unwrap().last);
let mut prev = u64::from(census.frame_header_len);
for (i, b) in census.blocks.iter().enumerate() {
assert_eq!(b.index as usize, i);
assert!(b.header_offset >= prev);
let end = b.header_offset + 3 + u64::from(b.content_len);
assert!(end <= frame.len() as u64);
prev = end;
}
}
// Cutting 8 bytes off the end of a real frame must be reported as an invalid header.
#[test]
fn census_real_frame_rejects_truncation() {
let frame = crate::compression::ZstdBackend::compress(&vec![0x7Eu8; 4096], 3).unwrap();
let truncated = &frame[..frame.len() - 8];
let err = census_zstd_frame(truncated).unwrap_err();
assert!(matches!(err, crate::Error::InvalidHeader(_)));
}
}
}