#![allow(clippy::precedence, clippy::verbose_bit_mask)]
use super::util;
use crate::storage::*;
use byteorder::{BigEndian, ByteOrder};
use bytes::{Bytes, BytesMut};
use futures::{future, prelude::*};
use std::{cmp::Ordering, convert::TryFrom, error, fmt, io};
use tokio::codec::{Decoder, FramedRead};
// Compile-time check that `usize` is at least 32 bits (4 bytes) wide, so the
// `usize::try_from(u32)` conversions in this file cannot fail. If the condition
// is false, `!false as usize` is 1 and `0 - 1` underflows during constant
// evaluation, which fails the build.
const _: usize = 0 - !(std::mem::size_of::<usize>() >= 32 >> 3) as usize;
/// A read-only array of unsigned integers packed `width` bits each into a byte buffer.
///
/// Buffer layout (as read by `parse` and `entry`): big-endian 64-bit data words
/// holding the elements most-significant-bit first, followed by an 8-byte
/// control word holding the length and width. `slice` views hold a clone of the
/// same `input_buf` and differ only in `first` and `len`.
#[derive(Clone)]
pub struct LogArray {
/// Index, in elements, of this view's first element within the buffer (nonzero only for slices).
first: u32,
/// Number of elements in this view.
len: u32,
/// Bits per element; `parse` guarantees it is at most 64.
width: u8,
/// Packed data words followed by the trailing control word.
input_buf: Bytes,
}
/// Reasons a buffer cannot be interpreted as a log array.
#[derive(Debug, PartialEq)]
pub enum LogArrayError {
    /// The buffer (of the given size) is shorter than the 8-byte control word.
    InputBufferTooSmall(usize),
    /// The control word declares an element width (given) above 64 bits.
    WidthTooLarge(u8),
    /// The buffer size disagrees with the control word:
    /// `(actual size, expected size, element count, element width)`.
    UnexpectedInputBufferSize(u64, u64, u32, u8),
}

impl LogArrayError {
    /// Succeeds when the buffer is large enough to hold the 8-byte control word.
    fn validate_input_buf_size(input_buf_size: usize) -> Result<(), Self> {
        if input_buf_size >= 8 {
            Ok(())
        } else {
            Err(LogArrayError::InputBufferTooSmall(input_buf_size))
        }
    }

    /// Succeeds when `width <= 64` and `input_buf_size` is exactly the number of
    /// bytes needed for `len` elements of `width` bits plus the control word.
    fn validate_len_and_width(input_buf_size: usize, len: u32, width: u8) -> Result<(), Self> {
        if width > 64 {
            return Err(LogArrayError::WidthTooLarge(width));
        }
        let data_bits = u64::from(len) * u64::from(width);
        // ceil(data_bits / 64) data words plus one control word equals
        // (data_bits + 127) / 64 words; multiply by 8 for bytes.
        let expected = ((data_bits + 127) >> 6) << 3;
        let actual = u64::try_from(input_buf_size).unwrap();
        if actual == expected {
            Ok(())
        } else {
            Err(LogArrayError::UnexpectedInputBufferSize(
                actual, expected, len, width,
            ))
        }
    }
}

impl fmt::Display for LogArrayError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            LogArrayError::InputBufferTooSmall(size) => {
                write!(f, "expected input buffer size ({}) >= 8", size)
            }
            LogArrayError::WidthTooLarge(w) => write!(f, "expected width ({}) <= 64", w),
            LogArrayError::UnexpectedInputBufferSize(actual, expected, count, w) => write!(
                f,
                "expected input buffer size ({}) to be {} for {} elements and width {}",
                actual, expected, count, w
            ),
        }
    }
}

impl error::Error for LogArrayError {}

impl From<LogArrayError> for io::Error {
    /// Wraps the validation error with kind `io::ErrorKind::InvalidData`.
    fn from(err: LogArrayError) -> io::Error {
        io::Error::new(io::ErrorKind::InvalidData, err)
    }
}
/// Iterator over the entries of a `LogArray`, created by `LogArray::iter`.
///
/// It holds its own clone of the array, so it does not borrow the source.
pub struct LogArrayIterator {
    logarray: LogArray,
    /// Index of the next entry to yield.
    pos: usize,
    /// One past the index of the last entry to yield.
    end: usize,
}

impl Iterator for LogArrayIterator {
    type Item = u64;

    fn next(&mut self) -> Option<u64> {
        if self.pos == self.end {
            return None;
        }
        let result = self.logarray.entry(self.pos);
        self.pos += 1;
        Some(result)
    }

    /// The remaining count is known exactly (`end - pos`). Reporting it lets
    /// `collect` and similar consumers allocate once.
    fn size_hint(&self) -> (usize, Option<usize>) {
        // `pos` starts at 0 and only increases while `pos != end`, so `pos <= end`.
        let remaining = self.end - self.pos;
        (remaining, Some(remaining))
    }
}

impl ExactSizeIterator for LogArrayIterator {}

// Once `pos == end`, `next` keeps returning `None`.
impl std::iter::FusedIterator for LogArrayIterator {}
/// Decodes a control word and validates it against the total buffer size.
///
/// Bytes 0..4 of `buf` hold the element count as a big-endian `u32`, and byte 4
/// holds the element width; any further bytes are ignored. Returns
/// `(len, width)` when `LogArrayError::validate_len_and_width` accepts them.
fn read_control_word(buf: &[u8], input_buf_size: usize) -> Result<(u32, u8), LogArrayError> {
    let (len, width) = (BigEndian::read_u32(buf), buf[4]);
    LogArrayError::validate_len_and_width(input_buf_size, len, width).map(|()| (len, width))
}
impl LogArray {
    /// Parses a log array from `input_buf`.
    ///
    /// The final 8 bytes are the control word: a big-endian `u32` length, then
    /// the width byte. The bytes before it are the packed data words. The
    /// buffer length must be exactly what that length and width require.
    ///
    /// # Errors
    ///
    /// Returns a `LogArrayError` if the buffer is shorter than 8 bytes, the
    /// width exceeds 64, or the buffer size does not match the length and width.
    pub fn parse(input_buf: Bytes) -> Result<LogArray, LogArrayError> {
        let input_buf_size = input_buf.len();
        LogArrayError::validate_input_buf_size(input_buf_size)?;
        let (len, width) = read_control_word(&input_buf[input_buf_size - 8..], input_buf_size)?;
        Ok(LogArray {
            first: 0,
            len,
            width,
            input_buf,
        })
    }

    /// Returns the number of elements.
    pub fn len(&self) -> usize {
        // Cannot fail: `usize` is asserted at compile time to be at least 32 bits.
        usize::try_from(self.len).unwrap()
    }

    /// Returns `true` if the array has no elements.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns the number of bits used per element.
    pub fn width(&self) -> u8 {
        self.width
    }

    /// Returns the element at `index`.
    ///
    /// Elements are packed most-significant-bit first into big-endian 64-bit
    /// words, and an element may straddle two consecutive words.
    ///
    /// # Panics
    ///
    /// Panics if `index >= self.len()`.
    pub fn entry(&self, index: usize) -> u64 {
        assert!(
            index < self.len(),
            "expected index ({}) < length ({})",
            index,
            self.len
        );
        if self.width == 0 {
            // Zero-width elements carry no bits, so every element is 0. This
            // must be special-cased: the buffer then holds only the control word
            // (nothing to read), and the shifts below would be by 64 bits.
            return 0;
        }
        let bit_index = usize::from(self.width) * (usize::try_from(self.first).unwrap() + index);
        let byte_index = bit_index >> 6 << 3;
        let offset = (bit_index & 0b11_1111) as u8;
        let first_word = BigEndian::read_u64(&self.input_buf[byte_index..]);
        if offset + self.width <= 64 {
            // The element lies entirely within one word.
            return first_word << offset >> (64 - self.width);
        }
        // The element straddles a word boundary. Its high `first_width` bits
        // are the low bits of `first_word`; the remaining `second_width` bits
        // are the high bits of the next word. That next word always exists,
        // because the control word follows the last data word.
        let second_word = BigEndian::read_u64(&self.input_buf[byte_index + 8..]);
        let first_width = 64 - offset;
        let second_width = self.width - first_width;
        let first_part = first_word << offset >> offset << second_width;
        let second_part = second_word >> (64 - second_width);
        first_part | second_part
    }

    /// Returns an iterator over all elements, in order.
    pub fn iter(&self) -> LogArrayIterator {
        LogArrayIterator {
            logarray: self.clone(),
            pos: 0,
            end: self.len(),
        }
    }

    /// Returns a view of `len` elements starting at `offset`, relative to this
    /// array. The view holds a clone of the same buffer.
    ///
    /// # Panics
    ///
    /// Panics if `offset` or `len` does not fit in a `u32`, if `offset + len`
    /// overflows, or if `offset + len > self.len()`.
    pub fn slice(&self, offset: usize, len: usize) -> LogArray {
        let offset = u32::try_from(offset)
            .unwrap_or_else(|_| panic!("expected 32-bit slice offset ({})", offset));
        let len =
            u32::try_from(len).unwrap_or_else(|_| panic!("expected 32-bit slice length ({})", len));
        let slice_end = offset.checked_add(len).unwrap_or_else(|| {
            panic!("overflow from slice offset ({}) + length ({})", offset, len)
        });
        assert!(
            slice_end <= self.len,
            "expected slice offset ({}) + length ({}) <= source length ({})",
            offset,
            len,
            self.len
        );
        LogArray {
            // Cannot overflow: first + slice_end <= first + self.len, which is
            // bounded by the length of the original parsed array (a u32).
            first: self.first + offset,
            len,
            width: self.width,
            input_buf: self.input_buf.clone(),
        }
    }
}
/// Incrementally writes a log array to the writer `W`.
///
/// Elements are appended with `push`/`push_all`, and each 64-bit word is written
/// as soon as it is full. `finalize` writes any partially filled word, followed
/// by the 8-byte control word.
pub struct LogArrayFileBuilder<W: tokio::io::AsyncWrite> {
/// Destination of the encoded words.
file: W,
/// Bits per element.
width: u8,
/// Word currently being filled, not yet written.
current: u64,
/// Number of bits of `current` already filled (reset below 64 after each full word is written).
offset: u8,
/// Number of elements pushed so far.
count: u32,
}
impl<W: tokio::io::AsyncWrite> LogArrayFileBuilder<W> {
    /// Creates a builder that writes elements of `width` bits each to `w`.
    ///
    /// # Panics
    ///
    /// Panics if `width > 64`. Such an array could not be encoded, and
    /// `LogArray::parse` would reject it.
    pub fn new(w: W, width: u8) -> LogArrayFileBuilder<W> {
        assert!(width <= 64, "expected width ({}) <= 64", width);
        LogArrayFileBuilder {
            file: w,
            width,
            current: 0,
            offset: 0,
            count: 0,
        }
    }

    /// Returns the number of elements pushed so far.
    pub fn count(&self) -> u32 {
        self.count
    }

    /// Appends `val`, writing out the current word if it becomes full.
    ///
    /// Consumes the builder and resolves to the updated builder.
    ///
    /// # Errors
    ///
    /// Resolves to an `InvalidData` error if `val` does not fit in `width` bits,
    /// or if the element count would exceed `u32::MAX` (the length is stored as
    /// a `u32`). Write errors are propagated.
    pub fn push(self, val: u64) -> impl Future<Item = LogArrayFileBuilder<W>, Error = io::Error> {
        let LogArrayFileBuilder {
            file,
            width,
            current,
            offset,
            count,
        } = self;
        let leading_zeros = 64 - width;
        let checked_count = if val.leading_zeros() < u32::from(leading_zeros) {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("expected value ({}) to fit in {} bits", val, width),
            ))
        } else {
            count.checked_add(1).ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("expected at most {} elements", u32::max_value()),
                )
            })
        };
        future::result(checked_count).and_then(move |count| {
            // Move `val` to the top of a word, then shift it down to the first
            // free bit of `current`. Low bits that fall off the end are carried
            // into the next word below. For width 0, `val` is necessarily 0
            // (checked above) and contributes nothing. It is special-cased
            // because shifting a u64 by 64 overflows.
            let aligned = if width == 0 { 0 } else { val << leading_zeros };
            let current = current | aligned >> offset;
            let offset = offset + width;
            if offset >= 64 {
                future::Either::A(util::write_u64(file, current).map(move |file| {
                    let offset = offset - 64;
                    // Start the next word with the low `offset` bits of `val`
                    // that did not fit in the word just written.
                    let current = if offset == 0 {
                        0
                    } else {
                        val << (64 - offset)
                    };
                    LogArrayFileBuilder {
                        file,
                        width,
                        count,
                        current,
                        offset,
                    }
                }))
            } else {
                future::Either::B(future::ok(LogArrayFileBuilder {
                    file,
                    width,
                    count,
                    current,
                    offset,
                }))
            }
        })
    }

    /// Pushes every value of `vals`, in order, failing on the first error.
    pub fn push_all<S: Stream<Item = u64, Error = io::Error>>(
        self,
        vals: S,
    ) -> impl Future<Item = LogArrayFileBuilder<W>, Error = io::Error> {
        vals.fold(self, |x, val| x.push(val))
    }

    /// Writes the partially filled word, if any bits of it are in use.
    fn finalize_data(self) -> impl Future<Item = W, Error = io::Error> {
        if u64::from(self.count) * u64::from(self.width) & 0b11_1111 == 0 {
            future::Either::A(future::ok(self.file))
        } else {
            future::Either::B(util::write_u64(self.file, self.current))
        }
    }

    /// Completes the array.
    ///
    /// Writes the remaining data, then the 8-byte control word: the length as a
    /// big-endian `u32`, the width byte, and three zero bytes. It then flushes
    /// the writer and resolves to it.
    pub fn finalize(self) -> impl Future<Item = W, Error = io::Error> {
        let len = self.count;
        let width = self.width;
        self.finalize_data()
            .and_then(move |file| {
                let mut buf = [0; 8];
                BigEndian::write_u32(&mut buf, len);
                buf[4] = width;
                util::write_all(file, buf)
            })
            .and_then(tokio::io::flush)
    }
}
/// Streaming decoder that turns the data words of a log array into elements.
struct LogArrayDecoder {
    /// The most recently read 64-bit data word.
    current: u64,
    /// Bits per element.
    width: u8,
    /// Number of leading bits of `current` already consumed (64 = none left).
    offset: u8,
    /// Number of elements still to be produced.
    remaining: u32,
}

impl fmt::Debug for LogArrayDecoder {
    /// Prints `current` as a zero-padded 64-bit binary literal, so the bit
    /// position given by `offset` can be read off directly.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let LogArrayDecoder {
            current,
            width,
            offset,
            remaining,
        } = self;
        write!(
            f,
            "LogArrayDecoder {{ current: {:#066b}, width: {:?}, \
             offset: {:?}, remaining: {:?} }}",
            current, width, offset, remaining
        )
    }
}

impl LogArrayDecoder {
    /// Creates a decoder for `remaining` elements of `width` bits each.
    ///
    /// Neither argument is validated. Callers supply values already checked
    /// against the file: `logarray_stream_entries` takes them from
    /// `logarray_file_get_length_and_width`.
    fn new_unchecked(width: u8, remaining: u32) -> Self {
        // `offset: 64` marks `current` as fully consumed. For a nonzero width,
        // the first `decode` therefore reads a fresh data word.
        Self {
            current: 0,
            width,
            offset: 64,
            remaining,
        }
    }
}
impl Decoder for LogArrayDecoder {
    type Item = u64;
    type Error = io::Error;

    /// Decodes the next element from `bytes`.
    ///
    /// Returns `Ok(None)` in two cases:
    /// - another 8-byte data word is needed but not yet buffered;
    /// - all elements have been produced. Any leftover input, such as the
    ///   trailing control word, is then discarded.
    fn decode(&mut self, bytes: &mut BytesMut) -> Result<Option<u64>, io::Error> {
        if self.remaining == 0 {
            bytes.clear();
            return Ok(None);
        }
        let width = self.width;
        if width == 0 {
            // Zero-width elements carry no bits and consume no input; every
            // element is 0. Special-cased because the shifts below would be by
            // 64 bits, which overflows.
            self.remaining -= 1;
            return Ok(Some(0));
        }
        let first_word = self.current;
        let offset = self.offset;
        let leading_zeros = 64 - width;
        if offset + width <= 64 {
            // The element lies entirely within the word already read.
            self.offset += width;
            self.remaining -= 1;
            return Ok(Some(first_word << offset >> leading_zeros));
        }
        if bytes.len() < 8 {
            return Ok(None);
        }
        let second_word = BigEndian::read_u64(&bytes.split_to(8));
        self.current = second_word;
        self.remaining -= 1;
        if offset == 64 {
            // The previous word was fully consumed, so the element starts at the
            // top of the new word.
            self.offset = width;
            return Ok(Some(second_word >> leading_zeros));
        }
        // The element straddles both words. Its high `first_width` bits are the
        // low bits of the old word; the rest are the high bits of the new one.
        let first_width = 64 - offset;
        let second_width = width - first_width;
        let first_part = first_word << offset >> offset << second_width;
        let second_part = second_word >> (64 - second_width);
        self.offset = second_width;
        Ok(Some(first_part | second_part))
    }
}
pub fn logarray_file_get_length_and_width<F: FileLoad>(
f: F,
) -> impl Future<Item = (F, u32, u8), Error = io::Error> {
LogArrayError::validate_input_buf_size(f.size())
.map_or_else(|e| Err(e.into()), |_| Ok(f))
.into_future()
.and_then(|f| {
tokio::io::read_exact(f.open_read_from(f.size() - 8), [0; 8]).map(|(_, buf)| (f, buf))
})
.and_then(|(f, control_word)| {
read_control_word(&control_word, f.size())
.map_or_else(|e| Err(e.into()), |(len, width)| Ok((f, len, width)))
.into_future()
})
}
/// Streams every element of the log array stored in `f`, in order.
///
/// The control word is validated first (see
/// `logarray_file_get_length_and_width`). The file is then read from the
/// start and decoded with a `LogArrayDecoder`.
pub fn logarray_stream_entries<F: FileLoad>(f: F) -> impl Stream<Item = u64, Error = io::Error> {
    let into_entries = |(file, len, width): (F, u32, u8)| {
        let decoder = LogArrayDecoder::new_unchecked(width, len);
        FramedRead::new(file.open_read(), decoder)
    };
    logarray_file_get_length_and_width(f)
        .map(into_entries)
        .flatten_stream()
}
/// A `LogArray` whose entries are expected to be non-decreasing, which allows
/// lookup by binary search.
///
/// The ordering is checked only in debug builds (see `from_logarray`).
#[derive(Clone)]
pub struct MonotonicLogArray(LogArray);

impl MonotonicLogArray {
    /// Wraps `logarray`.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if any entry is smaller than its predecessor.
    /// Release builds do not check.
    pub fn from_logarray(logarray: LogArray) -> MonotonicLogArray {
        if cfg!(debug_assertions) {
            let mut iter = logarray.iter();
            if let Some(mut pred) = iter.next() {
                for succ in iter {
                    assert!(
                        pred <= succ,
                        "not monotonic: expected predecessor ({}) <= successor ({})",
                        pred,
                        succ
                    );
                    pred = succ;
                }
            }
        }
        MonotonicLogArray(logarray)
    }

    /// Returns the number of elements.
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// Returns `true` if there are no elements.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Returns the element at `index`; panics if it is out of bounds.
    pub fn entry(&self, index: usize) -> u64 {
        self.0.entry(index)
    }

    /// Returns an iterator over all elements, in order.
    pub fn iter(&self) -> LogArrayIterator {
        self.0.iter()
    }

    /// Returns the index of an entry equal to `element`, or `None` if there is none.
    ///
    /// Uses binary search and relies on the entries being non-decreasing. If
    /// several entries equal `element`, none of their indices in particular is
    /// guaranteed.
    pub fn index_of(&self, element: u64) -> Option<usize> {
        if self.is_empty() {
            return None;
        }
        let mut min = 0;
        let mut max = self.len() - 1;
        while min <= max {
            // Same value as (min + max) / 2, but it cannot overflow when usize
            // is 32 bits and the array holds close to u32::MAX entries.
            let mid = min + (max - min) / 2;
            match element.cmp(&self.entry(mid)) {
                Ordering::Equal => return Some(mid),
                Ordering::Greater => min = mid + 1,
                Ordering::Less => {
                    // Avoid underflowing `max` when the search reaches index 0.
                    if mid == 0 {
                        return None;
                    }
                    max = mid - 1
                }
            }
        }
        None
    }
}
// Unit tests for parsing, element access, slicing, the file builder, the
// streaming decoder and monotonic lookup.
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::memory::*;
use futures::stream;
// Each error variant's Display text, and that the io::Error conversion renders
// the same as wrapping the error by hand with InvalidData.
#[test]
fn log_array_error() {
assert_eq!(
"expected input buffer size (7) >= 8",
LogArrayError::InputBufferTooSmall(7).to_string()
);
assert_eq!(
"expected width (69) <= 64",
LogArrayError::WidthTooLarge(69).to_string()
);
assert_eq!(
"expected input buffer size (9) to be 8 for 0 elements and width 17",
LogArrayError::UnexpectedInputBufferSize(9, 8, 0, 17).to_string()
);
assert_eq!(
io::Error::new(
io::ErrorKind::InvalidData,
LogArrayError::InputBufferTooSmall(7)
)
.to_string(),
io::Error::from(LogArrayError::InputBufferTooSmall(7)).to_string()
);
}
// Buffers shorter than the 8-byte control word are rejected.
#[test]
fn validate_input_buf_size() {
let val = |buf_size| LogArrayError::validate_input_buf_size(buf_size);
let err = |buf_size| Err(LogArrayError::InputBufferTooSmall(buf_size));
assert_eq!(err(7), val(7));
assert_eq!(Ok(()), val(8));
assert_eq!(Ok(()), val(9));
assert_eq!(Ok(()), val(usize::max_value()));
}
// The width limit, and the expected size: data words (rounded up) plus the
// 8-byte control word.
#[test]
fn validate_len_and_width() {
let val =
|buf_size, len, width| LogArrayError::validate_len_and_width(buf_size, len, width);
let err = |width| Err(LogArrayError::WidthTooLarge(width));
assert_eq!(err(65), val(0, 0, 65));
let err = |buf_size, expected, len, width| {
Err(LogArrayError::UnexpectedInputBufferSize(
buf_size, expected, len, width,
))
};
assert_eq!(err(0, 8, 0, 0), val(0, 0, 0));
assert_eq!(Ok(()), val(8, 0, 1));
assert_eq!(err(9, 8, 0, 1), val(9, 0, 1));
assert_eq!(Ok(()), val(16, 1, 1));
assert_eq!(Ok(()), val(16, 1, 64));
assert_eq!(err(16, 24, 2, 64), val(16, 2, 64));
assert_eq!(err(24, 16, 1, 64), val(24, 1, 64));
// Largest possible array: u32::MAX 64-bit data words plus the control word.
#[cfg(target_pointer_width = "64")]
assert_eq!(
Ok(()),
val(
usize::try_from(u64::from(u32::max_value()) + 1 << 3).unwrap(),
u32::max_value(),
64
)
);
// 13 * 5 = 65 bits needs two data words: 16 + 8 = 24 bytes.
assert_eq!(err(16, 24, 13, 5), val(16, 13, 5));
assert_eq!(Ok(()), val(24, 13, 5));
}
// An all-zero 8-byte buffer is just a control word with length 0 (width 0).
#[test]
pub fn empty() {
let logarray = LogArray::parse(Bytes::from([0u8; 8].as_ref())).unwrap();
assert!(logarray.is_empty());
assert!(MonotonicLogArray::from_logarray(logarray).is_empty());
}
// A value wider than the builder's width makes `push` fail; `unwrap` surfaces the message.
#[test]
#[should_panic(expected = "expected value (8) to fit in 3 bits")]
fn log_array_file_builder_panic() {
let store = MemoryBackedStore::new();
let builder = LogArrayFileBuilder::new(store.open_write(), 3);
builder.push(8).wait().unwrap();
}
// Round trip: build with width 5, then parse and read each entry back.
#[test]
fn generate_then_parse_works() {
let store = MemoryBackedStore::new();
let builder = LogArrayFileBuilder::new(store.open_write(), 5);
builder
.push_all(stream::iter_ok(vec![1, 3, 2, 5, 12, 31, 18]))
.and_then(|b| b.finalize())
.wait()
.unwrap();
let content = store.map().wait().unwrap();
let logarray = LogArray::parse(content).unwrap();
assert_eq!(1, logarray.entry(0));
assert_eq!(3, logarray.entry(1));
assert_eq!(2, logarray.entry(2));
assert_eq!(5, logarray.entry(3));
assert_eq!(12, logarray.entry(4));
assert_eq!(31, logarray.entry(5));
assert_eq!(18, logarray.entry(6));
}
// One data word holding three 17-bit elements (1, 2, 3), most-significant-bit
// first; the remaining 13 low bits are zero.
const TEST0_DATA: [u8; 8] = [
0b00000000,
0b00000000,
0b1_0000000,
0b00000000,
0b10_000000,
0b00000000,
0b011_00000,
0b00000000,
];
// Control word for TEST0_DATA: length 3 (big-endian u32), width 17.
const TEST0_CONTROL: [u8; 8] = [0, 0, 0, 3, 17, 0, 0, 0];
// A following data word. The `decode` test only consumes its leading 4 bits
// (0b0100), which complete a 17-bit element (value 4) straddling both words.
const TEST1_DATA: [u8; 8] = [
0b0100_0000,
0b00000000,
0b00101_000,
0b00000000,
0b000110_00,
0b00000000,
0b0000111_0,
0b00000000,
];
// Parses TEST0_DATA followed by TEST0_CONTROL: a 3-element, width-17 array.
fn test0_logarray() -> LogArray {
let mut content = Vec::new();
content.extend_from_slice(&TEST0_DATA);
content.extend_from_slice(&TEST0_CONTROL);
LogArray::parse(Bytes::from(content)).unwrap()
}
#[test]
#[should_panic(expected = "expected index (3) < length (3)")]
fn entry_panic() {
let _ = test0_logarray().entry(3);
}
// Slice bounds, overflow and 32-bit range checks.
#[test]
#[should_panic(expected = "expected slice offset (2) + length (2) <= source length (3)")]
fn slice_panic1() {
let _ = test0_logarray().slice(2, 2);
}
#[test]
#[should_panic(expected = "expected 32-bit slice offset (4294967296)")]
#[cfg(target_pointer_width = "64")]
fn slice_panic2() {
let _ = test0_logarray().slice(usize::try_from(u32::max_value()).unwrap() + 1, 2);
}
#[test]
#[should_panic(expected = "expected 32-bit slice length (4294967296)")]
#[cfg(target_pointer_width = "64")]
fn slice_panic3() {
let _ = test0_logarray().slice(0, usize::try_from(u32::max_value()).unwrap() + 1);
}
#[test]
#[should_panic(expected = "overflow from slice offset (4294967295) + length (1)")]
fn slice_panic4() {
let _ = test0_logarray().slice(usize::try_from(u32::max_value()).unwrap(), 1);
}
// Indices into a slice are bounded by the slice's own length.
#[test]
#[should_panic(expected = "expected index (2) < length (2)")]
fn slice_entry_panic() {
let _ = test0_logarray().slice(1, 2).entry(2);
}
// One data word with two 32-bit elements (2, then 1) plus a control word with
// length 2 and width 32; the decreasing pair trips the debug-only check.
#[test]
#[should_panic(expected = "not monotonic: expected predecessor (2) <= successor (1)")]
fn monotonic_panic() {
let content = [0u8, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2, 32, 0, 0, 0].as_ref();
MonotonicLogArray::from_logarray(LogArray::parse(Bytes::from(content)).unwrap());
}
// Drives the decoder by hand. The calls are stateful, so their order matters.
#[test]
fn decode() {
// A single element, then None once nothing remains.
let mut decoder = LogArrayDecoder::new_unchecked(17, 1);
let mut bytes = BytesMut::from(TEST0_DATA.as_ref());
assert_eq!(Some(1), Decoder::decode(&mut decoder, &mut bytes).unwrap());
assert_eq!(None, Decoder::decode(&mut decoder, &mut bytes).unwrap());
// Four elements: the first three come from TEST0_DATA.
decoder = LogArrayDecoder::new_unchecked(17, 4);
bytes = BytesMut::from(TEST0_DATA.as_ref());
assert_eq!(Some(1), Decoder::decode(&mut decoder, &mut bytes).unwrap());
assert_eq!(
"LogArrayDecoder { current: \
0b0000000000000000100000000000000010000000000000000110000000000000, width: 17, \
offset: 17, remaining: 3 }",
format!("{:?}", decoder)
);
assert_eq!(Some(2), Decoder::decode(&mut decoder, &mut bytes).unwrap());
assert_eq!(Some(3), Decoder::decode(&mut decoder, &mut bytes).unwrap());
// The 4th element straddles into the next word, which is not buffered yet.
assert_eq!(None, Decoder::decode(&mut decoder, &mut bytes).unwrap());
bytes.extend(TEST1_DATA.iter());
assert_eq!(Some(4), Decoder::decode(&mut decoder, &mut bytes).unwrap());
assert_eq!(None, Decoder::decode(&mut decoder, &mut bytes).unwrap());
}
// Each validation failure surfaces as the corresponding io::Error.
#[test]
fn logarray_file_get_length_and_width_errors() {
let store = MemoryBackedStore::new();
let _ = tokio::io::write_all(store.open_write(), [0, 0, 0]).wait();
assert_eq!(
io::Error::from(LogArrayError::InputBufferTooSmall(3)).to_string(),
logarray_file_get_length_and_width(store)
.wait()
.err()
.unwrap()
.to_string()
);
let store = MemoryBackedStore::new();
let _ = tokio::io::write_all(store.open_write(), [0, 0, 0, 0, 65, 0, 0, 0]).wait();
assert_eq!(
io::Error::from(LogArrayError::WidthTooLarge(65)).to_string(),
logarray_file_get_length_and_width(store)
.wait()
.err()
.unwrap()
.to_string()
);
let store = MemoryBackedStore::new();
let _ = tokio::io::write_all(store.open_write(), [0, 0, 0, 1, 17, 0, 0, 0]).wait();
assert_eq!(
io::Error::from(LogArrayError::UnexpectedInputBufferSize(8, 16, 1, 17)).to_string(),
logarray_file_get_length_and_width(store)
.wait()
.err()
.unwrap()
.to_string()
);
}
// 31 elements of 5 bits (155 bits) span three data words, so several elements
// straddle word boundaries while streaming.
#[test]
fn generate_then_stream_works() {
let store = MemoryBackedStore::new();
let builder = LogArrayFileBuilder::new(store.open_write(), 5);
builder
.push_all(stream::iter_ok(0..31))
.and_then(|b| b.finalize())
.wait()
.unwrap();
let entries: Vec<u64> = logarray_stream_entries(store).collect().wait().unwrap();
let expected: Vec<u64> = (0..31).collect();
assert_eq!(expected, entries);
}
#[test]
fn iterate_over_logarray() {
let store = MemoryBackedStore::new();
let builder = LogArrayFileBuilder::new(store.open_write(), 5);
let original = vec![1, 3, 2, 5, 12, 31, 18];
builder
.push_all(stream::iter_ok(original.clone()))
.and_then(|b| b.finalize())
.wait()
.unwrap();
let content = store.map().wait().unwrap();
let logarray = LogArray::parse(content).unwrap();
let result: Vec<u64> = logarray.iter().collect();
assert_eq!(original, result);
}
// Iterating a slice yields only the elements inside the slice.
#[test]
fn iterate_over_logarray_slice() {
let store = MemoryBackedStore::new();
let builder = LogArrayFileBuilder::new(store.open_write(), 5);
let original = vec![1, 3, 2, 5, 12, 31, 18];
builder
.push_all(stream::iter_ok(original))
.and_then(|b| b.finalize())
.wait()
.unwrap();
let content = store.map().wait().unwrap();
let logarray = LogArray::parse(content).unwrap();
let slice = logarray.slice(2, 3);
let result: Vec<u64> = slice.iter().collect();
assert_eq!([2, 5, 12], result.as_ref());
}
// Every stored value is found at its index; an absent value gives None.
#[test]
fn monotonic_logarray_index_lookup() {
let store = MemoryBackedStore::new();
let builder = LogArrayFileBuilder::new(store.open_write(), 5);
let original = vec![1, 3, 5, 6, 7, 10, 11, 15, 16, 18, 20, 25, 31];
builder
.push_all(stream::iter_ok(original.clone()))
.and_then(|b| b.finalize())
.wait()
.unwrap();
let content = store.map().wait().unwrap();
let logarray = LogArray::parse(content).unwrap();
let monotonic = MonotonicLogArray::from_logarray(logarray);
for (i, &val) in original.iter().enumerate() {
assert_eq!(i, monotonic.index_of(val).unwrap());
}
assert_eq!(None, monotonic.index_of(12));
assert_eq!(original.len(), monotonic.len());
}
// 16 elements of 4 bits fill exactly one word, exercising the finalize path
// with no partially filled word left over.
#[test]
fn writing_64_bits_of_data() {
let store = MemoryBackedStore::new();
let original = vec![1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8];
let builder = LogArrayFileBuilder::new(store.open_write(), 4);
builder
.push_all(stream::iter_ok(original.clone()))
.and_then(|b| b.finalize())
.wait()
.unwrap();
let content = store.map().wait().unwrap();
let logarray = LogArray::parse(content).unwrap();
assert_eq!(original, logarray.iter().collect::<Vec<_>>());
assert_eq!(16, logarray.len());
assert_eq!(4, logarray.width());
}
}