use std::cmp::Ordering;
use std::sync::Arc;
use crate::options::Options;
use crate::types::SSIterator;
use integer_encoding::FixedInt;
use integer_encoding::VarInt;
pub type BlockContents = Vec<u8>;
#[derive(Clone)]
pub struct Block {
block: Arc<BlockContents>,
opt: Options,
}
impl Block {
pub fn iter(&self) -> BlockIter {
let restarts = u32::decode_fixed(&self.block[self.block.len() - 4..]);
let restart_offset = self.block.len() - 4 - 4 * restarts as usize;
BlockIter {
block: self.block.clone(),
opt: self.opt.clone(),
offset: 0,
restarts_off: restart_offset,
current_entry_offset: 0,
current_restart_ix: 0,
key: Vec::new(),
val_offset: 0,
}
}
pub fn contents(&self) -> Arc<BlockContents> {
self.block.clone()
}
pub fn new(opt: Options, contents: BlockContents) -> Block {
assert!(contents.len() > 4);
Block {
block: Arc::new(contents),
opt: opt,
}
}
}
pub struct BlockIter {
block: Arc<BlockContents>,
opt: Options,
restarts_off: usize,
offset: usize,
current_entry_offset: usize,
current_restart_ix: usize,
key: Vec<u8>,
val_offset: usize,
}
impl BlockIter {
fn number_restarts(&self) -> usize {
u32::decode_fixed(&self.block[self.block.len() - 4..]) as usize
}
fn seek_to_restart_point(&mut self, ix: usize) {
let off = self.get_restart_point(ix);
self.offset = off;
self.current_entry_offset = off;
self.current_restart_ix = ix;
let (shared, non_shared, _, head_len) = self.parse_entry_and_advance();
assert_eq!(shared, 0);
self.assemble_key(off + head_len, shared, non_shared);
assert!(self.valid());
}
fn get_restart_point(&self, ix: usize) -> usize {
let restart = self.restarts_off + 4 * ix;
u32::decode_fixed(&self.block[restart..restart + 4]) as usize
}
fn parse_entry_and_advance(&mut self) -> (usize, usize, usize, usize) {
let mut i = 0;
let (shared, sharedlen) = usize::decode_var(&self.block[self.offset..]).unwrap();
i += sharedlen;
let (non_shared, non_sharedlen) =
usize::decode_var(&self.block[self.offset + i..]).unwrap();
i += non_sharedlen;
let (valsize, valsizelen) = usize::decode_var(&self.block[self.offset + i..]).unwrap();
i += valsizelen;
self.val_offset = self.offset + i + non_shared;
self.offset = self.val_offset + valsize;
(shared, non_shared, valsize, i)
}
fn assemble_key(&mut self, off: usize, shared: usize, non_shared: usize) {
self.key.truncate(shared);
self.key
.extend_from_slice(&self.block[off..off + non_shared]);
}
pub fn seek_to_last(&mut self) {
if self.number_restarts() > 0 {
let num_restarts = self.number_restarts();
self.seek_to_restart_point(num_restarts - 1);
} else {
self.reset();
}
while self.offset < self.restarts_off {
self.advance();
}
assert!(self.valid());
}
}
impl SSIterator for BlockIter {
fn advance(&mut self) -> bool {
if self.offset >= self.restarts_off {
self.reset();
return false;
} else {
self.current_entry_offset = self.offset;
}
let current_off = self.current_entry_offset;
let (shared, non_shared, _valsize, entry_head_len) = self.parse_entry_and_advance();
self.assemble_key(current_off + entry_head_len, shared, non_shared);
let num_restarts = self.number_restarts();
while self.current_restart_ix + 1 < num_restarts
&& self.get_restart_point(self.current_restart_ix + 1) < self.current_entry_offset
{
self.current_restart_ix += 1;
}
true
}
fn reset(&mut self) {
self.offset = 0;
self.val_offset = 0;
self.current_restart_ix = 0;
self.key.clear();
}
fn prev(&mut self) -> bool {
let orig_offset = self.current_entry_offset;
if orig_offset == 0 {
self.reset();
return false;
}
while self.get_restart_point(self.current_restart_ix) >= orig_offset {
if self.current_restart_ix == 0 {
self.offset = self.restarts_off;
self.current_restart_ix = self.number_restarts();
break;
}
self.current_restart_ix -= 1;
}
self.offset = self.get_restart_point(self.current_restart_ix);
assert!(self.offset < orig_offset);
let mut result;
loop {
result = self.advance();
if self.offset >= orig_offset {
break;
}
}
result
}
fn seek(&mut self, to: &[u8]) {
self.reset();
let mut left = 0;
let mut right = if self.number_restarts() == 0 {
0
} else {
self.number_restarts() - 1
};
while left < right {
let middle = (left + right + 1) / 2;
self.seek_to_restart_point(middle);
let c = self.opt.cmp.cmp(&self.key, to);
if c == Ordering::Less {
left = middle;
} else {
right = middle - 1;
}
}
assert_eq!(left, right);
self.current_restart_ix = left;
self.offset = self.get_restart_point(left);
while let Some((k, _)) = self.next() {
if self.opt.cmp.cmp(k.as_slice(), to) >= Ordering::Equal {
return;
}
}
}
fn valid(&self) -> bool {
!self.key.is_empty() && self.val_offset > 0 && self.val_offset <= self.restarts_off
}
fn current(&self, key: &mut Vec<u8>, val: &mut Vec<u8>) -> bool {
if self.valid() {
key.clear();
val.clear();
key.extend_from_slice(&self.key);
val.extend_from_slice(&self.block[self.val_offset..self.offset]);
true
} else {
false
}
}
fn current_key(&self) -> Option<&[u8]> {
if self.valid() {
Some(&self.key)
} else {
None
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::block_builder::BlockBuilder;
use crate::test_util::{test_iterator_properties, SSIteratorIter};
use crate::types::{current_key_val, SSIterator};
fn get_data() -> Vec<(&'static [u8], &'static [u8])> {
vec![
("key1".as_bytes(), "value1".as_bytes()),
(
"loooooooooooooooooooooooooooooooooongerkey1".as_bytes(),
"shrtvl1".as_bytes(),
),
("medium length key 1".as_bytes(), "some value 2".as_bytes()),
("prefix_key1".as_bytes(), "value".as_bytes()),
("prefix_key2".as_bytes(), "value".as_bytes()),
("prefix_key3".as_bytes(), "value".as_bytes()),
]
}
#[test]
fn test_block_iterator_properties() {
let o = Options::default();
let mut builder = BlockBuilder::new(o.clone());
let mut data = get_data();
data.truncate(4);
for &(k, v) in data.iter() {
builder.add(k, v);
}
let block_contents = builder.finish();
let block = Block::new(o.clone(), block_contents).iter();
test_iterator_properties(block);
}
#[test]
fn test_block_empty() {
let mut o = Options::default();
o.block_restart_interval = 16;
let builder = BlockBuilder::new(o);
let blockc = builder.finish();
assert_eq!(blockc.len(), 8);
assert_eq!(blockc, vec![0, 0, 0, 0, 1, 0, 0, 0]);
let block = Block::new(Options::default(), blockc);
for _ in SSIteratorIter::wrap(&mut block.iter()) {
panic!("expected 0 iterations");
}
}
#[test]
fn test_block_build_iterate() {
let data = get_data();
let mut builder = BlockBuilder::new(Options::default());
for &(k, v) in data.iter() {
builder.add(k, v);
}
let block_contents = builder.finish();
let mut block = Block::new(Options::default(), block_contents).iter();
let mut i = 0;
assert!(!block.valid());
for (k, v) in SSIteratorIter::wrap(&mut block) {
assert_eq!(&k[..], data[i].0);
assert_eq!(v, data[i].1);
i += 1;
}
assert_eq!(i, data.len());
}
#[test]
fn test_block_iterate_reverse() {
let mut o = Options::default();
o.block_restart_interval = 3;
let data = get_data();
let mut builder = BlockBuilder::new(o.clone());
for &(k, v) in data.iter() {
builder.add(k, v);
}
let block_contents = builder.finish();
let mut block = Block::new(o.clone(), block_contents).iter();
assert!(!block.valid());
assert_eq!(
block.next(),
Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec()))
);
assert!(block.valid());
block.next();
assert!(block.valid());
block.prev();
assert!(block.valid());
assert_eq!(
current_key_val(&block),
Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec()))
);
block.prev();
assert!(!block.valid());
while let Some(_) = block.next() {}
block.prev();
assert!(block.valid());
assert_eq!(
current_key_val(&block),
Some((
"prefix_key2".as_bytes().to_vec(),
"value".as_bytes().to_vec()
))
);
}
#[test]
fn test_block_seek() {
let mut o = Options::default();
o.block_restart_interval = 3;
let data = get_data();
let mut builder = BlockBuilder::new(o.clone());
for &(k, v) in data.iter() {
builder.add(k, v);
}
let block_contents = builder.finish();
let mut block = Block::new(o.clone(), block_contents).iter();
block.seek(&"prefix_key2".as_bytes());
assert!(block.valid());
assert_eq!(
current_key_val(&block),
Some((
"prefix_key2".as_bytes().to_vec(),
"value".as_bytes().to_vec()
))
);
block.seek(&"prefix_key0".as_bytes());
assert!(block.valid());
assert_eq!(
current_key_val(&block),
Some((
"prefix_key1".as_bytes().to_vec(),
"value".as_bytes().to_vec()
))
);
block.seek(&"key1".as_bytes());
assert!(block.valid());
assert_eq!(
current_key_val(&block),
Some(("key1".as_bytes().to_vec(), "value1".as_bytes().to_vec()))
);
block.seek(&"prefix_key3".as_bytes());
assert!(block.valid());
assert_eq!(
current_key_val(&block),
Some((
"prefix_key3".as_bytes().to_vec(),
"value".as_bytes().to_vec()
))
);
block.seek(&"prefix_key8".as_bytes());
assert!(!block.valid());
assert_eq!(current_key_val(&block), None);
}
#[test]
fn test_block_seek_to_last() {
let mut o = Options::default();
for block_restart_interval in vec![2, 6, 10] {
o.block_restart_interval = block_restart_interval;
let data = get_data();
let mut builder = BlockBuilder::new(o.clone());
for &(k, v) in data.iter() {
builder.add(k, v);
}
let block_contents = builder.finish();
let mut block = Block::new(o.clone(), block_contents).iter();
block.seek_to_last();
assert!(block.valid());
assert_eq!(
current_key_val(&block),
Some((
"prefix_key3".as_bytes().to_vec(),
"value".as_bytes().to_vec()
))
);
}
}
}