use crate::comparator::SharedComparator;
use crate::table::DataBlock;
use crate::table::block::{Block, Decoder, Header, ParsedItem};
use crate::table::data_block::DataBlockParsedItem;
use crate::{Checksum, InternalValue, Slice, UserKey};
use std::sync::Arc;
use structured_zstd::decoding::{FrameDecoder, ResumeInput, ResumeState};
/// Snapshot of a partially decoded block, produced by
/// `LazyBlock::resume_payload` and consumed by `LazyBlock::from_resume`, so a
/// later call can continue decoding instead of starting over.
#[derive(Clone)]
pub struct PartialResume {
/// All decompressed bytes produced so far (the whole decoded prefix, not just a
/// trailing window); it is also handed to the decoder as its window when resuming.
pub window_prime: Slice,
/// Number of inner blocks of the frame already decoded into `window_prime`.
pub decoded_blocks: u32,
/// Decoder state to resume from; `None` makes the next decode round restart
/// from the beginning of the frame.
pub state: Option<Arc<ResumeState>>,
/// Offset in the compressed frame at which a resumed decode round continues reading.
pub compressed_cursor: u64,
}
/// A zstd-compressed data block that is decompressed incrementally, a run of
/// inner zstd blocks at a time, only as far as callers request.
pub struct LazyBlock {
/// The compressed frame being decoded.
source: std::io::Cursor<Vec<u8>>,
/// Decoder reused across decode rounds; it is reset before every round.
decoder: FrameDecoder,
/// End offset of each inner block within the decompressed output. Must be
/// ascending (it is searched with `partition_point`); the last entry is the
/// total decompressed length.
ends: Vec<u32>,
/// How many inner blocks have been decoded into `decompressed`.
decoded_blocks: u32,
/// Decompressed prefix of the block produced so far.
decompressed: Vec<u8>,
/// Decoder state for continuing after `decoded_blocks`; `None` forces the next
/// round to decode from the start of the frame.
resume_state: Option<Arc<ResumeState>>,
/// Position in `source` where a resumed decode round continues reading.
compressed_cursor: u64,
}
impl LazyBlock {
pub fn new(frame: Vec<u8>, ends: Vec<u32>) -> crate::Result<Self> {
let mut source = std::io::Cursor::new(frame);
let mut decoder = FrameDecoder::new();
decoder
.reset(&mut source)
.map_err(|e| crate::Error::Io(crate::io::Error::other(e.to_string())))?;
Ok(Self {
source,
decoder,
ends,
decoded_blocks: 0,
decompressed: Vec::new(),
resume_state: None,
compressed_cursor: 0,
})
}
pub fn from_resume(frame: Vec<u8>, ends: Vec<u32>, resume: PartialResume) -> Self {
Self {
source: std::io::Cursor::new(frame),
decoder: FrameDecoder::new(),
ends,
decoded_blocks: resume.decoded_blocks,
decompressed: resume.window_prime.to_vec(),
resume_state: resume.state,
compressed_cursor: resume.compressed_cursor,
}
}
pub fn total_len(&self) -> usize {
self.ends.last().copied().unwrap_or(0) as usize
}
pub fn decoded(&self) -> &[u8] {
&self.decompressed
}
#[cfg_attr(
not(test),
expect(dead_code, reason = "laziness assertion helper, used in tests")
)]
pub fn decoded_blocks(&self) -> u32 {
self.decoded_blocks
}
pub fn resume_payload(&self) -> PartialResume {
PartialResume {
window_prime: Slice::from(self.decompressed.as_slice()),
decoded_blocks: self.decoded_blocks,
state: self.resume_state.clone(),
compressed_cursor: self.compressed_cursor,
}
}
pub fn ensure_decoded_to(&mut self, upto: usize) -> crate::Result<()> {
if upto <= self.decompressed.len() {
return Ok(());
}
let probe = upto.saturating_sub(1);
let target = self
.ends
.partition_point(|&e| (e as usize) <= probe)
.min(self.ends.len().saturating_sub(1));
#[expect(
clippy::cast_possible_truncation,
reason = "inner-block index is bounded by ends.len(), well within u32"
)]
let end_block = (target + 1) as u32;
if end_block <= self.decoded_blocks {
return Ok(());
}
self.source.set_position(0);
self.decoder
.reset(&mut self.source)
.map_err(|e| crate::Error::Io(crate::io::Error::other(e.to_string())))?;
let resuming = self.resume_state.is_some();
let pd = if let Some(state) = self.resume_state.as_deref() {
self.source.set_position(self.compressed_cursor);
self.decoder
.decode_blocks_partial(
&mut self.source,
state.block_index(),
end_block,
Some(ResumeInput {
window_prime: &self.decompressed,
state,
}),
true,
)
.map_err(|e| crate::Error::Io(crate::io::Error::other(e.to_string())))?
} else {
self.decoder
.decode_blocks_partial(&mut self.source, 0, end_block, None, true)
.map_err(|e| crate::Error::Io(crate::io::Error::other(e.to_string())))?
};
if let Some((idx, err)) = pd.stopped_at {
return Err(crate::Error::Io(crate::io::Error::other(format!(
"lazy partial decode stopped at inner block {idx}: {err:?}"
))));
}
let consumed = self.decoder.bytes_read_from_source();
if resuming {
self.decompressed.extend_from_slice(&pd.data);
self.compressed_cursor += consumed;
} else {
self.decompressed.clear();
self.decompressed.extend_from_slice(&pd.data);
self.compressed_cursor = consumed;
}
self.decoded_blocks = pd.start_block + pd.blocks_decoded;
self.resume_state = pd.resume_state.map(Arc::new);
Ok(())
}
}
/// Builds the raw bytes of a standalone data block from a decoded entry prefix.
///
/// Only the bytes of `prefix` up to the `entries_end` boundary reported by
/// `scan_restart_offsets` are copied. They are followed by a freshly built trailer:
/// `TRAILER_START_MARKER`, a binary index over the restart offsets found in the
/// prefix, and a fixed footer of `TRAILER_FOOTER_SIZE` bytes.
///
/// # Errors
/// Propagates any error returned while writing the binary index.
fn synthesize_block_bytes(prefix: &[u8], restart_interval: u8) -> crate::Result<Vec<u8>> {
    use crate::table::block::TRAILER_START_MARKER;
    use crate::table::block::binary_index::Builder as BinaryIndexBuilder;

    let probe = Block {
        header: synthetic_header(prefix.len()),
        data: Slice::from(prefix),
    };
    let (restart_offsets, item_count, entries_end) =
        Decoder::<InternalValue, DataBlockParsedItem>::new_forward_headerless(
            &probe,
            restart_interval,
            prefix.len(),
        )
        .scan_restart_offsets();

    let mut out =
        Vec::with_capacity(entries_end + 1 + restart_offsets.len() * 4 + TRAILER_FOOTER_SIZE);
    out.extend_from_slice(prefix.get(..entries_end).unwrap_or(prefix));
    out.push(TRAILER_START_MARKER);

    #[expect(
        clippy::cast_possible_truncation,
        reason = "block offsets are far below u32::MAX"
    )]
    let binary_index_offset = out.len() as u32;
    let mut bib = BinaryIndexBuilder::new(restart_offsets.len());
    for off in restart_offsets {
        bib.insert(off);
    }
    let (step_size, binary_index_len) = bib.write(&mut out)?;

    // Fixed-size footer. NOTE(review): the constant-valued fields mirror the
    // block trailer format in `crate::table::block` — confirm their meaning there.
    let footer_start = out.len();
    out.push(restart_interval);
    out.push(step_size);
    #[expect(
        clippy::cast_possible_truncation,
        reason = "index pointers <= item count, far below u32::MAX"
    )]
    out.extend_from_slice(&(binary_index_len as u32).to_le_bytes());
    out.extend_from_slice(&binary_index_offset.to_le_bytes());
    out.extend_from_slice(&0u32.to_le_bytes());
    out.extend_from_slice(&0u32.to_le_bytes());
    out.push(1);
    out.push(0);
    out.extend_from_slice(&0u16.to_le_bytes());
    out.push(0);
    out.extend_from_slice(&0u32.to_le_bytes());
    #[expect(
        clippy::cast_possible_truncation,
        reason = "item count far below u32::MAX for a single block"
    )]
    out.extend_from_slice(&(item_count as u32).to_le_bytes());

    // Keep the capacity constant honest: the footer layout and
    // TRAILER_FOOTER_SIZE must change together.
    debug_assert_eq!(
        out.len() - footer_start,
        TRAILER_FOOTER_SIZE,
        "synthetic trailer footer size drifted from TRAILER_FOOTER_SIZE"
    );
    Ok(out)
}
/// Byte length of the fixed footer that `synthesize_block_bytes` appends after
/// the binary index: two `u8`s, four `u32`s, two `u8`s, one `u16`, one `u8`,
/// and two more `u32`s (31 bytes in total).
const TRAILER_FOOTER_SIZE: usize = 2 + 4 * 4 + 2 + 2 + 1 + 2 * 4;
/// Builds a placeholder `Data` block header for an in-memory block of `len` bytes.
///
/// Both the data length and the uncompressed length are set to `len`. No flags
/// are set, and the checksum is zero.
fn synthetic_header(len: usize) -> Header {
    use crate::table::block::BlockType;

    #[expect(
        clippy::cast_possible_truncation,
        reason = "block length is far below u32::MAX"
    )]
    let length = len as u32;
    Header {
        checksum: Checksum::from_raw(0),
        block_type: BlockType::Data,
        block_flags: 0,
        uncompressed_length: length,
        data_length: length,
    }
}
/// Synthesizes a [`DataBlock`] from a decoded entry prefix, with a header sized
/// to the synthesized bytes.
///
/// # Errors
/// Propagates any error from building the block bytes.
pub fn synthesize_data_block(prefix: &[u8], restart_interval: u8) -> crate::Result<DataBlock> {
    let data = synthesize_block_bytes(prefix, restart_interval)?;
    let header = synthetic_header(data.len());
    let block = Block {
        header,
        data: Slice::from(data),
    };
    Ok(DataBlock::new(block))
}
pub fn partial_data_block(
frame: Vec<u8>,
ends: Vec<u32>,
restart_interval: u8,
comparator: &SharedComparator,
upper: &[u8],
resume: Option<PartialResume>,
) -> crate::Result<(DataBlock, Option<UserKey>, PartialResume)> {
let mut lazy = match resume {
Some(payload) => LazyBlock::from_resume(frame, ends, payload),
None => LazyBlock::new(frame, ends)?,
};
let total = lazy.total_len();
loop {
if lazy.decoded().len() >= total
|| (!lazy.decoded().is_empty()
&& prefix_reaches_past(lazy.decoded(), restart_interval, comparator, upper))
{
break;
}
let extent = if lazy.decoded().is_empty() {
(64 * 1024).min(total)
} else {
lazy.decoded().len().saturating_mul(2).min(total)
};
lazy.ensure_decoded_to(extent)?;
}
let covered_upper = last_complete_key(lazy.decoded(), restart_interval);
let block = synthesize_data_block(lazy.decoded(), restart_interval)?;
let payload = lazy.resume_payload();
Ok((block, covered_upper, payload))
}
/// Returns the user key of the last entry in `prefix` that lies before the
/// `entries_end` boundary reported by `scan_restart_offsets`. Returns `None` if
/// no entry does.
fn last_complete_key(prefix: &[u8], restart_interval: u8) -> Option<UserKey> {
    let probe = Block {
        header: synthetic_header(prefix.len()),
        data: Slice::from(prefix),
    };
    let (_offsets, _count, entries_end) =
        Decoder::<InternalValue, DataBlockParsedItem>::new_forward_headerless(
            &probe,
            restart_interval,
            prefix.len(),
        )
        .scan_restart_offsets();
    // Keep only the final parsed item and materialize just that one. Materializing
    // inside the fold would build and discard the user key of every entry. The fold
    // stays strictly forward rather than switching to `Iterator::last`.
    Decoder::<InternalValue, DataBlockParsedItem>::new_forward_headerless(
        &probe,
        restart_interval,
        entries_end,
    )
    .fold(None, |_, item| Some(item))
    .map(|item| item.materialize(&probe.data).key.user_key)
}
/// Reports whether any entry decodable from `prefix` has a user key strictly
/// greater than `upper` under `comparator`.
///
/// Stops at the first such key.
fn prefix_reaches_past(
    prefix: &[u8],
    restart_interval: u8,
    comparator: &SharedComparator,
    upper: &[u8],
) -> bool {
    let block = Block {
        header: synthetic_header(prefix.len()),
        data: Slice::from(prefix),
    };
    Decoder::<InternalValue, DataBlockParsedItem>::new_forward_headerless(
        &block,
        restart_interval,
        prefix.len(),
    )
    .map(|item| item.materialize(&block.data).key.user_key)
    .any(|user_key| comparator.compare(user_key.as_ref(), upper).is_gt())
}
#[cfg(test)]
#[expect(
clippy::expect_used,
clippy::indexing_slicing,
clippy::cast_possible_truncation,
reason = "test code"
)]
mod tests;