use crate::block::BlockContents;
use crate::block_builder::BlockBuilder;
use crate::blockhandle::BlockHandle;
use crate::cmp::InternalKeyCmp;
use crate::compressor::{self, Compressor, CompressorId};
use crate::error::Result;
use crate::filter::{InternalFilterPolicy, NoFilterPolicy};
use crate::filter_block::FilterBlockBuilder;
use crate::key_types::InternalKey;
use crate::log::mask_crc;
use crate::options::Options;
use std::cmp::Ordering;
use std::io::Write;
use std::rc::Rc;
use crc::crc32;
use crc::Hasher32;
use integer_encoding::FixedIntWriter;
/// Size in bytes of the footer body: the two encoded block handles followed by zero padding.
pub const FOOTER_LENGTH: usize = 40;
/// Size in bytes of the complete footer: the body followed by the magic number.
pub const FULL_FOOTER_LENGTH: usize = FOOTER_LENGTH + MAGIC_FOOTER_ENCODED.len();
/// Magic number that terminates the footer.
pub const MAGIC_FOOTER_NUMBER: u64 = 0xdb4775248b80fb57;
/// `MAGIC_FOOTER_NUMBER` in little-endian byte order, exactly as it is written into the footer.
pub const MAGIC_FOOTER_ENCODED: [u8; 8] = MAGIC_FOOTER_NUMBER.to_le_bytes();
/// Number of bytes used for the compressor id that is written after every block.
pub const TABLE_BLOCK_COMPRESS_LEN: usize = 1;
/// Number of bytes used for the masked CRC32 checksum that is written after every block.
pub const TABLE_BLOCK_CKSUM_LEN: usize = 4;
/// Trailer stored at the very end of a table file.
///
/// It records where the meta-index block and the index block live, so a reader can locate
/// every other block starting from the footer alone.
#[derive(Debug, Clone)]
pub struct Footer {
/// Handle of the meta-index block (the block that maps `filter.<name>` to the filter block).
pub meta_index: BlockHandle,
/// Handle of the index block (the block that maps separator keys to data block handles).
pub index: BlockHandle,
}
impl Footer {
    /// Creates a footer from the handles of the meta-index block and the index block.
    pub fn new(metaix: BlockHandle, index: BlockHandle) -> Footer {
        Footer {
            meta_index: metaix,
            index,
        }
    }

    /// Decodes a footer from the first `FULL_FOOTER_LENGTH` bytes of `from`.
    ///
    /// Bytes beyond `FULL_FOOTER_LENGTH` are ignored.
    ///
    /// Returns `None` if `from` is shorter than `FULL_FOOTER_LENGTH`, if the magic number is
    /// not found at offset `FOOTER_LENGTH`, or if either block handle cannot be decoded.
    pub fn decode(from: &[u8]) -> Option<Footer> {
        // Compare exactly the eight magic bytes. An open-ended slice would make every buffer
        // longer than the footer look corrupt; `get` also covers the too-short case.
        let magic = from.get(FOOTER_LENGTH..FULL_FOOTER_LENGTH)?;
        if magic != &MAGIC_FOOTER_ENCODED[..] {
            return None;
        }

        let (meta, metalen) = BlockHandle::decode(from)?;
        let (ix, _) = BlockHandle::decode(&from[metalen..])?;
        Some(Footer {
            meta_index: meta,
            index: ix,
        })
    }

    /// Encodes the footer into the first `FULL_FOOTER_LENGTH` bytes of `to`.
    ///
    /// Layout: encoded meta-index handle, encoded index handle, zero padding up to
    /// `FOOTER_LENGTH`, then the magic number.
    ///
    /// # Panics
    ///
    /// Panics if `to` is shorter than `FULL_FOOTER_LENGTH`.
    pub fn encode(&self, to: &mut [u8]) {
        assert!(to.len() >= FULL_FOOTER_LENGTH);

        let s1 = self.meta_index.encode_to(to);
        let s2 = self.index.encode_to(&mut to[s1..]);

        // Zero whatever remains of the footer body after the two handles.
        for byte in to.iter_mut().take(FOOTER_LENGTH).skip(s1 + s2) {
            *byte = 0;
        }
        to[FOOTER_LENGTH..FULL_FOOTER_LENGTH].copy_from_slice(&MAGIC_FOOTER_ENCODED);
    }
}
/// Incrementally writes a sorted table file to `dst`.
///
/// Entries are appended with `add` and the file is completed with `finish`, which writes
/// the filter block, the meta-index block, the index block and the footer.
pub struct TableBuilder<Dst: Write> {
/// Options used for this table (comparator, filter policy, block size, compressor).
opt: Options,
/// Destination that receives the encoded table.
dst: Dst,
/// Number of bytes accounted as written to `dst` so far.
offset: usize,
/// Number of entries added via `add`.
num_entries: usize,
/// Last key of the most recently written data block; empty until a block has been written.
prev_block_last_key: Vec<u8>,
/// Data block currently being filled.
data_block: Option<BlockBuilder>,
/// Index block; receives one entry (separator key -> block handle) per written data block.
index_block: Option<BlockBuilder>,
/// Filter block builder; receives every added key.
filter_block: Option<FilterBlockBuilder>,
}
impl<Dst: Write> TableBuilder<Dst> {
    /// Creates a table builder whose filter policy is replaced by `NoFilterPolicy`.
    ///
    /// All other options are passed on unchanged to `TableBuilder::new`.
    pub fn new_no_filter(mut opt: Options, dst: Dst) -> TableBuilder<Dst> {
        // Override whatever filter policy the caller configured.
        opt.filter_policy = Rc::new(Box::new(NoFilterPolicy::new()));
        Self::new(opt, dst)
    }
}
impl<Dst: Write> TableBuilder<Dst> {
    /// Creates a table builder for internal keys.
    ///
    /// The comparator in `opt` is wrapped in `InternalKeyCmp` and the filter policy in
    /// `InternalFilterPolicy` before the builder is constructed with `new_raw`.
    pub fn new(mut opt: Options, dst: Dst) -> TableBuilder<Dst> {
        opt.cmp = Rc::new(Box::new(InternalKeyCmp(opt.cmp.clone())));
        opt.filter_policy = Rc::new(Box::new(InternalFilterPolicy::new(opt.filter_policy)));
        TableBuilder::new_raw(opt, dst)
    }

    /// Creates a table builder that uses the comparator and filter policy in `opt` as given,
    /// without wrapping them.
    pub fn new_raw(opt: Options, dst: Dst) -> TableBuilder<Dst> {
        TableBuilder {
            opt: opt.clone(),
            dst,
            offset: 0,
            prev_block_last_key: vec![],
            num_entries: 0,
            data_block: Some(BlockBuilder::new(opt.clone())),
            filter_block: Some(FilterBlockBuilder::new(opt.filter_policy.clone())),
            index_block: Some(BlockBuilder::new(opt)),
        }
    }

    /// Returns the number of entries added so far.
    pub fn entries(&self) -> usize {
        self.num_entries
    }

    /// Estimates the size of the finished table in bytes.
    ///
    /// The estimate is the sum of the bytes already written, the size estimates of the
    /// pending data, index and filter blocks, and the footer length.
    pub fn size_estimate(&self) -> usize {
        let data = self.data_block.as_ref().map_or(0, |b| b.size_estimate());
        let index = self.index_block.as_ref().map_or(0, |b| b.size_estimate());
        let filter = self.filter_block.as_ref().map_or(0, |b| b.size_estimate());
        data + index + filter + self.offset + FULL_FOOTER_LENGTH
    }

    /// Adds a key/value pair to the table.
    ///
    /// If the current data block has grown beyond `opt.block_size`, it is written out
    /// before the new entry is added to a fresh block.
    ///
    /// # Errors
    ///
    /// Returns an error if writing a full data block to the destination fails.
    ///
    /// # Panics
    ///
    /// Panics if `key` does not compare greater than the last key of the previously
    /// written data block.
    pub fn add<'a>(&mut self, key: InternalKey<'a>, val: &[u8]) -> Result<()> {
        assert!(self.data_block.is_some());

        if !self.prev_block_last_key.is_empty() {
            assert!(self.opt.cmp.cmp(&self.prev_block_last_key, key) == Ordering::Less);
        }

        if self.data_block.as_ref().unwrap().size_estimate() > self.opt.block_size {
            self.write_data_block(key)?;
        }

        if let Some(ref mut fblock) = self.filter_block {
            fblock.add_key(key);
        }
        self.num_entries += 1;
        self.data_block.as_mut().unwrap().add(key, val);
        Ok(())
    }

    /// Writes the current data block and starts a new one.
    ///
    /// `next_key` is the key that will follow the block; it is used to compute a short
    /// separator key under which the block's handle is stored in the index block.
    fn write_data_block<'b>(&mut self, next_key: InternalKey<'b>) -> Result<()> {
        assert!(self.data_block.is_some());
        let block = self.data_block.take().unwrap();

        let sep = self.opt.cmp.find_shortest_sep(&block.last_key(), next_key);
        self.prev_block_last_key = Vec::from(block.last_key());
        let contents = block.finish();

        // Clone the compressor list so the compressor borrowed from it does not keep
        // `self` borrowed while `write_block` needs `&mut self`.
        let compressor_list = self.opt.compressor_list.clone();
        let compressor = compressor_list.get(self.opt.compressor)?;
        let handle = self.write_block(contents, (self.opt.compressor, compressor))?;

        let mut handle_enc = [0u8; 16];
        let enc_len = handle.encode_to(&mut handle_enc);
        self.index_block
            .as_mut()
            .unwrap()
            .add(&sep, &handle_enc[0..enc_len]);

        self.data_block = Some(BlockBuilder::new(self.opt.clone()));
        if let Some(ref mut fblock) = self.filter_block {
            fblock.start_block(self.offset);
        }
        Ok(())
    }

    /// Compresses `block`, writes it to the destination and returns its handle.
    ///
    /// On disk the compressed data is followed by the compressor id
    /// (`TABLE_BLOCK_COMPRESS_LEN` bytes) and the masked CRC32 of data plus compressor id
    /// (`TABLE_BLOCK_CKSUM_LEN` bytes). The returned handle covers only the compressed data.
    fn write_block(
        &mut self,
        block: BlockContents,
        compressor_id_pair: (u8, &Box<dyn Compressor>),
    ) -> Result<BlockHandle> {
        let (ctype, compressor) = compressor_id_pair;
        let data = compressor.encode(block)?;

        let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);
        digest.write(&data);
        digest.write(&[ctype; TABLE_BLOCK_COMPRESS_LEN]);

        // `write_all` is required here: plain `write` may accept only part of the buffer,
        // which would truncate the block while `offset` advances by the full length.
        self.dst.write_all(&data)?;
        self.dst.write_all(&[ctype; TABLE_BLOCK_COMPRESS_LEN])?;
        self.dst.write_fixedint(mask_crc(digest.sum32()))?;

        let handle = BlockHandle::new(self.offset, data.len());
        self.offset += data.len() + TABLE_BLOCK_COMPRESS_LEN + TABLE_BLOCK_CKSUM_LEN;
        Ok(handle)
    }

    /// Completes the table and returns the total number of bytes written.
    ///
    /// Writes, in order: the pending data block (if it has entries), the filter block
    /// (uncompressed), the meta-index block, the index block and the footer, then flushes
    /// the destination.
    ///
    /// # Errors
    ///
    /// Returns an error if the configured compressor cannot be found, if compression fails,
    /// or if writing to or flushing the destination fails.
    pub fn finish(mut self) -> Result<usize> {
        assert!(self.data_block.is_some());

        // Clone the compressor list so the borrowed compressor stays usable across the
        // `&mut self` calls below.
        let compressor_list = self.opt.compressor_list.clone();
        let compressor_id_pair = (
            self.opt.compressor,
            compressor_list.get(self.opt.compressor)?,
        );

        if self.data_block.as_ref().unwrap().entries() > 0 {
            let key_past_last = self
                .opt
                .cmp
                .find_short_succ(self.data_block.as_ref().unwrap().last_key());
            self.write_data_block(&key_past_last)?;
        }

        let mut meta_ix_block = BlockBuilder::new(self.opt.clone());
        if let Some(fblock) = self.filter_block.take() {
            let filter_key = format!("filter.{}", fblock.filter_name());
            let fblock_data = fblock.finish();
            // The filter block is always written with the "none" compressor.
            let fblock_handle = self.write_block(
                fblock_data,
                (
                    compressor::NoneCompressor::ID,
                    &(Box::new(compressor::NoneCompressor) as Box<dyn Compressor>),
                ),
            )?;

            let mut handle_enc = [0u8; 16];
            let enc_len = fblock_handle.encode_to(&mut handle_enc);
            meta_ix_block.add(filter_key.as_bytes(), &handle_enc[0..enc_len]);
        }

        let meta_ix = meta_ix_block.finish();
        let meta_ix_handle = self.write_block(meta_ix, compressor_id_pair)?;

        let index_cont = self.index_block.take().unwrap().finish();
        let ix_handle = self.write_block(index_cont, compressor_id_pair)?;

        let footer = Footer::new(meta_ix_handle, ix_handle);
        let mut buf = [0; FULL_FOOTER_LENGTH];
        footer.encode(&mut buf);
        // The footer must be written completely; a short `write` would leave a truncated,
        // unreadable table.
        self.dst.write_all(&buf[..])?;
        self.offset += buf.len();
        self.dst.flush()?;
        Ok(self.offset)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blockhandle::BlockHandle;
    use crate::options;

    /// A footer must come back unchanged from an encode/decode round trip.
    #[test]
    fn test_footer() {
        let original = Footer::new(BlockHandle::new(44, 4), BlockHandle::new(55, 5));
        let mut encoded = [0; 48];
        original.encode(&mut encoded[..]);

        let decoded = Footer::decode(&encoded).unwrap();
        assert_eq!(decoded.meta_index.offset(), 44);
        assert_eq!(decoded.meta_index.size(), 4);
        assert_eq!(decoded.index.offset(), 55);
        assert_eq!(decoded.index.size(), 5);
    }

    /// Builds a small Snappy-compressed table and pins its estimated and final sizes.
    #[test]
    fn test_table_builder() {
        let mut d = Vec::with_capacity(512);
        let mut opt = options::for_test();
        opt.block_restart_interval = 3;
        opt.compressor = compressor::SnappyCompressor::ID;
        let mut b = TableBuilder::new_raw(opt, &mut d);

        let data = vec![
            ("abc", "def"),
            ("abe", "dee"),
            ("bcd", "asa"),
            ("dcc", "a00"),
        ];
        let data2 = vec![
            ("abd", "def"),
            ("abf", "dee"),
            ("ccd", "asa"),
            ("dcd", "a00"),
        ];

        // Interleave both lists so the keys reach the builder in ascending order.
        for (first, second) in data.iter().zip(data2.iter()) {
            b.add(first.0.as_bytes(), first.1.as_bytes()).unwrap();
            b.add(second.0.as_bytes(), second.1.as_bytes()).unwrap();
        }

        let estimate = b.size_estimate();
        assert_eq!(143, estimate);
        assert!(b.filter_block.is_some());

        let actual = b.finish().unwrap();
        assert_eq!(223, actual);
    }

    /// Adding the same key twice must panic.
    #[test]
    #[should_panic]
    fn test_bad_input() {
        let mut d = Vec::with_capacity(512);
        let mut opt = options::for_test();
        opt.block_restart_interval = 3;
        let mut b = TableBuilder::new_raw(opt, &mut d);

        // "abc" appears twice.
        let data = vec![
            ("abc", "def"),
            ("abc", "dee"),
            ("bcd", "asa"),
            ("bsr", "a00"),
        ];
        for (k, v) in data {
            b.add(k.as_bytes(), v.as_bytes()).unwrap();
        }
        b.finish().unwrap();
    }
}