use crate::{
CompressionType,
checksum::ChecksummedWriter,
encryption::EncryptionProvider,
table::{
index_block::KeyedBlockHandle,
writer::index::{BlockIndexWriter, FullIndexWriter, PartitionedIndexWriter},
},
};
use std::{
io::{Seek, Write},
sync::Arc,
};
/// Default spill threshold of 4 MiB (`4 * 1024 * 1024` bytes).
///
/// NOTE(review): not referenced in this file; presumably intended as the
/// `spill_threshold` argument to `AdaptiveIndexWriter::new` — confirm at call sites.
pub const DEFAULT_SPILL_THRESHOLD: u64 = 4 << 20;
/// Block index writer that buffers index entries in memory and switches to a
/// partitioned index writer once the buffered entries' estimated size grows
/// past a threshold.
///
/// If the threshold is never exceeded, the buffered handles are written out
/// through a `FullIndexWriter` when the index is finished.
pub struct AdaptiveIndexWriter<W>
where
    W: Write + Seek + 'static,
{
    // Spill state.
    /// Handles collected while the index is still below the spill threshold.
    buffer: Vec<KeyedBlockHandle>,
    /// Running size estimate of `buffer`, compared against `spill_threshold`.
    buffered_bytes: u64,
    /// Estimated byte size which, once exceeded, triggers the spill.
    spill_threshold: u64,
    /// Partitioned writer that takes over after a spill; `None` until then.
    spilled: Option<Box<dyn BlockIndexWriter<W>>>,

    // Configuration applied to whichever inner writer ends up being used.
    /// Compression for index blocks.
    compression: CompressionType,
    /// Restart interval for index blocks.
    restart_interval: u8,
    /// Partition size passed to the inner writer.
    partition_size: u32,
    /// Optional encryption provider for index blocks.
    encryption: Option<Arc<dyn EncryptionProvider>>,
    /// ID of the table this index belongs to.
    table_id: crate::TableId,
    /// Optional ECC parameters for index blocks.
    ecc: Option<crate::table::block::EccParams>,
}
impl<W: Write + Seek + 'static> AdaptiveIndexWriter<W> {
    /// Creates an adaptive index writer that spills to a partitioned index once
    /// the estimated size of its buffered entries exceeds `spill_threshold` bytes.
    ///
    /// The writer starts with no compression, a restart interval of 1, a
    /// partition size of 4096, no encryption, table ID 0 and no ECC; use the
    /// `use_*` builder methods of [`BlockIndexWriter`] to change these.
    #[must_use]
    pub fn new(spill_threshold: u64) -> Self {
        Self {
            compression: CompressionType::None,
            restart_interval: 1,
            partition_size: 4_096,
            encryption: None,
            table_id: 0,
            ecc: None,
            spill_threshold,
            buffer: Vec::new(),
            buffered_bytes: 0,
            spilled: None,
        }
    }

    /// Applies this writer's stored configuration to a freshly created inner
    /// index writer and returns it.
    fn configure(&self, inner: Box<dyn BlockIndexWriter<W>>) -> Box<dyn BlockIndexWriter<W>> {
        inner
            .use_compression(self.compression)
            .use_restart_interval(self.restart_interval)
            .use_partition_size(self.partition_size)
            .use_encryption(self.encryption.clone())
            .use_table_id(self.table_id)
            .use_ecc(self.ecc)
    }

    /// Moves every buffered handle into a new partitioned index writer, which
    /// then receives all subsequent registrations.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned while re-registering a buffered
    /// handle. In that case the buffer has already been emptied and no
    /// partitioned writer is installed, so the buffered handles are lost and
    /// the writer should not be used further.
    fn spill(&mut self) -> crate::Result<()> {
        // `mem::take` moves the Vec out together with its allocation, so the
        // memory is released once the handles are handed over. `drain(..)`
        // would have kept the full capacity alive for the writer's lifetime,
        // defeating the point of spilling. Resetting the counter here keeps
        // `buffer` and `buffered_bytes` consistent even on the error path.
        let buffered = std::mem::take(&mut self.buffer);
        self.buffered_bytes = 0;

        let mut partitioned = self.configure(Box::new(PartitionedIndexWriter::new()));
        for handle in buffered {
            partitioned.register_data_block(handle)?;
        }

        self.spilled = Some(partitioned);
        Ok(())
    }
}
impl<W: Write + Seek + 'static> BlockIndexWriter<W> for AdaptiveIndexWriter<W> {
    /// Registers one data block's handle with the index.
    ///
    /// Before a spill, the handle is buffered and its estimated footprint
    /// (end-key length plus `size_of::<KeyedBlockHandle>()`) is added to the
    /// running total; once that total exceeds the spill threshold, all buffered
    /// handles are moved into a partitioned index writer. After a spill,
    /// handles are forwarded straight to that writer.
    ///
    /// # Errors
    ///
    /// Propagates any error from the partitioned writer, either while spilling
    /// or when forwarding after the spill.
    fn register_data_block(&mut self, block_handle: KeyedBlockHandle) -> crate::Result<()> {
        if let Some(partitioned) = &mut self.spilled {
            return partitioned.register_data_block(block_handle);
        }

        // Rough in-memory cost estimate; only used to decide when to spill.
        let entry_size =
            (block_handle.end_key().len() + std::mem::size_of::<KeyedBlockHandle>()) as u64;
        self.buffered_bytes += entry_size;
        self.buffer.push(block_handle);

        if self.buffered_bytes > self.spill_threshold {
            self.spill()?;
        }
        Ok(())
    }

    /// Writes the index to `file_writer`.
    ///
    /// If a spill happened, finishing is delegated to the partitioned writer.
    /// Otherwise the buffered handles are replayed, in registration order, into
    /// a `FullIndexWriter` configured with this writer's current settings,
    /// which is then finished.
    ///
    /// # Errors
    ///
    /// Propagates errors from the inner writer's registration or `finish`.
    fn finish(
        self: Box<Self>,
        file_writer: &mut crate::sfa::Writer<ChecksummedWriter<W>>,
    ) -> crate::Result<(usize, Vec<u8>)> {
        let this = *self;
        if let Some(partitioned) = this.spilled {
            return partitioned.finish(file_writer);
        }

        let mut full = this.configure(Box::new(FullIndexWriter::new()));
        for handle in this.buffer {
            full.register_data_block(handle)?;
        }
        full.finish(file_writer)
    }

    // The setters below store the value locally (used when the inner writer is
    // created) and also forward it to the partitioned writer if the index has
    // already spilled. Without forwarding, late configuration would be honored
    // on the non-spilled path (configured in `finish`) but silently dropped
    // after a spill.

    /// Sets the index block compression type.
    fn use_compression(
        mut self: Box<Self>,
        compression: CompressionType,
    ) -> Box<dyn BlockIndexWriter<W>> {
        self.compression = compression;
        self.spilled = self
            .spilled
            .take()
            .map(|inner| inner.use_compression(compression));
        self
    }

    /// Sets the index block restart interval.
    fn use_restart_interval(mut self: Box<Self>, interval: u8) -> Box<dyn BlockIndexWriter<W>> {
        self.restart_interval = interval;
        self.spilled = self
            .spilled
            .take()
            .map(|inner| inner.use_restart_interval(interval));
        self
    }

    /// Sets the partition size passed to the inner writer.
    fn use_partition_size(mut self: Box<Self>, size: u32) -> Box<dyn BlockIndexWriter<W>> {
        self.partition_size = size;
        self.spilled = self
            .spilled
            .take()
            .map(|inner| inner.use_partition_size(size));
        self
    }

    /// Sets (or clears) the encryption provider for index blocks.
    fn use_encryption(
        mut self: Box<Self>,
        encryption: Option<Arc<dyn EncryptionProvider>>,
    ) -> Box<dyn BlockIndexWriter<W>> {
        self.spilled = self
            .spilled
            .take()
            .map(|inner| inner.use_encryption(encryption.clone()));
        self.encryption = encryption;
        self
    }

    /// Sets the ID of the table this index belongs to.
    fn use_table_id(mut self: Box<Self>, table_id: crate::TableId) -> Box<dyn BlockIndexWriter<W>> {
        self.table_id = table_id;
        self.spilled = self
            .spilled
            .take()
            .map(|inner| inner.use_table_id(table_id));
        self
    }

    /// Sets (or clears) the ECC parameters for index blocks.
    fn use_ecc(
        mut self: Box<Self>,
        ecc: Option<crate::table::block::EccParams>,
    ) -> Box<dyn BlockIndexWriter<W>> {
        self.ecc = ecc;
        self.spilled = self.spilled.take().map(|inner| inner.use_ecc(ecc));
        self
    }
}