use std::convert::TryFrom;
use std::fs::File;
use std::io::BufWriter;
use std::io::{Seek, SeekFrom, Write};
use std::path::Path;
use bincode;
use bloomfilter::Bloom;
use super::compress_ctx_writer::*;
use super::compression;
use super::ondisk_format::*;
use super::options::*;
use super::poswriter::PosWriter;
use super::result::Result;
use super::types::*;
pub trait RawSSTableWriter {
fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()>;
fn close(self) -> Result<()>;
}
pub struct SSTableWriterV2 {
file: PosWriter<Box<dyn CompressionContextWriter<PosWriter<BufWriter<File>>>>>,
meta: MetaV3_0,
meta_start: u64,
data_start: u64,
flush_every: usize,
sparse_index: Vec<(Vec<u8>, u64)>,
bloom: Bloom<[u8]>,
}
impl SSTableWriterV2 {
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
Self::new_with_options(path, &WriteOptions::default())
}
pub fn new_with_options<P: AsRef<Path>>(path: P, options: &WriteOptions) -> Result<Self> {
let file = File::create(path)?;
let mut writer = PosWriter::new(BufWriter::new(file), 0);
writer.write_all(MAGIC)?;
bincode::serialize_into(&mut writer, &VERSION_30)?;
let meta_start = writer.current_offset() as u64;
let mut meta = MetaV3_0::default();
meta.compression = options.compression;
bincode::serialize_into(&mut writer, &meta)?;
let data_start = writer.current_offset() as u64;
let file: Box<dyn CompressionContextWriter<PosWriter<BufWriter<File>>>> =
match options.compression {
Compression::None => Box::new(UncompressedWriter::new(writer)),
Compression::Zlib => Box::new(CompressionContextWriterImpl::new(
writer,
compression::ZlibCompressorFactory::new(None),
)),
Compression::Snappy => Box::new(CompressionContextWriterImpl::new(
writer,
compression::SnappyCompressorFactory::new(),
)),
};
Ok(Self {
file: PosWriter::new(file, data_start),
meta,
meta_start,
data_start,
flush_every: options.flush_every,
sparse_index: Vec::new(),
bloom: Bloom::new(
options.bloom.bitmap_size as usize,
options.bloom.items_count,
),
})
}
pub fn finish(self) -> Result<()> {
match self {
SSTableWriterV2 {
file,
mut meta,
meta_start,
data_start,
sparse_index,
bloom,
..
} => {
let mut writer = file.into_inner();
let index_start = self.data_start + writer.reset_compression_context()? as u64;
for (key, offset) in sparse_index.into_iter() {
KVOffset::new(key.len(), offset)?.serialize_into(&mut writer)?;
writer.write_all(&key)?;
}
let bloom_start = self.data_start + writer.reset_compression_context()? as u64;
writer.write_all(&bloom.bitmap())?;
let end = self.data_start + writer.reset_compression_context()? as u64;
meta.finished = true;
meta.index_len = bloom_start - index_start;
meta.data_len = index_start - data_start;
meta.bloom_len = end - bloom_start;
meta.bloom.bitmap_bytes = u32::try_from(bloom.number_of_bits() / 8)?;
meta.bloom.k_num = bloom.number_of_hash_functions();
meta.bloom.sip_keys = bloom.sip_keys();
let mut writer = writer.into_inner()?.into_inner();
writer.seek(SeekFrom::Start(meta_start as u64))?;
bincode::serialize_into(&mut writer, &meta)?;
Ok(())
}
}
}
}
impl RawSSTableWriter for SSTableWriterV2 {
#[allow(clippy::collapsible_if)]
fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
let approx_msg_len = key.len() + 5 + value.len();
if self.meta.items == 0 {
self.sparse_index.push((key.to_owned(), self.data_start));
} else {
if self.file.current_offset() + approx_msg_len as u64 >= self.flush_every as u64 {
let total_offset =
self.data_start + self.file.get_mut().reset_compression_context()? as u64;
self.file.reset_offset(0);
self.sparse_index
.push((key.to_owned(), total_offset as u64));
}
}
self.bloom.set(key);
KVLength::new(key.len(), value.len())?.serialize_into(&mut self.file)?;
self.file.write_all(key)?;
self.file.write_all(value)?;
self.meta.items += 1;
Ok(())
}
fn close(self) -> Result<()> {
self.finish()
}
}