use std::cmp;
use std::cmp::Ordering;
use std::ops::Bound;
use std::sync::Arc;
use buffertk::{Packable, Unpacker, length_free, stack_pack, v64};
use super::{
Builder, CORRUPTION, Cursor, Error, KeyRef, KeyValueDel, KeyValueEntry, KeyValuePut,
LOGIC_ERROR, block_too_small, check_key_len, check_table_size, check_value_len,
corruption_binary_search_left_ne_right, corruption_block_with_zero_restarts,
corruption_offset_exceeds_restarts_boundary, corruption_restart_point_no_key_value_pair,
logic_error_next_not_positioned, logic_error_restart_idx_exceeds_num_restarts,
logic_error_tried_taking_negative_restart_idx, sort_order, unpack_block_restarts,
unpack_key_value_pair,
};
use crate::bounds_cursor::BoundsCursor;
use crate::pruning_cursor::PruningCursor;
/// Tunables that decide how often a [`BlockBuilder`] starts a new restart point.
///
/// A restart is triggered as soon as either interval has been reached since the previous restart
/// (see `BlockBuilder::should_restart`).
#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "command_line", derive(arrrg_derive::CommandLine))]
pub struct BlockBuilderOptions {
/// Number of packed entry bytes after which the next key is stored in full.
#[cfg_attr(
feature = "command_line",
arrrg(optional, "Store a complete key every this many bytes.", "BYTES")
)]
bytes_restart_interval: u64,
/// Number of key-value pairs after which the next key is stored in full.
#[cfg_attr(
feature = "command_line",
arrrg(optional, "Store a complete key every this many keys.", "KEYS")
)]
key_value_pairs_restart_interval: u64,
}
impl BlockBuilderOptions {
    /// Return these options with the byte-based restart interval replaced.
    ///
    /// The `u32` argument is widened losslessly to the `u64` the options store.
    pub fn bytes_restart_interval(self, bytes_restart_interval: u32) -> Self {
        Self {
            bytes_restart_interval: u64::from(bytes_restart_interval),
            ..self
        }
    }

    /// Return these options with the key-count-based restart interval replaced.
    ///
    /// The `u32` argument is widened losslessly to the `u64` the options store.
    pub fn key_value_pairs_restart_interval(self, key_value_pairs_restart_interval: u32) -> Self {
        Self {
            key_value_pairs_restart_interval: u64::from(key_value_pairs_restart_interval),
            ..self
        }
    }
}
impl Default for BlockBuilderOptions {
fn default() -> Self {
Self {
bytes_restart_interval: 1024,
key_value_pairs_restart_interval: 16,
}
}
}
/// An immutable, sealed block of key-value entries followed by a restart-point footer.
///
/// Cloning is cheap with respect to the payload: the bytes live behind an `Arc`.
#[derive(Clone, Debug)]
pub struct Block {
/// The complete serialized block: entries, then the restart array, then the restart count.
bytes: Arc<Vec<u8>>,
/// Offset one past the last entry byte; entries are only decoded from `..restarts_boundary`.
restarts_boundary: usize,
/// Offset of the first little-endian `u32` in the restart array.
restarts_idx: usize,
/// Number of restart points, as decoded from the last four bytes of the block.
num_restarts: usize,
}
impl Block {
    /// Wrap `bytes` as a sealed block after locating its restart-point footer.
    ///
    /// The trailing four bytes are unpacked as the `u32` restart count.  The position of the
    /// restart array and the end of the entry data are derived from that count.
    ///
    /// # Errors
    ///
    /// * `block_too_small` when `bytes` holds fewer than four bytes, or fewer bytes than the
    ///   footer implied by the decoded restart count.  Checking the latter up front keeps the
    ///   offset arithmetic below from underflowing on truncated or corrupt input.
    /// * `unpack_block_restarts` when the restart count cannot be unpacked.
    pub fn new(bytes: Vec<u8>) -> Result<Self, Error> {
        let bytes = Arc::new(bytes);
        if bytes.len() < 4 {
            return Err(block_too_small(bytes.len(), 4));
        }
        let mut up = Unpacker::new(&bytes[bytes.len() - 4..]);
        let num_restarts: u32 = up
            .unpack()
            .map_err(|e: buffertk::Error| unpack_block_restarts(e.into()))?;
        let num_restarts: usize = num_restarts as usize;
        // Footer layout, back to front: [tag][u32 count] is the capstone, preceded by the
        // restart array body, preceded by [tag][varint length] as the footer head.
        let capstone: usize = 1 + 4;
        let footer_body: usize = num_restarts.saturating_mul(4);
        let footer_head: usize = 1 + v64::from(footer_body).pack_sz();
        let required: usize = capstone
            .saturating_add(footer_body)
            .saturating_add(footer_head);
        if bytes.len() < required {
            return Err(block_too_small(bytes.len(), required));
        }
        // Both subtractions are safe because `bytes.len() >= required`.
        let restarts_idx = bytes.len() - capstone - footer_body;
        let restarts_boundary = restarts_idx - footer_head;
        Ok(Block {
            bytes,
            restarts_boundary,
            restarts_idx,
            num_restarts,
        })
    }

    /// The size of the block in bytes.
    pub fn approximate_size(&self) -> usize {
        self.bytes.len()
    }

    /// Borrow the serialized form of the block.
    pub fn as_bytes(&self) -> &[u8] {
        &self.bytes
    }

    /// Create a cursor over a clone of this block, positioned before the first entry.
    pub fn cursor(&self) -> BlockCursor {
        BlockCursor::new(self.clone())
    }

    /// Read the `restart_idx`-th restart offset from the little-endian `u32` restart array.
    ///
    /// # Panics
    ///
    /// Panics when `restart_idx >= num_restarts`.
    fn restart_point(&self, restart_idx: usize) -> usize {
        assert!(restart_idx < self.num_restarts);
        let start = self.restarts_idx + restart_idx * 4;
        let mut restart = [0u8; 4];
        restart.copy_from_slice(&self.bytes[start..start + 4]);
        u32::from_le_bytes(restart) as usize
    }

    /// Binary-search for the last restart point whose offset is `<= offset`.
    ///
    /// Returns 0 when every restart point lies beyond `offset`, and also when the block has no
    /// restart points at all (the subtraction saturates instead of underflowing).
    fn restart_for_offset(&self, offset: usize) -> usize {
        let mut left: usize = 0;
        let mut right: usize = self.num_restarts.saturating_sub(1);
        while left < right {
            // Round the midpoint up so `left = mid` always makes progress; this form cannot
            // overflow and guarantees `mid >= 1`, so `mid - 1` below is safe.
            let mid = left + (right - left).div_ceil(2);
            match offset.cmp(&self.restart_point(mid)) {
                Ordering::Less => right = mid - 1,
                Ordering::Equal => {
                    left = mid;
                    right = mid;
                }
                Ordering::Greater => left = mid,
            }
        }
        left
    }

    /// Look up `key` in this block, relative to `timestamp`.
    ///
    /// The cursor seeks to `key`, then skips every entry that orders before
    /// `KeyRef { key, timestamp }`.  If the entry it stops on has exactly `key`, its value is
    /// returned as an owned vector.  `is_tombstone` is reset to `false` on entry and set to
    /// `true` only when that matching entry carries no value.
    ///
    /// # Errors
    ///
    /// Propagates any error raised by the cursor while seeking or advancing.
    pub fn load(
        &self,
        key: &[u8],
        timestamp: u64,
        is_tombstone: &mut bool,
    ) -> Result<Option<Vec<u8>>, Error> {
        *is_tombstone = false;
        let mut cursor = self.cursor();
        cursor.seek(key)?;
        let target = KeyRef { key, timestamp };
        while let Some(kr) = cursor.key() {
            if kr >= target {
                break;
            }
            cursor.next()?;
        }
        match cursor.key_value() {
            Some(kvr) if kvr.key == key => {
                *is_tombstone = kvr.value.is_none();
                Ok(kvr.value.as_ref().map(|v| v.to_vec()))
            }
            _ => Ok(None),
        }
    }

    /// Build a cursor over this block: a `PruningCursor` created with `timestamp`, wrapped in a
    /// `BoundsCursor` created with `start_bound` and `end_bound`.
    ///
    /// # Errors
    ///
    /// Propagates errors from constructing either wrapper cursor.
    pub fn range_scan<T: AsRef<[u8]>>(
        &self,
        start_bound: &Bound<T>,
        end_bound: &Bound<T>,
        timestamp: u64,
    ) -> Result<BoundsCursor<PruningCursor<BlockCursor>>, Error> {
        let pruning = PruningCursor::new(self.cursor(), timestamp)?;
        BoundsCursor::new(pruning, start_bound, end_bound)
    }
}
/// Incrementally builds a [`Block`] from keys supplied in sorted order.
#[derive(Clone, Debug)]
pub struct BlockBuilder {
/// Restart-interval configuration.
options: BlockBuilderOptions,
/// Packed entries appended so far.
buffer: Vec<u8>,
/// The full key of the most recently appended entry.
last_key: Vec<u8>,
/// The timestamp of the most recently appended entry (`u64::MAX` before any entry).
last_timestamp: u64,
/// Offsets into `buffer` at which restart points begin; always starts with 0.
restarts: Vec<u32>,
/// Packed bytes appended since the most recent restart point.
bytes_since_restart: u64,
/// Entries appended since the most recent restart point.
key_value_pairs_since_restart: u64,
}
impl BlockBuilder {
pub fn new(options: BlockBuilderOptions) -> Self {
let buffer = Vec::default();
let restarts = vec![0];
BlockBuilder {
options,
buffer,
last_key: Vec::default(),
last_timestamp: u64::MAX,
restarts,
bytes_since_restart: 0,
key_value_pairs_since_restart: 0,
}
}
fn should_restart(&self) -> bool {
self.options.bytes_restart_interval <= self.bytes_since_restart
|| self.options.key_value_pairs_restart_interval <= self.key_value_pairs_since_restart
}
fn compute_key_frag<'a>(&mut self, key: &'a [u8]) -> (usize, &'a [u8]) {
let shared = if !self.should_restart() {
let max_shared: usize = cmp::min(self.last_key.len(), key.len());
let mut shared = 0;
while shared < max_shared && key[shared] == self.last_key[shared] {
shared += 1;
}
shared
} else {
self.bytes_since_restart = 0;
self.key_value_pairs_since_restart = 0;
self.restarts.push(self.buffer.len() as u32);
0
};
(shared, &key[shared..])
}
fn append(&mut self, be: KeyValueEntry<'_>) -> Result<(), Error> {
self.last_key.truncate(be.shared());
self.last_key.extend_from_slice(be.key_frag());
self.last_timestamp = be.timestamp();
let pa = stack_pack(be);
assert!(self.buffer.len() + pa.pack_sz() <= u32::MAX as usize);
pa.append_to_vec(&mut self.buffer);
self.bytes_since_restart += pa.pack_sz() as u64;
self.key_value_pairs_since_restart += 1;
Ok(())
}
fn enforce_sort_order(&self, key: &[u8], timestamp: u64) -> Result<(), Error> {
if KeyRef::new(&self.last_key, self.last_timestamp).cmp(&KeyRef::new(key, timestamp))
!= Ordering::Less
{
Err(sort_order(
self.last_key.clone(),
self.last_timestamp,
key.to_vec(),
timestamp,
))
} else {
Ok(())
}
}
}
impl Builder for BlockBuilder {
    type Sealed = Block;

    /// Estimate the sealed size: entry bytes, 16 bytes of slack, and four bytes per restart.
    fn approximate_size(&self) -> usize {
        let entries = self.buffer.len();
        let restart_array = self.restarts.len() * 4;
        entries + 16 + restart_array
    }

    /// Append a put of `value` under `key` at `timestamp`.
    ///
    /// # Errors
    ///
    /// Fails when the key length, value length, or table size check fails, or when the key does
    /// not sort strictly after the previous entry.
    fn put(&mut self, key: &[u8], timestamp: u64, value: &[u8]) -> Result<(), Error> {
        check_key_len(key)?;
        check_value_len(value)?;
        check_table_size(self.approximate_size())?;
        self.enforce_sort_order(key, timestamp)?;
        let (shared, key_frag) = self.compute_key_frag(key);
        self.append(KeyValueEntry::Put(KeyValuePut {
            shared: shared as u64,
            key_frag,
            timestamp,
            value,
        }))
    }

    /// Append a deletion of `key` at `timestamp`.
    ///
    /// # Errors
    ///
    /// Fails when the key length or table size check fails, or when the key does not sort
    /// strictly after the previous entry.
    fn del(&mut self, key: &[u8], timestamp: u64) -> Result<(), Error> {
        check_key_len(key)?;
        check_table_size(self.approximate_size())?;
        self.enforce_sort_order(key, timestamp)?;
        let (shared, key_frag) = self.compute_key_frag(key);
        self.append(KeyValueEntry::Del(KeyValueDel {
            shared: shared as u64,
            key_frag,
            timestamp,
        }))
    }

    /// Append the restart footer to the entry bytes and parse the result as a [`Block`].
    ///
    /// The footer is: tag 10 (wire type 2), the byte length of the restart array, the restart
    /// array itself, tag 11 (wire type 5), and the restart count as a `u32`.
    fn seal(self) -> Result<Block, Error> {
        let restarts = length_free(&self.restarts);
        let tag10: v64 = ((10 << 3) | 2).into();
        let tag11: v64 = ((11 << 3) | 5).into();
        let sz: v64 = restarts.pack_sz().into();
        // Each stage stays bound to its own local so it outlives the final `append_to_vec`.
        let with_array_tag = stack_pack(tag10);
        let with_array_len = with_array_tag.pack(sz);
        let with_array = with_array_len.pack(restarts);
        let with_count_tag = with_array.pack(tag11);
        let footer = with_count_tag.pack(self.restarts.len() as u32);
        let mut contents = self.buffer;
        footer.append_to_vec(&mut contents);
        Block::new(contents)
    }
}
/// Where a [`BlockCursor`] currently sits within its block.
#[derive(Clone, Debug)]
enum CursorPosition {
/// Before the first entry; the cursor exposes no key or value here.
First,
/// After the last entry; the cursor exposes no key or value here.
Last,
/// On a decoded entry.
Positioned {
/// Index of the restart point that `Block::restart_for_offset` reports for `offset`.
restart_idx: usize,
/// Byte offset at which the current entry starts.
offset: usize,
/// Byte offset just past the current entry.
next_offset: usize,
/// The full key of the current entry, rebuilt from the shared prefix plus the fragment.
key: Vec<u8>,
/// The timestamp of the current entry.
timestamp: u64,
},
}
impl CursorPosition {
    /// True only for `Positioned`; `First` and `Last` carry no entry.
    fn is_positioned(&self) -> bool {
        matches!(self, CursorPosition::Positioned { .. })
    }
}
impl PartialEq for CursorPosition {
fn eq(&self, rhs: &CursorPosition) -> bool {
match (&self, &rhs) {
(&CursorPosition::First, &CursorPosition::First) => true,
(&CursorPosition::Last, &CursorPosition::Last) => true,
(
&CursorPosition::Positioned {
restart_idx: ri1,
offset: o1,
next_offset: no1,
key: k1,
timestamp: t1,
},
&CursorPosition::Positioned {
restart_idx: ri2,
offset: o2,
next_offset: no2,
key: k2,
timestamp: t2,
},
) => ri1 == ri2 && o1 == o2 && no1 == no2 && k1 == k2 && t1 == t2,
_ => false,
}
}
}
/// A cursor that walks the entries of a single [`Block`].
#[derive(Clone, Debug)]
pub struct BlockCursor {
/// The block being iterated; the cursor holds its own clone.
block: Block,
/// Where the cursor currently sits within `block`.
position: CursorPosition,
}
impl BlockCursor {
    /// Create a cursor over `block`, positioned before the first entry.
    pub fn new(block: Block) -> Self {
        Self {
            block,
            position: CursorPosition::First,
        }
    }

    /// Byte offset of the current entry: 0 at `First`, the restarts boundary at `Last`.
    pub(crate) fn offset(&self) -> usize {
        match &self.position {
            CursorPosition::Positioned { offset, .. } => *offset,
            CursorPosition::First => 0,
            CursorPosition::Last => self.block.restarts_boundary,
        }
    }

    /// Byte offset just past the current entry: 0 at `First`, the restarts boundary at `Last`.
    pub(crate) fn next_offset(&self) -> usize {
        match &self.position {
            CursorPosition::Positioned { next_offset, .. } => *next_offset,
            CursorPosition::First => 0,
            CursorPosition::Last => self.block.restarts_boundary,
        }
    }

    /// Restart index of the current entry: 0 at `First`, `num_restarts` at `Last`.
    fn restart_idx(&self) -> usize {
        match &self.position {
            CursorPosition::Positioned { restart_idx, .. } => *restart_idx,
            CursorPosition::First => 0,
            CursorPosition::Last => self.block.num_restarts,
        }
    }

    /// Position the cursor on the entry stored at restart point `restart_idx`.
    ///
    /// # Errors
    ///
    /// * A logic error when `restart_idx` is not below the block's restart count.
    /// * A corruption error when the restart offset is at or past the restarts boundary.
    /// * Any error from decoding the entry at that offset.
    fn seek_restart(&mut self, restart_idx: usize) -> Result<Option<KeyRef<'_>>, Error> {
        let num_restarts = self.block.num_restarts;
        if restart_idx >= num_restarts {
            LOGIC_ERROR.click();
            return Err(logic_error_restart_idx_exceeds_num_restarts(
                restart_idx,
                num_restarts,
            ));
        }
        let offset = self.block.restart_point(restart_idx);
        let boundary = self.block.restarts_boundary;
        if offset >= boundary {
            CORRUPTION.click();
            return Err(corruption_offset_exceeds_restarts_boundary(offset, boundary));
        }
        // Reuse the current key's allocation, emptied, as scratch space for the next key.
        let recycled = match &mut self.position {
            CursorPosition::Positioned { key, .. } => {
                let mut buf = std::mem::take(key);
                buf.clear();
                buf
            }
            CursorPosition::First | CursorPosition::Last => Vec::new(),
        };
        self.position = Self::extract_key(&self.block, offset, recycled)?;
        self.key_ref()
    }

    /// The key and timestamp under the cursor, or `None` at `First`/`Last`.  Never fails.
    fn key_ref(&self) -> Result<Option<KeyRef<'_>>, Error> {
        let current = if let CursorPosition::Positioned { key, timestamp, .. } = &self.position {
            Some(KeyRef {
                key,
                timestamp: *timestamp,
            })
        } else {
            None
        };
        Ok(current)
    }

    /// Decode the entry at `offset` into a cursor position.
    ///
    /// `key` must hold the previous entry's full key (or be empty at a restart point); it is
    /// truncated to the entry's shared-prefix length and extended with the entry's fragment.
    /// An `offset` at or beyond the restarts boundary yields `CursorPosition::Last`.
    ///
    /// # Errors
    ///
    /// Returns an `unpack_key_value_pair` error when the entry cannot be unpacked.
    fn extract_key(
        block: &Block,
        offset: usize,
        mut key: Vec<u8>,
    ) -> Result<CursorPosition, Error> {
        let boundary = block.restarts_boundary;
        if offset >= boundary {
            return Ok(CursorPosition::Last);
        }
        let mut up = Unpacker::new(&block.bytes[offset..boundary]);
        let be: KeyValueEntry = up.unpack().map_err(|e| unpack_key_value_pair(e, offset))?;
        // Whatever the unpacker did not consume begins the next entry.
        let next_offset = boundary - up.remain().len();
        let restart_idx = block.restart_for_offset(offset);
        key.truncate(be.shared());
        key.extend_from_slice(be.key_frag());
        let timestamp = be.timestamp();
        Ok(CursorPosition::Positioned {
            restart_idx,
            offset,
            next_offset,
            key,
            timestamp,
        })
    }
}
impl Cursor for BlockCursor {
    /// Move to the position before the first entry.
    fn seek_to_first(&mut self) -> Result<(), Error> {
        self.position = CursorPosition::First;
        Ok(())
    }

    /// Move to the position after the last entry.
    fn seek_to_last(&mut self) -> Result<(), Error> {
        self.position = CursorPosition::Last;
        Ok(())
    }

    /// Position the cursor on the first entry whose key is `>= key`, or past the end.
    ///
    /// A binary search over the restart points picks a starting restart; the cursor then scans
    /// forward while the current key is smaller than `key`.
    ///
    /// # Errors
    ///
    /// Returns a corruption error when the block has zero restart points, when a restart point
    /// yields no entry, or when the binary search ends with `left != right`; also propagates
    /// errors from decoding entries.
    fn seek(&mut self, key: &[u8]) -> Result<(), Error> {
        if self.block.num_restarts == 0 {
            CORRUPTION.click();
            return Err(corruption_block_with_zero_restarts());
        }
        let mut left: usize = 0;
        let mut right: usize = self.block.num_restarts - 1;
        while left < right {
            // Rounding up keeps `mid > left`, so `mid - 1` below cannot underflow.
            let mid = left + (right - left).div_ceil(2);
            let Some(kvp) = self.seek_restart(mid)? else {
                CORRUPTION.click();
                return Err(corruption_restart_point_no_key_value_pair(mid));
            };
            match key.cmp(kvp.key) {
                Ordering::Greater => left = mid,
                // On equality, still move left: earlier restarts may hold the same key.
                Ordering::Less | Ordering::Equal => right = mid - 1,
            }
        }
        if left != right {
            CORRUPTION.click();
            return Err(corruption_binary_search_left_ne_right(left, right));
        }
        let Some(first) = self.seek_restart(left)? else {
            CORRUPTION.click();
            return Err(corruption_restart_point_no_key_value_pair(left));
        };
        let mut current = Some(first);
        while let Some(candidate) = current {
            if key <= candidate.key {
                break;
            }
            self.next()?;
            current = self.key_ref()?;
        }
        Ok(())
    }

    /// Step back one entry; a no-op at `First`, and stepping back from offset 0 lands on `First`.
    ///
    /// The cursor re-seeks to a restart point at or before the target and walks forward until it
    /// reaches the entry that ends where the current one begins.
    ///
    /// # Errors
    ///
    /// Returns a logic error if a restart index below zero would be needed; also propagates
    /// errors from seeking and advancing.
    fn prev(&mut self) -> Result<(), Error> {
        let target_next_offset = match self.position {
            CursorPosition::First => return Ok(()),
            CursorPosition::Last => self.block.restarts_boundary,
            CursorPosition::Positioned { offset, .. } => offset,
        };
        if target_next_offset == 0 {
            self.position = CursorPosition::First;
            return Ok(());
        }
        let current_restart_idx = self.restart_idx();
        // `||` must short-circuit: `restart_point` asserts its index is in range.
        let must_step_back = current_restart_idx >= self.block.num_restarts
            || target_next_offset <= self.block.restart_point(current_restart_idx);
        let restart_idx = match (must_step_back, current_restart_idx) {
            (false, idx) => idx,
            (true, 0) => {
                LOGIC_ERROR.click();
                return Err(logic_error_tried_taking_negative_restart_idx());
            }
            (true, idx) => idx - 1,
        };
        self.seek_restart(restart_idx)?;
        while self.next_offset() < target_next_offset {
            self.next()?;
        }
        Ok(())
    }

    /// Advance one entry; from `First` this lands on restart point 0, and `Last` is a no-op.
    ///
    /// # Errors
    ///
    /// Propagates errors from seeking to a restart point or decoding the next entry.
    fn next(&mut self) -> Result<(), Error> {
        if matches!(self.position, CursorPosition::First) {
            self.seek_restart(0)?;
            return Ok(());
        }
        if matches!(self.position, CursorPosition::Last) {
            return Ok(());
        }
        let offset = self.next_offset();
        if offset >= self.block.restarts_boundary {
            self.position = CursorPosition::Last;
            return Ok(());
        }
        // Crossing into the next restart interval: go through `seek_restart`.
        let upcoming_restart = self.restart_idx() + 1;
        if upcoming_restart < self.block.num_restarts
            && self.block.restart_point(upcoming_restart) <= offset
        {
            self.seek_restart(upcoming_restart)?;
            return Ok(());
        }
        // Hand the current key to `extract_key` so it can reuse the shared prefix.
        let prev_key = match &mut self.position {
            CursorPosition::Positioned { key, .. } => std::mem::take(key),
            CursorPosition::First | CursorPosition::Last => {
                LOGIC_ERROR.click();
                return Err(logic_error_next_not_positioned());
            }
        };
        self.position = BlockCursor::extract_key(&self.block, offset, prev_key)?;
        Ok(())
    }

    /// The key and timestamp under the cursor, or `None` at `First`/`Last`.
    fn key(&self) -> Option<KeyRef<'_>> {
        if let CursorPosition::Positioned { key, timestamp, .. } = &self.position {
            Some(KeyRef {
                key,
                timestamp: *timestamp,
            })
        } else {
            None
        }
    }

    /// The value of the entry under the cursor, re-decoded from the block bytes.
    ///
    /// Returns `None` at `First`/`Last`, or whatever the entry's own `value()` reports.
    ///
    /// # Panics
    ///
    /// Panics if the entry at the current offset no longer unpacks.
    fn value(&self) -> Option<&[u8]> {
        let CursorPosition::Positioned { offset, .. } = &self.position else {
            return None;
        };
        let entry_bytes = &self.block.bytes[*offset..self.block.restarts_boundary];
        let mut up = Unpacker::new(entry_bytes);
        let be: KeyValueEntry = up
            .unpack()
            .expect("already parsed this block with extract_key; corruption");
        be.value()
    }
}
impl From<Block> for BlockCursor {
fn from(table: Block) -> Self {
Self::new(table)
}
}
// Unit tests pinning the serialized block layout and basic cursor behavior.
#[cfg(test)]
mod tests {
use super::*;
// Sealing a builder with no entries yields only the footer:
// 82 = (10 << 3) | 2, 4 = byte length of the restart array, four zero bytes = restart offset 0,
// 93 = (11 << 3) | 5, then the restart count 1 as a little-endian u32.
#[test]
fn build_empty_block() {
let builder = BlockBuilder::new(BlockBuilderOptions::default());
let block = builder.seal().unwrap();
let got: &[u8] = &block.bytes;
let exp: &[u8] = &[82, 4, 0, 0, 0, 0, 93, 1, 0, 0, 0];
assert_eq!(exp, got);
assert_eq!(11, got.len());
}
// A single put is serialized as one entry followed by the same footer as the empty block.
#[test]
fn build_single_item_block() {
let mut builder = BlockBuilder::new(BlockBuilderOptions::default());
builder
.put("key".as_bytes(), 0xc0ffee, "value".as_bytes())
.unwrap();
let block = builder.seal().unwrap();
let got: &[u8] = &block.bytes;
let exp = &[
66,
19,
8,
0,
18,
3,
107, 101, 121, 24,
238, 255, 131, 6, 34,
5,
118, 97, 108, 117, 101, 82,
4,
0, 0, 0, 0, 93,
1, 0, 0, 0,
];
assert_eq!(exp, got);
}
// "key1" then "key2": the second entry shares a three-byte prefix with the first and stores
// only the one-byte fragment "2" (byte 50).
#[test]
fn build_prefix_compression() {
let mut builder = BlockBuilder::new(BlockBuilderOptions::default());
builder
.put("key1".as_bytes(), 0xc0ffee, "value1".as_bytes())
.unwrap();
builder
.put("key2".as_bytes(), 0xc0ffee, "value2".as_bytes())
.unwrap();
let block = builder.seal().unwrap();
let got: &[u8] = &block.bytes;
let exp = &[
66,
21,
8,
0, 18,
4,
107, 101, 121, 49, 24,
238, 255, 131, 6, 34,
6,
118, 97, 108, 117, 101, 49, 66,
18,
8,
3, 18,
1,
50, 24,
238, 255, 131, 6, 34,
6,
118, 97, 108, 117, 101, 50, 82,
4,
0, 0, 0, 0, 93,
1, 0, 0, 0,
];
assert_eq!(exp, got);
}
// A hand-built block whose footer lists two restart points, at offsets 0 and 22; parsing it
// must recover both.
#[test]
fn load_restart_points() {
let block_bytes = &[
66,
21,
8,
0, 18,
4,
107, 101, 121, 49, 24,
238, 255, 131, 6, 34,
6,
118, 97, 108, 117, 101, 49, 66,
21,
8,
0, 18,
4,
107, 101, 121, 50, 24,
238, 255, 131, 6, 34,
6,
118, 97, 108, 117, 101, 50, 82,
8,
0, 0, 0, 0, 22, 0, 0, 0, 93,
2, 0, 0, 0,
];
let block = Block::new(block_bytes.to_vec()).unwrap();
assert_eq!(2, block.num_restarts);
assert_eq!(0, block.restart_point(0));
assert_eq!(22, block.restart_point(1));
}
// Regression test: seeking in a one-entry block for a key smaller than the stored key must
// succeed rather than return an error.
#[test]
fn corruption_bug_gone() {
let key = &[107, 65, 118, 119, 82, 109, 53, 69];
let timestamp = 4092481979873166344;
let value = &[120, 100, 81, 80, 75, 79, 121, 90];
let mut builder = BlockBuilder::new(BlockBuilderOptions::default());
builder.put(key, timestamp, value).unwrap();
let block = builder.seal().unwrap();
let exp = &[
66,
32,
8,
0, 18,
8,
107, 65, 118, 119, 82, 109, 53, 69, 24,
136, 136, 156, 160, 216, 213, 218, 229, 56, 34,
8,
120, 100, 81, 80, 75, 79, 121, 90, 82,
4,
0, 0, 0, 0, 93,
1, 0, 0, 0,
];
let got: &[u8] = &block.bytes;
assert_eq!(exp, got);
let mut cursor = block.cursor();
cursor.seek(&[106, 113, 67, 73, 122, 73, 98, 85]).unwrap();
}
// Regression test: seeking to a key smaller than the only stored key must leave the cursor
// positioned on that stored entry.
#[test]
fn seek_bug_gone() {
let key = "kAvwRm5E";
let timestamp = 4092481979873166344;
let value = "xdQPKOyZwQUykR8i";
let mut block = BlockBuilder::new(BlockBuilderOptions::default());
block
.put(key.as_bytes(), timestamp, value.as_bytes())
.unwrap();
let block = block.seal().unwrap();
let mut cursor = block.cursor();
let target = "jqCIzIbU";
cursor.seek(target.as_bytes()).unwrap();
let key: Vec<u8> = key.into();
let kvp = cursor.key_value().unwrap();
assert_eq!(&key, kvp.key);
assert_eq!(timestamp, kvp.timestamp);
}
// The hand-written PartialEq treats identical variants (and identical fields) as equal.
#[test]
fn cursor_equals() {
let lhs = CursorPosition::First;
let rhs = CursorPosition::First;
assert_eq!(lhs, rhs);
let lhs = CursorPosition::Last;
let rhs = CursorPosition::Last;
assert_eq!(lhs, rhs);
let lhs = CursorPosition::Positioned {
restart_idx: 0,
offset: 0,
next_offset: 19,
key: "E".into(),
timestamp: 17563921251225492277,
};
let rhs = CursorPosition::Positioned {
restart_idx: 0,
offset: 0,
next_offset: 19,
key: "E".into(),
timestamp: 17563921251225492277,
};
assert_eq!(lhs, rhs);
}
// Decodes a hand-built two-entry block directly with extract_key: entries start at offsets 0
// and 20, and offset 39 is the restarts boundary, which decodes as Last.
#[test]
fn extract_key() {
let bytes = &[
66,
18,
8,
0, 18,
1,
69, 24,
181, 182, 235, 145, 160, 170, 229, 223, 243, 1, 34,
0,
66,
17,
8,
0, 18,
1,
107, 24,
136, 136, 156, 160, 216, 213, 218, 229, 56, 34,
0,
82,
4,
0, 0, 0, 0, 93,
1, 0, 0, 0,
];
let block = Block::new(bytes.to_vec()).unwrap();
let exp = CursorPosition::Positioned {
restart_idx: 0,
offset: 0,
next_offset: 20,
key: "E".into(),
timestamp: 17563921251225492277,
};
let got = BlockCursor::extract_key(&block, 0, Vec::new()).unwrap();
assert_eq!(exp, got);
let exp = CursorPosition::Positioned {
restart_idx: 0,
offset: 20,
next_offset: 39,
key: "k".into(),
timestamp: 4092481979873166344,
};
let got = BlockCursor::extract_key(&block, 20, Vec::new()).unwrap();
assert_eq!(exp, got);
let exp = CursorPosition::Last;
let got = BlockCursor::extract_key(&block, 39, Vec::new()).unwrap();
assert_eq!(exp, got);
}
}