use std::cmp::Ordering;
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, ErrorKind, Read, Seek, SeekFrom, Write as IoWrite};
use std::os::fd::{AsRawFd, RawFd};
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{self, AtomicBool};
use biometrics::{Collector, Counter};
use buffertk::{Packable, Unpackable, stack_pack, v64};
use prototk_derive::Message;
use sync42::work_coalescing_queue::{WorkCoalescingCore, WorkCoalescingQueue};
use super::setsum::Setsum;
use super::{
Builder, Error, KeyRef, KeyValueDel, KeyValueEntry, KeyValuePair, KeyValuePut, KeyValueRef,
TABLE_FULL_SIZE, check_key_len, check_table_size, check_value_len,
corruption_crc_checksum_failed, corruption_entry_size_exceeds_max, corruption_fsync_failed,
corruption_header_size_exceeds_max, corruption_invalid_discriminant, corruption_log_poisoned,
corruption_shared_not_zero, corruption_true_up_exceeds_header_max,
corruption_truncation_no_second_header, empty_batch, error_with_path, io_result,
io_result_with_context, logic_error_buf_writer_into_inner_failed, system_error, table_full,
unpack_key_value_entry_prototk, unpack_log_header,
};
/// Counter published under the name `sst.log.append`.
static APPEND: Counter = Counter::new("sst.log.append");
/// Counter published under the name `sst.log.fsync`.
static FSYNC: Counter = Counter::new("sst.log.fsync");

/// Register every counter declared by this module with `collector`.
pub fn register_biometrics(collector: &Collector) {
    for counter in [&APPEND, &FSYNC] {
        collector.register_counter(counter);
    }
}
/// One block minus room for two maximum-size frame headers.
///
/// NOTE(review): presumably sized so that a batch split across a block boundary (two frames, two
/// headers) never needs more than one block's worth of bytes -- confirm against the writer.
pub const MAX_BATCH_SIZE: u64 = BLOCK_SIZE - 2 * HEADER_MAX_SIZE;
/// log2 of the block size.
const BLOCK_BITS: u64 = 20;
/// Size of one log block in bytes (1 MiB).
const BLOCK_SIZE: u64 = 1 << BLOCK_BITS;
/// Default capacity, in bytes, of the read and write buffers: two blocks.
const DEFAULT_BUFFER_SIZE: u64 = BLOCK_SIZE * 2;

/// Index of the block that contains byte `offset`.
fn block_offset(offset: u64) -> u64 {
    offset / BLOCK_SIZE
}

/// Round `offset` up to a block boundary; an offset already on a boundary is returned unchanged.
fn compute_true_up(offset: u64) -> u64 {
    let within_block = offset & (BLOCK_SIZE - 1);
    if within_block != 0 {
        next_boundary(offset)
    } else {
        offset
    }
}

/// First block boundary strictly greater than `offset`.
fn next_boundary(offset: u64) -> u64 {
    let following_block = block_offset(offset) + 1;
    following_block << BLOCK_BITS
}
fn check_batch_size(size: usize) -> Result<(), Error> {
if size as u64 > BLOCK_SIZE {
Err(table_full(size, BLOCK_SIZE as usize))
} else {
Ok(())
}
}
/// Check that `buffer` would still be a legal batch after appending the packed form of `pa`.
fn check_batch_size_plus<P: Packable>(buffer: &[u8], pa: P) -> Result<(), Error> {
    let already_buffered = buffer.len();
    let incoming = pa.pack_sz();
    check_batch_size(already_buffered + incoming)
}
// Frame header written in front of every payload in the log.
#[derive(Clone, Debug, Default, Message)]
struct Header {
    // Number of payload bytes that follow this header.
    #[prototk(10, uint64)]
    size: u64,
    // One of HEADER_WHOLE, HEADER_FIRST, or HEADER_SECOND.
    #[prototk(11, uint32)]
    discriminant: u32,
    // CRC32C of the payload bytes that follow this header.
    #[prototk(12, fixed32)]
    crc32c: u32,
}
// HEADER_WHOLE (declared on the same line as HEADER_MAX_SIZE) marks a frame that carries an
// entire batch.
/// Upper bound, in bytes, on an encoded header including its one-byte length prefix.
///
/// NOTE(review): the summands look like length prefix (1), tag + 10-byte varint for `size`,
/// tag + 1-byte varint for `discriminant`, and tag + 4 bytes for `crc32c` -- confirm.
pub const HEADER_MAX_SIZE: u64 = 1 + 1 + 10 + 1 + 1 + 1 + 4; const HEADER_WHOLE: u32 = 1;
// Marks the first frame of a batch that was split across a block boundary.
const HEADER_FIRST: u32 = 2;
// Marks the second (final) frame of a split batch.
const HEADER_SECOND: u32 = 3;
#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "command_line", derive(arrrg_derive::CommandLine))]
pub struct LogOptions {
#[cfg_attr(feature = "command_line", arrrg(optional, "Size of the write buffer."))]
pub(crate) write_buffer: usize,
#[cfg_attr(feature = "command_line", arrrg(optional, "Size of the read buffer."))]
pub(crate) read_buffer: usize,
#[cfg_attr(
feature = "command_line",
arrrg(optional, "Roll over logs that exceed this number of bytes.")
)]
pub(crate) rollover_size: usize,
}
impl Default for LogOptions {
fn default() -> Self {
Self {
write_buffer: DEFAULT_BUFFER_SIZE as usize,
read_buffer: DEFAULT_BUFFER_SIZE as usize,
rollover_size: 1 << 30,
}
}
}
/// A byte sink that can additionally be asked to make its contents durable.
pub trait Write: std::io::Write {
    /// Push previously written data to stable storage, to whatever extent the sink supports it.
    fn fsync(&mut self) -> Result<(), Error>;
}

impl Write for File {
    /// Calls `File::sync_data` and wraps any I/O error with the context "log fsync".
    fn fsync(&mut self) -> Result<(), Error> {
        io_result_with_context(self.sync_data(), "log fsync")
    }
}

impl Write for &mut Vec<u8> {
    /// An in-memory vector has nothing to sync; always succeeds.
    fn fsync(&mut self) -> Result<(), Error> {
        Ok(())
    }
}
impl<W: Write> Write for BufWriter<W> {
fn fsync(&mut self) -> Result<(), Error> {
self.get_mut().fsync()
}
}
/// A group of puts and deletes that is appended to the log as a single unit.
#[derive(Clone, Debug, Default)]
pub struct WriteBatch {
    // Packed key-value entries, back to back.
    buffer: Vec<u8>,
    // Setsum maintained alongside the entries in `buffer`.
    setsum: Setsum,
}

impl WriteBatch {
    /// Add `kvr` to the batch: a put when it carries a value, a delete otherwise.
    pub fn insert(&mut self, kvr: KeyValueRef<'_>) -> Result<(), Error> {
        match kvr.value {
            Some(value) => self.put(kvr.key, kvr.timestamp, value),
            None => self.del(kvr.key, kvr.timestamp),
        }
    }

    /// Append every entry of `wb` to this batch.
    ///
    /// # Errors
    ///
    /// Fails, leaving `self` untouched, when the combined size is rejected by the batch-size
    /// check.
    pub fn merge(&mut self, wb: &WriteBatch) -> Result<(), Error> {
        let combined = self.buffer.len() + wb.buffer.len();
        check_batch_size(combined)?;
        self.buffer.extend_from_slice(&wb.buffer);
        self.setsum += wb.setsum;
        Ok(())
    }
}
impl Builder for WriteBatch {
    type Sealed = Self;

    /// Number of packed bytes currently held by the batch.
    fn approximate_size(&self) -> usize {
        self.buffer.len()
    }

    /// Add a put of `value` under `key` at `timestamp`.
    ///
    /// # Errors
    ///
    /// Fails when the key or value length is rejected, or when the entry would push the batch
    /// past the batch-size limit.  On failure neither the buffer nor the setsum is changed.
    fn put(&mut self, key: &[u8], timestamp: u64, value: &[u8]) -> Result<(), Error> {
        check_key_len(key)?;
        check_value_len(value)?;
        let put = KeyValuePut {
            shared: 0,
            key_frag: key,
            timestamp,
            value,
        };
        let pa = stack_pack(KeyValueEntry::Put(put));
        check_batch_size_plus(&self.buffer, &pa)?;
        pa.append_to_vec(&mut self.buffer);
        // Only account for the entry once it is actually in the buffer, so that a rejected
        // entry cannot leave the setsum describing data the batch does not contain.
        self.setsum.put(key, timestamp, value);
        Ok(())
    }

    /// Add a delete (tombstone) of `key` at `timestamp`.
    ///
    /// # Errors
    ///
    /// Fails when the key length is rejected, or when the entry would push the batch past the
    /// batch-size limit.  On failure neither the buffer nor the setsum is changed.
    fn del(&mut self, key: &[u8], timestamp: u64) -> Result<(), Error> {
        check_key_len(key)?;
        let del = KeyValueDel {
            shared: 0,
            key_frag: key,
            timestamp,
        };
        let pa = stack_pack(KeyValueEntry::Del(del));
        check_batch_size_plus(&self.buffer, &pa)?;
        pa.append_to_vec(&mut self.buffer);
        // See `put`: the setsum is updated only after the entry has been accepted.
        self.setsum.del(key, timestamp);
        Ok(())
    }

    /// A write batch is its own sealed form.
    fn seal(self) -> Result<Self::Sealed, Error> {
        Ok(self)
    }
}
/// Single-threaded writer that frames write batches into a block-structured log.
pub struct LogBuilder<W: Write> {
    // Buffer capacity and rollover threshold.
    options: LogOptions,
    // Buffered sink that receives the framed bytes.
    output: BufWriter<W>,
    // Bytes handed to `output` so far, padding included; this is the logical log offset.
    bytes_written: u64,
    // Running sum of the setsums of the batches passed to `append`.
    setsum: Setsum,
}
impl LogBuilder<File> {
    /// Create a brand-new log file at `file_name`.
    ///
    /// # Errors
    ///
    /// Fails, with the path attached to the error, when the file cannot be created -- including
    /// when it already exists, since the file is opened with `create_new`.
    pub fn new<P: AsRef<Path>>(options: LogOptions, file_name: P) -> Result<Self, Error> {
        let path = file_name.as_ref();
        let mut open_options = OpenOptions::new();
        open_options.create_new(true).read(true).write(true);
        let file: File = open_options
            .open(path)
            .map_err(|err| error_with_path(system_error(err), path.to_string_lossy()))?;
        Self::from_write(options, file)
    }

    /// Flush buffered bytes to the file and then `sync_data` the file.
    pub fn fsync(&mut self) -> Result<(), Error> {
        FSYNC.click();
        let flushed = self.output.flush();
        io_result_with_context(flushed, "log builder flush")?;
        let synced = self.output.get_mut().sync_data();
        io_result_with_context(synced, "log builder sync_data")
    }
}
impl<W: Write> LogBuilder<W> {
pub fn from_write(options: LogOptions, write: W) -> Result<Self, Error> {
let output = BufWriter::with_capacity(options.write_buffer, write);
Ok(Self {
options,
output,
bytes_written: 0,
setsum: Setsum::default(),
})
}
pub fn flush(&mut self) -> Result<(), Error> {
io_result_with_context(self.output.flush(), "log builder flush")
}
pub fn append(&mut self, write_batch: &WriteBatch) -> Result<(), Error> {
if write_batch.buffer.is_empty() {
return Err(empty_batch());
}
assert_ne!(write_batch.setsum, Setsum::default());
self.setsum += write_batch.setsum;
self._append(&write_batch.buffer)
}
fn _append(&mut self, buffer: &[u8]) -> Result<(), Error> {
let header = Header {
size: buffer.len() as u64,
crc32c: crc32c::crc32c(buffer),
discriminant: HEADER_WHOLE,
};
let header_sz: v64 = header.pack_sz().into();
let header_pa = stack_pack(header_sz);
let header_pa = header_pa.pack(header);
let nb = next_boundary(self.bytes_written);
let new_offset = self.bytes_written + (header_pa.pack_sz() + buffer.len()) as u64;
check_table_size(new_offset as usize)?;
if new_offset > self.options.rollover_size as u64 {
return Err(table_full(new_offset as usize, self.options.rollover_size));
}
if new_offset > nb {
self.append_split(buffer)
} else {
let header_buf = header_pa.to_vec();
self.write(&header_buf)?;
self.write(buffer)?;
assert!(self.bytes_written <= nb);
Ok(())
}
}
fn append_split(&mut self, buffer: &[u8]) -> Result<(), Error> {
let nb = next_boundary(self.bytes_written);
let roundup = nb - self.bytes_written;
if roundup <= HEADER_MAX_SIZE {
self.true_up(nb)?;
return self._append(buffer);
}
let first_bytes = (roundup - HEADER_MAX_SIZE) as usize;
let first = &buffer[..first_bytes];
let second = &buffer[first_bytes..];
let first_header = Header {
size: first.len() as u64,
crc32c: crc32c::crc32c(first),
discriminant: HEADER_FIRST,
};
let second_header = Header {
size: second.len() as u64,
crc32c: crc32c::crc32c(second),
discriminant: HEADER_SECOND,
};
let first_header_sz: v64 = first_header.pack_sz().into();
let second_header_sz: v64 = second_header.pack_sz().into();
let first_header_buf = stack_pack(first_header_sz).pack(first_header).to_vec();
let second_header_buf = stack_pack(second_header_sz).pack(second_header).to_vec();
assert!(first_header_buf.len() as u64 <= HEADER_MAX_SIZE);
assert!(second_header_buf.len() as u64 <= HEADER_MAX_SIZE);
self.write(&first_header_buf)?;
self.write(first)?;
self.true_up(nb)?;
self.write(&second_header_buf)?;
self.write(second)?;
Ok(())
}
fn true_up(&mut self, nb: u64) -> Result<(), Error> {
assert!(nb >= self.bytes_written);
let roundup = (nb - self.bytes_written) as usize;
assert!(roundup as u64 <= HEADER_MAX_SIZE);
let buf: &[u8] = &[0u8; HEADER_MAX_SIZE as usize][..roundup];
if !buf.is_empty() {
self.write(buf)
} else {
Ok(())
}
}
fn write(&mut self, buffer: &[u8]) -> Result<(), Error> {
io_result_with_context(self.output.write_all(buffer), "log write_all")?;
self.bytes_written += buffer.len() as u64;
Ok(())
}
}
impl<W: Write> Builder for LogBuilder<W> {
    type Sealed = (Setsum, W);

    /// Logical size of the log so far, in bytes.
    fn approximate_size(&self) -> usize {
        self.bytes_written as usize
    }

    /// Append a single put as a batch of its own.
    fn put(&mut self, key: &[u8], timestamp: u64, value: &[u8]) -> Result<(), Error> {
        let mut single = WriteBatch::default();
        single.put(key, timestamp, value)?;
        self.append(&single)
    }

    /// Append a single delete as a batch of its own.
    fn del(&mut self, key: &[u8], timestamp: u64) -> Result<(), Error> {
        let mut single = WriteBatch::default();
        single.del(key, timestamp)?;
        self.append(&single)
    }

    /// Flush the log and hand back the accumulated setsum together with the inner writer.
    fn seal(mut self) -> Result<Self::Sealed, Error> {
        self.flush()?;
        let Self { setsum, output, .. } = self;
        match output.into_inner() {
            Ok(writer) => Ok((setsum, writer)),
            Err(_) => Err(logic_error_buf_writer_into_inner_failed()),
        }
    }
}
/// Work-coalescing core that merges concurrently submitted batches into one log append.
struct WriteCoalescingCore<W: Write> {
    // The log every coalesced batch is appended to.
    builder: LogBuilder<W>,
    // Running total of batch payload bytes handed to `work`.
    written: u64,
}

impl<W: Write> WorkCoalescingCore<Arc<WriteBatch>, Result<u64, Error>> for WriteCoalescingCore<W> {
    type InputAccumulator = WriteBatch;
    type OutputIterator<'a>
        = std::iter::Take<std::iter::Repeat<Result<u64, Error>>>
    where
        W: 'a;

    /// Two batches may be coalesced when their combined size still passes the batch-size check.
    fn can_batch(&self, acc: &WriteBatch, other: &Arc<WriteBatch>) -> bool {
        let combined = acc.buffer.len().saturating_add(other.buffer.len());
        check_batch_size(combined).is_ok()
    }

    /// Fold `other` into the accumulator.
    fn batch(&mut self, mut acc: WriteBatch, other: Arc<WriteBatch>) -> Self::InputAccumulator {
        acc.merge(&other)
            .expect("can_batch should ensure this is impossible");
        acc
    }

    /// Append and flush the coalesced batch; each of the `taken` callers gets the same outcome:
    /// the updated `written` total on success, or the error.
    fn work(&mut self, taken: usize, acc: Self::InputAccumulator) -> Self::OutputIterator<'_> {
        self.written += acc.buffer.len() as u64;
        let outcome = self
            .builder
            .append(&acc)
            .and_then(|()| self.builder.flush())
            .map(|()| self.written);
        std::iter::repeat(outcome).take(taken)
    }
}
/// Work-coalescing core that lets many waiters share one fsync of the log's file descriptor.
struct FsyncCoalescingCore {
    // Raw descriptor of the log's underlying writer.
    raw_builder: RawFd,
    // Largest request value for which a sync has succeeded.
    synced: u64,
}

impl WorkCoalescingCore<u64, bool> for FsyncCoalescingCore {
    type InputAccumulator = u64;
    type OutputIterator<'a> = std::iter::Take<std::iter::Repeat<bool>>;

    /// Refuse to coalesce only when the accumulator is a non-zero request that is already
    /// covered by `synced` while `input` is not.
    fn can_batch(&self, acc: &u64, input: &u64) -> bool {
        let acc_already_synced = *acc > 0 && *acc <= self.synced;
        !acc_already_synced || *input <= self.synced
    }

    /// The coalesced request is the larger of the two.
    fn batch(&mut self, acc: u64, seen: u64) -> Self::InputAccumulator {
        std::cmp::max(acc, seen)
    }

    /// Sync the descriptor unless `synced` already covers `acc`; every one of the `taken`
    /// waiters receives the same success flag.
    fn work(&mut self, taken: usize, acc: Self::InputAccumulator) -> Self::OutputIterator<'_> {
        #[cfg(not(target_os = "linux"))]
        fn fsync(fd: RawFd) -> bool {
            // SAFETY: fsync takes the descriptor by value and touches no Rust-managed memory.
            // NOTE(review): assumes `fd` is still open for the lifetime of this core -- confirm.
            (unsafe { libc::fsync(fd) }) >= 0
        }
        #[cfg(target_os = "linux")]
        fn fsync(fd: RawFd) -> bool {
            // SAFETY: fdatasync takes the descriptor by value and touches no Rust-managed memory.
            // NOTE(review): assumes `fd` is still open for the lifetime of this core -- confirm.
            (unsafe { libc::fdatasync(fd) }) >= 0
        }

        FSYNC.click();
        let succeeded = if self.synced >= acc {
            true
        } else {
            let ok = fsync(self.raw_builder);
            if ok {
                self.synced = acc;
            }
            ok
        };
        std::iter::repeat(succeeded).take(taken)
    }
}
/// Log builder that can be shared between threads; appends and fsyncs are coalesced through
/// two work-coalescing queues.
pub struct ConcurrentLogBuilder<W: Write> {
    // Coalesces write batches and appends them to the underlying `LogBuilder`.
    write_cq: WorkCoalescingQueue<Arc<WriteBatch>, Result<u64, Error>, WriteCoalescingCore<W>>,
    // Coalesces fsync requests against the log's raw file descriptor.
    fsync_cq: WorkCoalescingQueue<u64, bool, FsyncCoalescingCore>,
    // Set to true once an append or its fsync has failed.
    poison: AtomicBool,
    // Ties the type parameter `W` to the struct.
    _phantom_w: std::marker::PhantomData<W>,
}
impl ConcurrentLogBuilder<File> {
    /// Create a brand-new log file at `file_name` and wrap it in a concurrent builder.
    pub fn new<P: AsRef<Path>>(options: LogOptions, file_name: P) -> Result<Self, Error> {
        LogBuilder::new(options, file_name).and_then(Self::from_builder)
    }

    /// Submit request `0` to the fsync queue and report whether the queue signalled success.
    ///
    /// NOTE(review): `FsyncCoalescingCore::work` treats a request as satisfied when `synced`
    /// already covers it, which is always true for `0` unless it is coalesced with a larger
    /// request -- confirm that this call is meant to be able to skip the system call.
    pub fn fsync(&self) -> Result<(), Error> {
        if self.fsync_cq.do_work(0) {
            Ok(())
        } else {
            Err(corruption_log_poisoned())
        }
    }
}
impl<W: Write + AsRawFd> ConcurrentLogBuilder<W> {
pub fn from_write(options: LogOptions, write: W) -> Result<Self, Error> {
let builder = LogBuilder::from_write(options, write)?;
Self::from_builder(builder)
}
pub fn from_builder(builder: LogBuilder<W>) -> Result<Self, Error> {
let raw_builder = builder.output.get_ref().as_raw_fd();
let write_cq = WorkCoalescingQueue::new(WriteCoalescingCore {
builder,
written: 0,
});
let fsync_cq = WorkCoalescingQueue::new(FsyncCoalescingCore {
raw_builder,
synced: 0,
});
let poison = AtomicBool::new(false);
let _phantom_w = std::marker::PhantomData;
Ok(Self {
write_cq,
fsync_cq,
poison,
_phantom_w,
})
}
pub fn flush(&self) -> Result<(), Error> {
let mut write = self.write_cq.get_core();
write.builder.flush()
}
pub fn approximate_size(&self) -> usize {
let write = self.write_cq.get_core();
write.builder.approximate_size()
}
pub fn append(&self, write_batch: WriteBatch) -> Result<(), Error> {
if write_batch.buffer.is_empty() {
return Err(empty_batch());
}
let written = match self.write_cq.do_work(Arc::new(write_batch)) {
Ok(written) => written,
Err(err) => {
self.poison.store(true, atomic::Ordering::Relaxed);
return Err(err);
}
};
if !self.fsync_cq.do_work(written) {
self.poison.store(true, atomic::Ordering::Relaxed);
return Err(corruption_fsync_failed());
}
Ok(())
}
pub fn put(&self, key: &[u8], timestamp: u64, value: &[u8]) -> Result<(), Error> {
let mut wb = WriteBatch::default();
wb.put(key, timestamp, value)?;
self.append(wb)?;
Ok(())
}
pub fn del(&self, key: &[u8], timestamp: u64) -> Result<(), Error> {
let mut wb = WriteBatch::default();
wb.del(key, timestamp)?;
self.append(wb)?;
Ok(())
}
pub fn seal(self) -> Result<(Setsum, W), Error> {
let core = self.write_cq.into_inner();
core.builder.seal()
}
}
impl<W: Write> std::fmt::Debug for ConcurrentLogBuilder<W> {
    /// Prints only the type name; the queues and the writer are not rendered.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
        f.write_str("ConcurrentLogBuilder")
    }
}
/// Sequential reader that yields the key-value entries stored in a log.
pub struct LogIterator<R: Read + Seek> {
    // Buffered source of log bytes.
    input: BufReader<R>,
    // Payload of the batch currently being decoded (both halves when the batch was split).
    buffer: Vec<u8>,
    // Offset into `buffer` of the next entry to decode.
    buffer_idx: usize,
}
impl LogIterator<File> {
    /// Open the existing log at `file_name` for reading.
    ///
    /// # Errors
    ///
    /// Fails, with the path attached to the error, when the file cannot be opened.
    pub fn new<P: AsRef<Path>>(options: LogOptions, file_name: P) -> Result<Self, Error> {
        let path = file_name.as_ref();
        let mut open_options = OpenOptions::new();
        open_options.create(false).read(true);
        let file: File = open_options
            .open(path)
            .map_err(|err| error_with_path(system_error(err), path.to_string_lossy()))?;
        Self::from_reader(options, file)
    }
}
impl<R: Read + Seek> LogIterator<R> {
pub fn from_reader(options: LogOptions, reader: R) -> Result<Self, Error> {
let input = BufReader::with_capacity(options.read_buffer, reader);
Ok(Self {
input,
buffer: vec![],
buffer_idx: 0,
})
}
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Result<Option<KeyValueRef<'_>>, Error> {
if self.buffer_idx < self.buffer.len() {
return self.next_from_buffer();
}
self.buffer_idx = 0;
self.buffer.clear();
let header = match self.next_frame()? {
Some(header) => header,
None => {
return Ok(None);
}
};
if header.discriminant == HEADER_WHOLE {
} else if header.discriminant == HEADER_FIRST {
self.true_up()?;
let header2 = match self.next_frame()? {
Some(header) => header,
None => {
return Err(corruption_truncation_no_second_header(
self.input.stream_position().unwrap_or(0),
));
}
};
if header2.discriminant != HEADER_SECOND {
return Err(corruption_invalid_discriminant(
header2.discriminant,
self.input.stream_position().unwrap_or(0),
));
}
} else {
return Err(corruption_invalid_discriminant(
header.discriminant,
self.input.stream_position().unwrap_or(0),
));
}
self.next_from_buffer()
}
fn next_from_buffer(&mut self) -> Result<Option<KeyValueRef<'_>>, Error> {
if self.buffer_idx >= self.buffer.len() {
return Err(empty_batch());
}
let (kve, rem) = <KeyValueEntry as Unpackable>::unpack(&self.buffer[self.buffer_idx..])
.map_err(unpack_key_value_entry_prototk)?;
self.buffer_idx = self.buffer.len() - rem.len();
fn check_shared(shared: u64) -> Result<(), Error> {
if shared != 0 {
Err(corruption_shared_not_zero())
} else {
Ok(())
}
}
match &kve {
KeyValueEntry::Put(KeyValuePut {
shared,
key_frag,
timestamp,
value,
}) => {
check_shared(*shared)?;
Ok(Some(KeyValueRef {
key: key_frag,
timestamp: *timestamp,
value: Some(value),
}))
}
KeyValueEntry::Del(KeyValueDel {
shared,
key_frag,
timestamp,
}) => {
check_shared(*shared)?;
Ok(Some(KeyValueRef {
key: key_frag,
timestamp: *timestamp,
value: None,
}))
}
}
}
fn next_frame(&mut self) -> Result<Option<Header>, Error> {
let header = match self.next_header()? {
Some(header) => header,
None => {
return Ok(None);
}
};
let buffer_start_sz = self.buffer.len();
let buffer_new_sz = buffer_start_sz + header.size as usize;
self.buffer.resize(buffer_new_sz, 0);
let buffer = &mut self.buffer[buffer_start_sz..];
io_result(self.input.read_exact(buffer))?;
let crc = crc32c::crc32c(buffer);
if crc != header.crc32c {
return Err(corruption_crc_checksum_failed(
header.crc32c,
crc,
self.input.stream_position().unwrap_or(0),
));
}
Ok(Some(header))
}
fn next_header(&mut self) -> Result<Option<Header>, Error> {
'looping: loop {
let header_sz: &mut [u8] = &mut [0; 1];
let header: &mut [u8] = &mut [0; HEADER_MAX_SIZE as usize];
match self.input.read_exact(header_sz) {
Ok(_) => (),
Err(err) => {
if err.kind() == ErrorKind::UnexpectedEof {
return Ok(None);
} else {
return Err(system_error(err));
}
}
};
let header_sz: usize = header_sz[0] as usize;
if header_sz == 0 {
self.true_up()?;
continue 'looping;
}
if header_sz as u64 > HEADER_MAX_SIZE {
return Err(corruption_header_size_exceeds_max(
header_sz,
self.input.stream_position().unwrap_or(0),
));
}
let header = &mut header[..header_sz];
io_result(self.input.read_exact(header))?;
let header: Header = <Header as Unpackable>::unpack(header)
.map_err(unpack_log_header)?
.0;
if header.size > TABLE_FULL_SIZE as u64 {
return Err(corruption_entry_size_exceeds_max(
header.size,
self.input.stream_position().unwrap_or(0),
));
}
return Ok(Some(header));
}
}
fn true_up(&mut self) -> Result<(), Error> {
let offset = io_result(self.input.stream_position())?;
let trued_up = compute_true_up(offset);
if trued_up - offset > HEADER_MAX_SIZE {
return Err(corruption_true_up_exceeds_header_max(offset, trued_up));
}
io_result(self.input.seek(SeekFrom::Start(trued_up)))?;
Ok(())
}
}
/// Replay the log at `log_path` into `builder` in sorted key order and seal the builder.
///
/// Every entry is read into memory, sorted by `(key, timestamp)` as ordered by `KeyRef`, and
/// then fed to `builder` as puts and deletes.  Returns `Ok(None)`, without sealing, when the
/// log holds no entries.
///
/// # Errors
///
/// Propagates errors from opening or reading the log (including corruption) and from the
/// builder.  Read errors are returned to the caller rather than causing a panic.
pub fn log_to_builder<P: AsRef<Path>, B: Builder>(
    log_options: LogOptions,
    log_path: P,
    mut builder: B,
) -> Result<Option<B::Sealed>, Error> {
    let mut log_iter = LogIterator::new(log_options, log_path)?;
    let mut kvrs = Vec::new();
    while let Some(kvr) = log_iter.next()? {
        kvrs.push(KeyValuePair::from(kvr));
    }
    if kvrs.is_empty() {
        return Ok(None);
    }
    fn sort_key(lhs: &KeyValuePair, rhs: &KeyValuePair) -> Ordering {
        KeyRef::new(&lhs.key, lhs.timestamp).cmp(&KeyRef::new(&rhs.key, rhs.timestamp))
    }
    kvrs.sort_by(sort_key);
    for kvr in kvrs.into_iter() {
        match kvr.value {
            Some(v) => {
                builder.put(&kvr.key, kvr.timestamp, &v)?;
            }
            None => {
                builder.del(&kvr.key, kvr.timestamp)?;
            }
        }
    }
    builder.seal().map(Some)
}
/// Compute the setsum of every entry stored in the log at `log_path`.
///
/// # Errors
///
/// Propagates errors from opening or reading the log (including corruption).  Read errors are
/// returned to the caller rather than causing a panic.
pub fn log_to_setsum<P: AsRef<Path>>(
    log_options: LogOptions,
    log_path: P,
) -> Result<Setsum, Error> {
    let mut log_iter = LogIterator::new(log_options, log_path)?;
    let mut acc = Setsum::default();
    while let Some(kvr) = log_iter.next()? {
        if let Some(value) = kvr.value.as_ref() {
            acc.put(kvr.key, kvr.timestamp, value);
        } else {
            acc.del(kvr.key, kvr.timestamp);
        }
    }
    Ok(acc)
}
/// Scan the log at `log_path` and report where it should be truncated, if anywhere.
///
/// Frames are read one after another.  The stream position after each `HEADER_WHOLE` or
/// `HEADER_SECOND` frame is remembered as the end of the last complete batch.  If the final
/// frame read is of any other kind, `Ok(Some(offset))` returns that remembered position;
/// otherwise the result is `Ok(None)`.  This function only computes the offset; it does not
/// modify the file.
///
/// # Errors
///
/// Propagates errors from opening the log and from reading frames.
pub fn truncate_final_partial_frame<P: AsRef<Path>>(
    log_options: LogOptions,
    log_path: P,
) -> Result<Option<u64>, Error> {
    let mut iter = LogIterator::new(log_options, log_path)?;
    let mut offset = 0;
    let mut last_was_valid = true;
    while let Some(header) = iter.next_frame()? {
        // `next_frame` appends each payload to the iterator's buffer.  Only the headers matter
        // here, so drop the payload right away instead of accumulating the whole log in memory.
        iter.buffer.clear();
        if header.discriminant == HEADER_SECOND || header.discriminant == HEADER_WHOLE {
            offset = io_result(iter.input.stream_position())?;
            last_was_valid = true;
        } else {
            last_was_valid = false;
        }
    }
    if !last_was_valid {
        Ok(Some(offset))
    } else {
        Ok(None)
    }
}
#[cfg(test)]
mod offsets {
    use super::*;

    /// `block_offset` maps every byte of a block to that block's index.
    #[test]
    fn offsets() {
        let cases: [(u64, u64); 4] = [(0, 0), (1048575, 0), (1048576, 1), (2097151, 1)];
        for (input, expected) in cases {
            assert_eq!(expected, block_offset(input));
        }
    }

    /// `next_boundary` always moves strictly forward to the following block boundary.
    #[test]
    fn boundaries() {
        let cases: [(u64, u64); 4] = [
            (0, 1048576),
            (1048575, 1048576),
            (1048576, 2097152),
            (2097151, 2097152),
        ];
        for (input, expected) in cases {
            assert_eq!(expected, next_boundary(input));
        }
    }

    /// `compute_true_up` rounds up but leaves offsets already on a boundary alone.
    #[test]
    fn true_ups() {
        let cases: [(u64, u64); 5] = [
            (0, 0),
            (1, 1048576),
            (1048576, 1048576),
            (1048577, 2097152),
            (2097152, 2097152),
        ];
        for (input, expected) in cases {
            assert_eq!(expected, compute_true_up(input));
        }
    }
}
// Byte-level tests of the framing produced by `LogBuilder`.
#[cfg(test)]
mod builder {
    use super::*;
    // Sealing a log that received no appends writes no bytes.
    #[test]
    fn empty() {
        let mut write = Vec::new();
        let log =
            LogBuilder::from_write(LogOptions::default(), &mut write).expect("should not fail");
        log.seal().expect("seal should not fail");
        let exp: &[u8] = &[];
        let got: &[u8] = &write;
        assert_eq!(exp, got);
    }
    // Pins the packed encoding of a header (without its length prefix).
    #[test]
    fn header_one() {
        let header = Header {
            size: 19,
            discriminant: 1,
            crc32c: 0x5475cb53,
        };
        let buf = stack_pack(header).to_vec();
        let exp: &[u8] = &[
            80, 19, 88, 1, 101, 83, 203, 117, 84, ];
        let got: &[u8] = &buf;
        assert_eq!(exp, got);
    }
    // Pins the CRC32C of the packed entry that `insert_one` expects as payload.
    #[test]
    fn crc32c_one() {
        let buf: &[u8] = &[
            66, 19, 8, 0, 18, 3, 101, 102, 103, 24, 42, 34, 8, 1, 2, 3, 4, 5, 6, 7, 8,
        ];
        assert_eq!(0x5acc2712, crc32c::crc32c(buf));
    }
    // A single put yields: header length byte, packed header, then the packed entry.
    #[test]
    fn insert_one() {
        let mut write = Vec::new();
        let mut log =
            LogBuilder::from_write(LogOptions::default(), &mut write).expect("should not fail");
        log.put(&[101, 102, 103], 42, &[1, 2, 3, 4, 5, 6, 7, 8])
            .unwrap();
        log.flush().unwrap();
        drop(log);
        let exp: &[u8] = &[
            9, 80, 21, 88, 1, 101, 18, 39, 204, 90, 66, 19, 8, 0, 18, 3, 101, 102, 103, 24, 42, 34, 8, 1, 2, 3, 4, 5, 6, 7, 8, ];
        let got: &[u8] = &write;
        assert_eq!(exp, got);
    }
    // Writes enough data to cross the first block boundary and checks the 40 bytes around it:
    // the tail of the first fragment, zero padding up to the boundary, then the second header.
    #[test]
    fn insert_across_boundary() {
        let mut buffer = Vec::new();
        let mut log =
            LogBuilder::from_write(LogOptions::default(), &mut buffer).expect("should not fail");
        let key = vec![b'A'; 64];
        let value = vec![b'B'; 32768];
        for _ in 0..33 {
            log.put(&key, 42, &value).unwrap();
        }
        log.flush().unwrap();
        drop(log);
        let block_size = BLOCK_SIZE as usize;
        assert_eq!(
            &[
                66, 66, 66, 66, 66, 66, 66, 66, 66, 66, 66, 66, 66, 0, 0, 0, 0, 0, 0, 0, 10, 80,
                199, 22, 88, 3, 101, 44, 249, 80, 107, 66, 66, 66, 66, 66, 66, 66, 66, 66
            ],
            &buffer[block_size - 20..block_size + 20]
        );
    }
}