use crate::{
CompressionType,
checksum::ChecksummedWriter,
encryption::EncryptionProvider,
table::{
Block, BlockHandle, BlockOffset, IndexBlock, index_block::KeyedBlockHandle,
writer::index::BlockIndexWriter,
},
};
use alloc::sync::Arc;
#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, vec::Vec};
#[cfg(not(feature = "std"))]
use crate::io::{Seek, Write};
#[cfg(feature = "std")]
use std::io::{Seek, Write};
/// Block index writer that splits the index into partitions.
///
/// Registered data block handles are buffered until their estimated size
/// reaches `partition_size`. Each full buffer is then encoded as one index
/// block (a "partition") into an in-memory buffer. When the writer is
/// finished, all partitions are written to the file, followed by a top-level
/// index (TLI) holding one handle per partition.
pub struct PartitionedIndexWriter {
    // --- Configuration (set through the `use_*` builder methods) ---
    /// Compression applied to every partition and to the top-level index.
    compression: CompressionType,
    /// Optional encryption applied to every written index block.
    encryption: Option<Arc<dyn EncryptionProvider>>,
    /// Optional ECC parameters applied to every written index block.
    ecc: Option<crate::table::block::EccParams>,
    /// Restart interval used when encoding index blocks.
    restart_interval: u8,
    /// Estimated size (in bytes) at which the current partition is cut.
    partition_size: u32,
    /// Table ID recorded in the identity of every written block.
    table_id: crate::TableId,

    // --- Write state ---
    /// Data block handles buffered for the partition currently being built.
    data_block_handles: Vec<KeyedBlockHandle>,
    /// Estimated size (in bytes) of `data_block_handles`.
    buffer_size: u32,
    /// One handle per finished partition; these form the top-level index.
    tli_handles: Vec<KeyedBlockHandle>,
    /// Number of partitions cut so far.
    index_block_count: usize,
    /// Offset of the next partition, relative to the first partition.
    relative_file_pos: u64,
    /// Scratch buffer that a single partition is serialized into.
    block_buffer: Vec<u8>,
    /// All serialized partitions, concatenated, waiting to be written out.
    final_write_buffer: Vec<u8>,
}
impl PartitionedIndexWriter {
    /// Creates a writer with default settings: 4 KiB partitions, no
    /// compression, restart interval 1, no encryption, no ECC and table ID 0.
    #[must_use]
    pub fn new() -> Self {
        Self {
            relative_file_pos: 0,
            buffer_size: 0,
            index_block_count: 0,
            partition_size: 4_096,
            compression: CompressionType::None,
            restart_interval: 1,
            tli_handles: Vec::new(),
            data_block_handles: Vec::new(),
            block_buffer: Vec::with_capacity(4_096),
            final_write_buffer: Vec::new(),
            encryption: None,
            table_id: 0,
            ecc: None,
        }
    }

    /// Encodes the buffered data block handles into one index partition.
    ///
    /// The serialized partition is appended to `final_write_buffer`. A handle
    /// pointing at it is pushed onto `tli_handles`. The handle's offset is
    /// relative to the first partition, and it is keyed by the last buffered
    /// data block handle. Afterwards, the per-partition state is reset.
    ///
    /// # Errors
    ///
    /// Propagates errors from encoding the index block, building the block
    /// transform, or writing the block.
    ///
    /// # Panics
    ///
    /// Panics if no data block handles are buffered. Callers only cut after at
    /// least one handle has been registered since the previous cut.
    fn cut_index_block(&mut self) -> crate::Result<()> {
        let mut bytes = vec![];
        IndexBlock::encode_into_with_restart_interval(
            &mut bytes,
            &self.data_block_handles,
            self.restart_interval,
        )?;

        let header = Block::write_into(
            &mut self.block_buffer,
            &bytes,
            crate::table::block::BlockIdentity {
                table_id: self.table_id,
                block_type: crate::table::block::BlockType::Index,
                dict_id: 0,
                window_log: 0,
            },
            &{
                let t = crate::table::block::BlockTransform::from_parts(
                    self.compression,
                    self.encryption.as_deref(),
                    #[cfg(zstd_any)]
                    None,
                )?;
                if let Some(ecc) = self.ecc {
                    t.with_ecc(ecc)
                } else {
                    t
                }
            },
        )?;

        let bytes_written = header.on_disk_size_with(self.ecc);

        // The partition is keyed by its last handle. This presumably relies on
        // handles being registered in ascending key order (verify at the caller).
        // `pop` moves the handle out instead of cloning it. The handles were
        // already encoded above, and the buffer is cleared below anyway.
        #[expect(clippy::expect_used, reason = "chunk is not empty")]
        let last = self
            .data_block_handles
            .pop()
            .expect("Chunk should not be empty");

        let index_block_handle = KeyedBlockHandle::new(
            last.end_key().clone(),
            last.seqno(),
            BlockHandle::new(BlockOffset(self.relative_file_pos), bytes_written),
        );

        log::trace!(
            "Built index partition ({bytes_written}B) with end_key={:?} at +{:#X?}",
            last.end_key(),
            self.relative_file_pos,
        );

        self.tli_handles.push(index_block_handle);

        // `append` moves the bytes and leaves `block_buffer` empty, keeping its
        // capacity so it can be reused for the next partition.
        self.final_write_buffer.append(&mut self.block_buffer);

        self.index_block_count += 1;
        self.relative_file_pos += u64::from(bytes_written);

        self.data_block_handles.clear();
        self.buffer_size = 0;

        Ok(())
    }

    /// Writes the top-level index into the `tli` section of `file_writer`.
    ///
    /// Every partition handle is first shifted by `index_base_offset`, the
    /// stream position at which the first partition was written. This turns
    /// the handle offsets from partition-relative into stream positions.
    ///
    /// Returns the encoded top-level index bytes, i.e. the input passed to
    /// `Block::write_into`, before any block transform is applied.
    ///
    /// # Errors
    ///
    /// Propagates errors from starting the section, encoding the block,
    /// building the block transform, or writing the block.
    fn write_top_level_index<WR: Write + Seek>(
        &mut self,
        file_writer: &mut crate::sfa::Writer<ChecksummedWriter<WR>>,
        index_base_offset: BlockOffset,
    ) -> crate::Result<Vec<u8>> {
        file_writer.start("tli")?;

        for item in &mut self.tli_handles {
            item.shift(index_base_offset);
        }

        let mut bytes = vec![];
        IndexBlock::encode_into_with_restart_interval(
            &mut bytes,
            &self.tli_handles,
            self.restart_interval,
        )?;

        let header = Block::write_into(
            file_writer,
            &bytes,
            crate::table::block::BlockIdentity {
                table_id: self.table_id,
                block_type: crate::table::block::BlockType::Index,
                dict_id: 0,
                window_log: 0,
            },
            &{
                let t = crate::table::block::BlockTransform::from_parts(
                    self.compression,
                    self.encryption.as_deref(),
                    #[cfg(zstd_any)]
                    None,
                )?;
                if let Some(ecc) = self.ecc {
                    t.with_ecc(ecc)
                } else {
                    t
                }
            },
        )?;

        let bytes_written = header.on_disk_size_with(self.ecc);
        debug_assert!(bytes_written > 0, "Top level index should never be empty");

        log::trace!(
            "Written top level index, with {} pointers ({bytes_written} bytes)",
            self.tli_handles.len(),
        );

        Ok(bytes)
    }
}
impl<W: crate::io::Write + crate::io::Seek> BlockIndexWriter<W> for PartitionedIndexWriter {
    /// Sets (or clears) the encryption provider used for every index block.
    fn use_encryption(
        mut self: Box<Self>,
        encryption: Option<Arc<dyn EncryptionProvider>>,
    ) -> Box<dyn BlockIndexWriter<W>> {
        self.encryption = encryption;
        self
    }

    /// Sets the estimated partition size (in bytes) at which a partition is cut.
    fn use_partition_size(mut self: Box<Self>, size: u32) -> Box<dyn BlockIndexWriter<W>> {
        self.partition_size = size;
        self
    }

    /// Sets the restart interval used when encoding index blocks.
    fn use_restart_interval(mut self: Box<Self>, interval: u8) -> Box<dyn BlockIndexWriter<W>> {
        self.restart_interval = interval;
        self
    }

    /// Sets the table ID recorded in every written block's identity.
    fn use_table_id(mut self: Box<Self>, table_id: crate::TableId) -> Box<dyn BlockIndexWriter<W>> {
        self.table_id = table_id;
        self
    }

    /// Sets (or clears) the ECC parameters applied to every index block.
    fn use_ecc(
        mut self: Box<Self>,
        ecc: Option<crate::table::block::EccParams>,
    ) -> Box<dyn BlockIndexWriter<W>> {
        self.ecc = ecc;
        self
    }

    /// Sets the compression applied to partitions and the top-level index.
    fn use_compression(
        mut self: Box<Self>,
        compression: CompressionType,
    ) -> Box<dyn BlockIndexWriter<W>> {
        self.compression = compression;
        self
    }

    /// Buffers a data block handle and cuts a partition once the estimated
    /// buffered size reaches the configured partition size.
    ///
    /// # Errors
    ///
    /// Propagates errors from cutting (encoding/writing) a partition.
    fn register_data_block(&mut self, block_handle: KeyedBlockHandle) -> crate::Result<()> {
        log::trace!(
            "Registering block at {:?} with size {} [end_key={:?}]",
            block_handle.offset(),
            block_handle.size(),
            block_handle.end_key(),
        );

        #[expect(
            clippy::cast_possible_truncation,
            reason = "key is u16 max, so we can not exceed u32::MAX"
        )]
        let block_handle_size =
            (block_handle.end_key().len() + core::mem::size_of::<KeyedBlockHandle>()) as u32;

        // Saturate so that a partition size close to `u32::MAX` cannot make the
        // running estimate overflow. Overflow would panic in debug builds and
        // wrap around (postponing the cut) in release builds.
        self.buffer_size = self.buffer_size.saturating_add(block_handle_size);
        self.data_block_handles.push(block_handle);

        if self.buffer_size >= self.partition_size {
            self.cut_index_block()?;
        }

        Ok(())
    }

    /// Flushes any partially filled partition, writes all partitions into the
    /// `index` section and the top-level index into the `tli` section.
    ///
    /// Returns the number of partitions written and the encoded top-level
    /// index bytes.
    ///
    /// # Errors
    ///
    /// Propagates I/O errors and errors from encoding/writing index blocks.
    fn finish(
        mut self: Box<Self>,
        file_writer: &mut crate::sfa::Writer<ChecksummedWriter<W>>,
    ) -> crate::Result<(usize, Vec<u8>)> {
        // Cut the trailing partition if anything is still buffered. Checking
        // the handles directly matches `cut_index_block`'s non-empty precondition.
        if !self.data_block_handles.is_empty() {
            self.cut_index_block()?;
        }

        file_writer.start("index")?;

        // Partition handles hold offsets relative to the first partition, so
        // the base must be the exact position where `final_write_buffer` is
        // written. That is after the section has been opened (in case opening
        // it emits bytes) and immediately before the write.
        let index_base_offset = BlockOffset(file_writer.get_mut().stream_position()?);
        file_writer.write_all(&self.final_write_buffer)?;
        log::trace!("Concatted index partitions onto blocks file");

        let tli_bytes = self.write_top_level_index(file_writer, index_base_offset)?;

        Ok((self.index_block_count, tli_bytes))
    }
}