use std::fmt::{Debug, Display, Formatter};
use std::path::Path;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
use std::{io, thread};
use clokwerk::{ScheduleHandle, Scheduler, TimeUnits};
use crate::internal::{
acquire_lock, get_current_timestamp, initialize_db_folder, slice_to_array, BufferPool,
DbFileHeader, Header, InvertedIndex, KeyValueEntry, ValueEntry,
};
/// Name of the database file kept inside the store folder.
const DEFAULT_DB_FILE: &'static str = "dump.scdb";
/// Name of the search index file kept inside the store folder.
const DEFAULT_SEARCH_INDEX_FILE: &'static str = "index.iscdb";
/// Eight zero bytes, i.e. `0u64` in big-endian; an index slot holding this
/// value is treated as empty.
const ZERO_U64_BYTES: [u8; 8] = [0; 8];
/// Passed to `InvertedIndex::new` as its maximum index key length
/// (presumably the longest key prefix that gets indexed — confirm in `InvertedIndex`).
const DEFAULT_MAX_INDEX_KEY_LEN: u32 = 3;
/// A key-value store persisted to a database file inside a folder, with an
/// optional search index over its keys.
pub struct Store {
    /// Buffer pool over the database file; the same `Arc` is shared with the
    /// background compaction job.
    buffer_pool: Arc<Mutex<BufferPool>>,
    /// Header read from the database file when the store was opened.
    header: DbFileHeader,
    /// Handle of the background compaction scheduler; `None` when disabled.
    scheduler: Option<ScheduleHandle>,
    /// Search index; `Some` only when search was enabled at construction.
    search_index: Option<Arc<Mutex<InvertedIndex>>>,
}
impl Store {
    /// Opens the store kept in the folder `store_path`, creating the folder and
    /// its files when they do not exist yet.
    ///
    /// * `max_keys`, `redundant_blocks` – passed to the buffer pool and, when
    ///   search is enabled, to the search index.
    /// * `pool_capacity` – passed to [`BufferPool::new`].
    /// * `compaction_interval` – seconds between background compactions; `None`
    ///   means 3600 and `Some(0)` disables background compaction.
    /// * `is_search_enabled` – when `true`, a search index file is kept next to
    ///   the database file and [`Store::search`] becomes available.
    ///
    /// # Errors
    /// Any I/O error from creating the folder, opening the files or reading the
    /// database header.
    pub fn new(
        store_path: &str,
        max_keys: Option<u64>,
        redundant_blocks: Option<u16>,
        pool_capacity: Option<usize>,
        compaction_interval: Option<u32>,
        is_search_enabled: bool,
    ) -> io::Result<Self> {
        let db_folder = Path::new(store_path);
        let db_file_path = db_folder.join(DEFAULT_DB_FILE);
        let search_idx_file_path = db_folder.join(DEFAULT_SEARCH_INDEX_FILE);
        initialize_db_folder(db_folder)?;

        let mut buffer_pool = BufferPool::new(
            pool_capacity,
            &db_file_path,
            max_keys,
            redundant_blocks,
            None,
        )?;

        let search_index = if is_search_enabled {
            let idx = InvertedIndex::new(
                &search_idx_file_path,
                Some(DEFAULT_MAX_INDEX_KEY_LEN),
                max_keys,
                redundant_blocks,
            )?;
            Some(Arc::new(Mutex::new(idx)))
        } else {
            None
        };

        let header = extract_header_from_buffer_pool(&mut buffer_pool)?;
        let buffer_pool = Arc::new(Mutex::new(buffer_pool));
        let scheduler = initialize_scheduler(compaction_interval, &buffer_pool, &search_index);

        Ok(Self {
            buffer_pool,
            header,
            scheduler,
            search_index,
        })
    }

    /// Sets `k` to `v`, overwriting any previous value of `k`.
    ///
    /// The expiry is `get_current_timestamp() + ttl` (presumably seconds — TODO
    /// confirm); `None` stores an expiry of `0`, i.e. the entry never expires.
    /// When search is enabled the key is also added to the search index.
    ///
    /// # Errors
    /// An `io::ErrorKind::Other` "CollisionSaturatedError" when the key's slot
    /// in every index block is taken by other keys, plus any I/O error from the
    /// buffer pool or the search index.
    pub fn set(&mut self, k: &[u8], v: &[u8], ttl: Option<u64>) -> io::Result<()> {
        // Saturate rather than overflow (a panic in debug builds) on huge ttls.
        let expiry = match ttl {
            None => 0u64,
            Some(ttl) => get_current_timestamp().saturating_add(ttl),
        };
        let base_offset = self.header.get_index_offset(k);
        let mut buffer_pool: MutexGuard<'_, BufferPool> = acquire_lock!(self.buffer_pool)?;

        for index_block in 0..self.header.number_of_index_blocks {
            let index_offset = self
                .header
                .get_index_offset_in_nth_block(base_offset, index_block)?;
            let kv_offset_in_bytes = buffer_pool.read_index(index_offset)?;
            // Take the slot if it is empty or already points at this key (update).
            if kv_offset_in_bytes == ZERO_U64_BYTES
                || buffer_pool.addr_belongs_to_key(&kv_offset_in_bytes, k)?
            {
                let kv = KeyValueEntry::new(k, v, expiry);
                let mut kv_bytes = kv.as_bytes();
                // The offset returned by `append` is stored in the index slot as
                // the address of the new entry.
                let prev_last_offset = buffer_pool.append(&mut kv_bytes)?;
                let kv_address = prev_last_offset.to_be_bytes();
                buffer_pool.update_index(index_offset, &kv_address)?;
                if let Some(idx) = &self.search_index {
                    let mut idx: MutexGuard<'_, InvertedIndex> = acquire_lock!(idx)?;
                    idx.add(k, prev_last_offset, expiry)?;
                }
                return Ok(());
            }
        }

        Err(io::Error::new(
            io::ErrorKind::Other,
            format!("CollisionSaturatedError: no free slot for key: {:?}", k),
        ))
    }

    /// Returns the value stored for `k`, or `Ok(None)` when the key is absent
    /// or its entry is stale.
    pub fn get(&mut self, k: &[u8]) -> io::Result<Option<Vec<u8>>> {
        let base_offset = self.header.get_index_offset(k);
        let mut buffer_pool: MutexGuard<'_, BufferPool> = acquire_lock!(self.buffer_pool)?;

        for index_block in 0..self.header.number_of_index_blocks {
            let index_offset = self
                .header
                .get_index_offset_in_nth_block(base_offset, index_block)?;
            let kv_offset_in_bytes = buffer_pool.read_index(index_offset)?;
            if kv_offset_in_bytes == ZERO_U64_BYTES {
                // Nothing stored in this block's slot; keep probing.
                continue;
            }
            let entry_offset = u64::from_be_bytes(slice_to_array(&kv_offset_in_bytes)?);
            if let Some(v) = buffer_pool.get_value(entry_offset, k)? {
                return Ok(if v.is_stale { None } else { Some(v.data) });
            }
        }

        Ok(None)
    }

    /// Deletes `k` from the database file and, when enabled, the search index.
    ///
    /// Deleting a key that is not present is not an error. The search index is
    /// updated on a helper thread, which is always joined before returning, so
    /// the index reflects the deletion once this method returns.
    pub fn delete(&mut self, k: &[u8]) -> io::Result<()> {
        let mut buffer_pool: MutexGuard<'_, BufferPool> = acquire_lock!(self.buffer_pool)?;

        let search_handle = self.search_index.as_ref().map(|idx| {
            let idx = Arc::clone(idx);
            let k = k.to_vec();
            thread::spawn(move || {
                let mut idx: MutexGuard<'_, InvertedIndex> = acquire_lock!(idx)?;
                idx.remove(&k)
            })
        });

        let db_result = self.delete_from_db_file(&mut buffer_pool, k);
        // Join the helper on every path so its error is never silently dropped;
        // the database error, if any, takes precedence.
        let search_result = search_handle.map(Self::join_search_task).transpose();
        db_result?;
        search_result?;
        Ok(())
    }

    /// Removes all data from the database file and, when enabled, the search index.
    ///
    /// The search index is cleared on a helper thread, which is always joined
    /// before returning once the buffer pool lock has been acquired.
    pub fn clear(&mut self) -> io::Result<()> {
        let search_handle = self.search_index.as_ref().map(|idx| {
            let idx = Arc::clone(idx);
            thread::spawn(move || {
                let mut idx: MutexGuard<'_, InvertedIndex> = acquire_lock!(idx)?;
                idx.clear()
            })
        });

        let mut buffer_pool: MutexGuard<'_, BufferPool> = acquire_lock!(self.buffer_pool)?;
        let db_result = buffer_pool.clear_file();
        let search_result = search_handle.map(Self::join_search_task).transpose();
        db_result?;
        search_result?;
        Ok(())
    }

    /// Compacts the database file (and the search index, when enabled) right away.
    pub fn compact(&mut self) -> io::Result<()> {
        // Lock order: buffer pool first, then search index — the same order used
        // by every other method and by the background compaction job.
        let mut buffer_pool: MutexGuard<'_, BufferPool> = acquire_lock!(self.buffer_pool)?;
        let mut search_index = match &self.search_index {
            None => None,
            Some(idx) => {
                let idx: MutexGuard<'_, InvertedIndex> = acquire_lock!(idx)?;
                Some(idx)
            }
        };
        buffer_pool.compact_file(&mut (search_index.as_deref_mut()))
    }

    /// Returns the key-value pairs whose keys match `term`, skipping the first
    /// `skip` matches and returning at most `limit` of them (the tests show a
    /// `limit` of 0 returning every match after `skip`).
    ///
    /// # Errors
    /// `io::ErrorKind::Unsupported` when the store was created without search,
    /// plus any I/O error from the index or the buffer pool.
    pub fn search(
        &mut self,
        term: &[u8],
        skip: u64,
        limit: u64,
    ) -> io::Result<Vec<(Vec<u8>, Vec<u8>)>> {
        let idx = match &self.search_index {
            Some(idx) => idx,
            None => return Err(io::Error::from(io::ErrorKind::Unsupported)),
        };
        // Lock the buffer pool before the search index, like `set`, `compact`
        // and the background compaction job; the reverse order can deadlock
        // against a concurrently running background compaction.
        let mut buffer_pool: MutexGuard<'_, BufferPool> = acquire_lock!(self.buffer_pool)?;
        let mut search_index: MutexGuard<'_, InvertedIndex> = acquire_lock!(idx)?;
        let offsets = search_index.search(term, skip, limit)?;
        buffer_pool.get_many_key_values(&offsets)
    }

    /// Marks `k`'s entry as deleted in the first index block whose slot holds it.
    fn delete_from_db_file(&self, buffer_pool: &mut BufferPool, k: &[u8]) -> io::Result<()> {
        let base_offset = self.header.get_index_offset(k);
        for index_block in 0..self.header.number_of_index_blocks {
            let index_offset = self
                .header
                .get_index_offset_in_nth_block(base_offset, index_block)?;
            let kv_offset_in_bytes = buffer_pool.read_index(index_offset)?;
            if kv_offset_in_bytes == ZERO_U64_BYTES {
                continue;
            }
            let entry_offset = u64::from_be_bytes(slice_to_array(&kv_offset_in_bytes)?);
            if buffer_pool.try_delete_kv_entry(entry_offset, k)?.is_some() {
                break;
            }
        }
        Ok(())
    }

    /// Waits for a search-index helper thread and returns its result, turning a
    /// panic in that thread into an `io::Error` instead of re-panicking here.
    fn join_search_task<T>(handle: thread::JoinHandle<io::Result<T>>) -> io::Result<T> {
        handle
            .join()
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "search index task panicked"))?
    }
}
impl Debug for Store {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Store {{ buffer_pool: {:?}, header: {}}}",
self.buffer_pool, self.header
)
}
}
impl Display for Store {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl Drop for Store {
    /// Stops the background compaction scheduler when one was started.
    fn drop(&mut self) {
        match self.scheduler.take() {
            Some(handle) => handle.stop(),
            None => {}
        }
    }
}
fn initialize_scheduler(
interval: Option<u32>,
buffer_pool: &Arc<Mutex<BufferPool>>,
search_index: &Option<Arc<Mutex<InvertedIndex>>>,
) -> Option<ScheduleHandle> {
let interval = interval.unwrap_or(3_600u32);
if interval > 0 {
let mut scheduler = Scheduler::new();
let buffer_pool = buffer_pool.clone();
let search_index = search_index.as_ref().cloned();
scheduler.every(interval.seconds()).run(move || {
let mut buffer_pool: MutexGuard<'_, BufferPool> =
acquire_lock!(buffer_pool).expect("get lock on buffer pool");
let mut search_index: Option<MutexGuard<'_, InvertedIndex>> = search_index
.as_ref()
.map(|v| acquire_lock!(v).expect("get lock on search index"));
buffer_pool
.compact_file(&mut (search_index.as_deref_mut()))
.expect("compact db file in thread");
});
let handle = scheduler.watch_thread(Duration::from_millis(200));
Some(handle)
} else {
None
}
}
/// Reads the database file header through the buffer pool's file handle.
fn extract_header_from_buffer_pool(buffer_pool: &mut BufferPool) -> io::Result<DbFileHeader> {
    let db_file = &mut buffer_pool.file;
    DbFileHeader::from_file(db_file)
}
#[cfg(test)]
mod tests {
    //! Tests for [`Store`]. They all share the on-disk folder `STORE_PATH`, so
    //! every test is `#[serial]` and removes the folder once it is done.

    use std::fs::OpenOptions;
    use std::io::{Seek, SeekFrom};
    use std::{fs, io, thread};

    use serial_test::serial;

    use super::*;

    const STORE_PATH: &str = "db";

    // Asserts that `$got` has exactly as many items as `$expected` and that the
    // items are pairwise-equal `Ok` values (an `Err` on either side panics).
    // The length check stops `zip` from silently ignoring a short result list.
    macro_rules! assert_list_eq {
        ($expected:expr, $got:expr) => {
            assert_eq!($got.len(), $expected.len(), "result list length mismatch");
            for (got, expected) in $got.into_iter().zip($expected) {
                assert_eq!(got.as_ref().unwrap(), expected.as_ref().unwrap());
            }
        };
    }

    // Converts a `Display` value into its UTF-8 bytes.
    macro_rules! str_to_bytes {
        ($v:expr) => {
            $v.to_string().into_bytes()
        };
    }

    // Converts an array of `Display` values into a `Vec` of byte vectors.
    macro_rules! to_byte_arrays_vector {
        ($data:expr) => {
            $data.map(|v| str_to_bytes!(v)).to_vec()
        };
    }

    #[test]
    #[serial]
    fn set_works() {
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), false).expect("create store");
        store.clear().expect("store failed to clear");
        let keys = get_keys();
        let values = get_values();
        insert_test_data(&mut store, &keys, &values, None);
        let received_values = get_values_for_keys(&mut store, &keys);
        let expected_values = wrap_values_in_result(&values);
        assert_list_eq!(&expected_values, &received_values);
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn set_with_ttl_works() {
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), false).expect("create store");
        store.clear().expect("store failed to clear");
        let keys = get_keys();
        let values = get_values();
        insert_test_data(&mut store, &keys[0..2], &values, None);
        insert_test_data(&mut store, &keys[2..], &values, Some(1));
        thread::sleep(Duration::from_secs(2));
        let received_values = get_values_for_keys(&mut store, &keys);
        let mut expected_values = wrap_values_in_result(&values[..2]);
        for _ in 2..keys.len() {
            expected_values.push(Ok(None));
        }
        assert_list_eq!(&expected_values, &received_values);
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn set_can_update() {
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), false).expect("create store");
        store.clear().expect("store failed to clear");
        let keys = get_keys();
        let values = get_values();
        let unchanged_values = values[2..].to_vec();
        let updated_keys = keys[0..2].to_vec();
        let updated_values: Vec<Vec<u8>> = values[0..2]
            .iter()
            .map(|v| v.iter().chain(b"bear").map(|v| v.to_owned()).collect())
            .collect();
        insert_test_data(&mut store, &keys, &values, None);
        insert_test_data(&mut store, &updated_keys, &updated_values, None);
        let received_values = get_values_for_keys(&mut store, &keys);
        let received_unchanged_values = &received_values[2..];
        let received_updated_values = &received_values[0..2];
        let expected_unchanged_values = wrap_values_in_result(&unchanged_values);
        let expected_updated_values = wrap_values_in_result(&updated_values);
        assert_list_eq!(&expected_unchanged_values, &received_unchanged_values);
        assert_list_eq!(&expected_updated_values, &received_updated_values);
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn delete_works() {
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), false).expect("create store");
        store.clear().expect("store failed to clear");
        let keys = get_keys();
        let values = get_values();
        let keys_to_delete = keys[2..].to_vec();
        insert_test_data(&mut store, &keys, &values, None);
        delete_keys(&mut store, &keys_to_delete);
        let received_values = get_values_for_keys(&mut store, &keys);
        let mut expected_values = wrap_values_in_result(&values[..2]);
        for _ in 0..keys_to_delete.len() {
            expected_values.push(Ok(None));
        }
        assert_list_eq!(&expected_values, &received_values);
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn clear_works() {
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), false).expect("create store");
        store.clear().expect("store failed to clear");
        let keys = get_keys();
        let values = get_values();
        insert_test_data(&mut store, &keys, &values, None);
        store.clear().expect("store cleared");
        let received_values = get_values_for_keys(&mut store, &keys);
        let expected_values: Vec<io::Result<Option<Vec<u8>>>> =
            keys.iter().map(|_| Ok(None)).collect();
        assert_list_eq!(&expected_values, &received_values);
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn search_errs_when_disabled() {
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), false).expect("create store");
        store.clear().expect("store failed to clear");
        let keys = to_byte_arrays_vector!(["foo", "fore", "bar", "band", "pig"]);
        let values = to_byte_arrays_vector!(["eng", "span", "port", "nyoro", "dan"]);
        insert_test_data(&mut store, &keys, &values, None);
        assert!(store.search(b"f", 0, 0).is_err());
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn search_works() {
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), true).expect("create store");
        store.clear().expect("store failed to clear");
        let keys = to_byte_arrays_vector!(["foo", "fore", "bar", "band", "pig"]);
        let values = to_byte_arrays_vector!(["eng", "span", "port", "nyoro", "dan"]);
        insert_test_data(&mut store, &keys, &values, None);
        let test_data = [
            ("f", vec![("foo", "eng"), ("fore", "span")]),
            ("fo", vec![("foo", "eng"), ("fore", "span")]),
            ("foo", vec![("foo", "eng")]),
            ("for", vec![("fore", "span")]),
            ("b", vec![("bar", "port"), ("band", "nyoro")]),
            ("ba", vec![("bar", "port"), ("band", "nyoro")]),
            ("bar", vec![("bar", "port")]),
            ("ban", vec![("band", "nyoro")]),
            ("band", vec![("band", "nyoro")]),
            ("p", vec![("pig", "dan")]),
            ("pi", vec![("pig", "dan")]),
            ("pig", vec![("pig", "dan")]),
            ("pigg", vec![]),
            ("food", vec![]),
            ("bandana", vec![]),
            ("bare", vec![]),
        ];
        for (term, expected) in test_data {
            let expected: Vec<(Vec<u8>, Vec<u8>)> = expected
                .into_iter()
                .map(|(k, v)| (str_to_bytes!(k), str_to_bytes!(v)))
                .collect();
            let got = store
                .search(&str_to_bytes!(term), 0, 0)
                .unwrap_or_else(|e| panic!("search for {}: {:?}", term, e));
            assert_eq!(&expected, &got);
        }
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn search_works_after_expire() {
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), true).expect("create store");
        store.clear().expect("store failed to clear");
        let keys = to_byte_arrays_vector!(["foo", "bar", "fore", "band", "pig"]);
        let values = to_byte_arrays_vector!(["eng", "port", "span", "nyoro", "dan"]);
        insert_test_data(&mut store, &keys, &values, Some(1));
        insert_test_data(&mut store, &keys[2..], &values[2..], None);
        thread::sleep(Duration::from_secs(2));
        let test_data = [
            ("f", vec![("fore", "span")]),
            ("fo", vec![("fore", "span")]),
            ("foo", vec![]),
            ("for", vec![("fore", "span")]),
            ("b", vec![("band", "nyoro")]),
            ("ba", vec![("band", "nyoro")]),
            ("bar", vec![]),
            ("ban", vec![("band", "nyoro")]),
            ("band", vec![("band", "nyoro")]),
            ("p", vec![("pig", "dan")]),
            ("pi", vec![("pig", "dan")]),
            ("pig", vec![("pig", "dan")]),
            ("pigg", vec![]),
            ("food", vec![]),
            ("bandana", vec![]),
            ("bare", vec![]),
        ];
        for (term, expected) in test_data {
            let expected: Vec<(Vec<u8>, Vec<u8>)> = expected
                .into_iter()
                .map(|(k, v)| (str_to_bytes!(k), str_to_bytes!(v)))
                .collect();
            let got = store
                .search(&str_to_bytes!(term), 0, 0)
                .unwrap_or_else(|e| panic!("search for {}: {:?}", term, e));
            assert_eq!(&expected, &got);
        }
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn search_works_after_delete() {
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), true).expect("create store");
        store.clear().expect("store failed to clear");
        let keys = to_byte_arrays_vector!(["foo", "fore", "bar", "band", "pig"]);
        let values = to_byte_arrays_vector!(["eng", "span", "port", "nyoro", "dan"]);
        insert_test_data(&mut store, &keys, &values, None);
        delete_keys(&mut store, &to_byte_arrays_vector!(["foo", "bar", "band"]));
        let test_data = [
            ("f", vec![("fore", "span")]),
            ("fo", vec![("fore", "span")]),
            ("foo", vec![]),
            ("for", vec![("fore", "span")]),
            ("b", vec![]),
            ("ba", vec![]),
            ("bar", vec![]),
            ("ban", vec![]),
            ("band", vec![]),
            ("p", vec![("pig", "dan")]),
            ("pi", vec![("pig", "dan")]),
            ("pig", vec![("pig", "dan")]),
            ("pigg", vec![]),
            ("food", vec![]),
            ("bandana", vec![]),
            ("bare", vec![]),
        ];
        for (term, expected) in test_data {
            let expected: Vec<(Vec<u8>, Vec<u8>)> = expected
                .into_iter()
                .map(|(k, v)| (str_to_bytes!(k), str_to_bytes!(v)))
                .collect();
            let got = store
                .search(&str_to_bytes!(term), 0, 0)
                .unwrap_or_else(|e| panic!("search for {}: {:?}", term, e));
            assert_eq!(&expected, &got);
        }
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn search_works_after_clear() {
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), true).expect("create store");
        store.clear().expect("store failed to clear");
        let keys = to_byte_arrays_vector!(["foo", "fore", "bar", "band", "pig"]);
        let values = to_byte_arrays_vector!(["eng", "span", "port", "nyoro", "dan"]);
        insert_test_data(&mut store, &keys, &values, None);
        let test_data = [
            "f", "fo", "foo", "for", "b", "ba", "bar", "ban", "band", "p", "pi", "pig", "pigg",
            "food", "bandana", "bare",
        ];
        store.clear().expect("store cleared");
        let expected: Vec<(Vec<u8>, Vec<u8>)> = vec![];
        for term in test_data {
            let got = store
                .search(&str_to_bytes!(term), 0, 0)
                .unwrap_or_else(|e| panic!("search for {}: {:?}", term, e));
            assert_eq!(&expected, &got);
        }
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn paginated_search_works() {
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), true).expect("create store");
        store.clear().expect("store failed to clear");
        let keys = to_byte_arrays_vector!(["foo", "fore", "food", "bar", "band", "pig"]);
        let values = to_byte_arrays_vector!(["eng", "span", "lug", "port", "nyoro", "dan"]);
        insert_test_data(&mut store, &keys, &values, None);
        let test_data = [
            (
                "fo",
                0,
                0,
                vec![("foo", "eng"), ("fore", "span"), ("food", "lug")],
            ),
            (
                "fo",
                0,
                8,
                vec![("foo", "eng"), ("fore", "span"), ("food", "lug")],
            ),
            ("fo", 1, 8, vec![("fore", "span"), ("food", "lug")]),
            ("fo", 1, 0, vec![("fore", "span"), ("food", "lug")]),
            ("fo", 0, 2, vec![("foo", "eng"), ("fore", "span")]),
            ("fo", 1, 2, vec![("fore", "span"), ("food", "lug")]),
            ("fo", 0, 1, vec![("foo", "eng")]),
            ("fo", 2, 1, vec![("food", "lug")]),
            ("fo", 1, 1, vec![("fore", "span")]),
        ];
        for (term, skip, limit, expected) in test_data {
            let expected: Vec<(Vec<u8>, Vec<u8>)> = expected
                .into_iter()
                .map(|(k, v)| (str_to_bytes!(k), str_to_bytes!(v)))
                .collect();
            let got = store
                .search(&str_to_bytes!(term), skip, limit)
                .unwrap_or_else(|e| {
                    panic!(
                        "search for {}, skip: {}, limit: {}: {:?}",
                        term, skip, limit, e
                    )
                });
            assert_eq!(&expected, &got);
        }
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn persists_to_file() {
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), false).expect("create store");
        store
            .clear()
            .expect("store failed to get cleared for some reason");
        let keys = get_keys();
        let values = get_values();
        insert_test_data(&mut store, &keys, &values, None);
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), false).expect("create store");
        let received_values = get_values_for_keys(&mut store, &keys);
        let expected_values = wrap_values_in_result(&values);
        assert_list_eq!(&expected_values, &received_values);
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn persists_to_file_after_delete() {
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), false).expect("create store");
        store.clear().expect("store failed to clear");
        let keys = get_keys();
        let values = get_values();
        let keys_to_delete = keys[2..].to_vec();
        insert_test_data(&mut store, &keys, &values, None);
        delete_keys(&mut store, &keys_to_delete);
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), false).expect("create store");
        let received_values = get_values_for_keys(&mut store, &keys);
        let mut expected_values = wrap_values_in_result(&values[..2]);
        for _ in 0..keys_to_delete.len() {
            expected_values.push(Ok(None));
        }
        assert_list_eq!(&expected_values, &received_values);
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn persists_to_file_after_clear() {
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), false).expect("create store");
        store.clear().expect("store failed to clear");
        let keys = get_keys();
        let values = get_values();
        insert_test_data(&mut store, &keys, &values, None);
        store.clear().expect("store failed to clear");
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), false).expect("create store");
        let received_values = get_values_for_keys(&mut store, &keys);
        let expected_values: Vec<io::Result<Option<Vec<u8>>>> =
            keys.iter().map(|_| Ok(None)).collect();
        assert_list_eq!(&expected_values, &received_values);
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn compact_removes_deleted_and_expired_from_db_file() {
        fs::remove_dir_all(STORE_PATH).ok();
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), false).expect("create store");
        store.clear().expect("store failed to clear");
        let keys = get_keys();
        let values = get_values();
        insert_test_data(&mut store, &keys[0..2], &values[0..2], Some(1));
        insert_test_data(&mut store, &keys[2..], &values[2..], None);
        delete_keys(&mut store, &keys[2..3]);
        let buffer_pool = acquire_lock!(store.buffer_pool).expect("acquire lock on buffer pool");
        let db_file_path = buffer_pool.file_path.to_str().unwrap().to_owned();
        drop(buffer_pool);
        thread::sleep(Duration::from_secs(2));
        let original_file_size = get_file_size(&db_file_path);
        store.compact().expect("compact store");
        let final_file_size = get_file_size(&db_file_path);
        let expected_file_size_reduction: u64 = keys[0..3]
            .iter()
            .zip(&values[0..3])
            .map(|(k, v)| KeyValueEntry::new(k, v, 0).as_bytes().len() as u64)
            .sum();
        assert_eq!(
            original_file_size - final_file_size,
            expected_file_size_reduction
        );
        let received_values = get_values_for_keys(&mut store, &keys);
        let received_unchanged_values = &received_values[3..];
        let received_removed_values = &received_values[0..3];
        let expected_unchanged_values = wrap_values_in_result(&values[3..]);
        let expected_expired_values: Vec<io::Result<Option<Vec<u8>>>> =
            keys[0..3].iter().map(|_| Ok(None)).collect();
        assert_list_eq!(&expected_unchanged_values, &received_unchanged_values);
        assert_list_eq!(&expected_expired_values, &received_removed_values);
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn compact_removes_expired_from_search_index_file() {
        fs::remove_dir_all(STORE_PATH).ok();
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), true).expect("create store");
        store.clear().expect("store failed to clear");
        let keys = to_byte_arrays_vector!(["foo", "bar", "fore", "band", "pig"]);
        let values = to_byte_arrays_vector!(["eng", "port", "span", "nyoro", "dan"]);
        insert_test_data(&mut store, &keys[0..2], &values[0..2], Some(1));
        insert_test_data(&mut store, &keys[2..], &values[2..], None);
        let search_index = store.search_index.as_ref().expect("has search index");
        let search_index = acquire_lock!(search_index).expect("acquire lock on search index");
        let search_index_file_path = search_index.file_path.to_str().unwrap().to_owned();
        drop(search_index);
        thread::sleep(Duration::from_secs(2));
        let original_file_size = get_file_size(&search_index_file_path);
        store.compact().expect("compact store");
        let final_file_size = get_file_size(&search_index_file_path);
        let expected_file_size_reduction = 282u64;
        assert_eq!(
            original_file_size - final_file_size,
            expected_file_size_reduction
        );
        let test_data = [
            ("f", vec![("fore", "span")]),
            ("fo", vec![("fore", "span")]),
            ("foo", vec![]),
            ("for", vec![("fore", "span")]),
            ("b", vec![("band", "nyoro")]),
            ("ba", vec![("band", "nyoro")]),
            ("bar", vec![]),
            ("ban", vec![("band", "nyoro")]),
            ("band", vec![("band", "nyoro")]),
            ("p", vec![("pig", "dan")]),
            ("pi", vec![("pig", "dan")]),
            ("pig", vec![("pig", "dan")]),
            ("pigg", vec![]),
            ("food", vec![]),
            ("bandana", vec![]),
            ("bare", vec![]),
        ];
        for (term, expected) in test_data {
            let expected: Vec<(Vec<u8>, Vec<u8>)> = expected
                .into_iter()
                .map(|(k, v)| (str_to_bytes!(k), str_to_bytes!(v)))
                .collect();
            let got = store
                .search(&str_to_bytes!(term), 0, 0)
                .unwrap_or_else(|e| panic!("search for {}: {:?}", term, e));
            assert_eq!(&expected, &got);
        }
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn background_task_compacts_db_file() {
        fs::remove_dir_all(STORE_PATH).ok();
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(1), false).expect("create store");
        store.clear().expect("store failed to clear");
        let keys = get_keys();
        let values = get_values();
        insert_test_data(&mut store, &keys[0..2], &values[0..2], Some(1));
        insert_test_data(&mut store, &keys[2..], &values[2..], None);
        delete_keys(&mut store, &keys[2..3]);
        let buffer_pool = acquire_lock!(store.buffer_pool).expect("acquire lock on buffer pool");
        let db_file_path = buffer_pool.file_path.to_str().unwrap().to_owned();
        drop(buffer_pool);
        let original_file_size = get_file_size(&db_file_path);
        thread::sleep(Duration::from_secs(4));
        let final_file_size = get_file_size(&db_file_path);
        let expected_file_size_reduction: u64 = keys[0..3]
            .iter()
            .zip(&values[0..3])
            .map(|(k, v)| KeyValueEntry::new(k, v, 0).as_bytes().len() as u64)
            .sum();
        assert_eq!(
            original_file_size - final_file_size,
            expected_file_size_reduction
        );
        let received_values = get_values_for_keys(&mut store, &keys);
        let received_unchanged_values = &received_values[3..];
        let received_removed_values = &received_values[0..3];
        let expected_unchanged_values = wrap_values_in_result(&values[3..]);
        let expected_expired_values: Vec<io::Result<Option<Vec<u8>>>> =
            keys[0..3].iter().map(|_| Ok(None)).collect();
        assert_list_eq!(&expected_unchanged_values, &received_unchanged_values);
        assert_list_eq!(&expected_expired_values, &received_removed_values);
        drop(store);
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn background_task_compacts_search_index_file() {
        fs::remove_dir_all(STORE_PATH).ok();
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(1), true).expect("create store");
        store.clear().expect("store failed to clear");
        let keys = to_byte_arrays_vector!(["foo", "bar", "fore", "band", "pig"]);
        let values = to_byte_arrays_vector!(["eng", "port", "span", "nyoro", "dan"]);
        insert_test_data(&mut store, &keys[0..2], &values[0..2], Some(1));
        insert_test_data(&mut store, &keys[2..], &values[2..], None);
        let search_index = store.search_index.as_ref().expect("has search index");
        let search_index = acquire_lock!(search_index).expect("acquire lock on search index");
        let search_index_file_path = search_index.file_path.to_str().unwrap().to_owned();
        drop(search_index);
        let original_file_size = get_file_size(&search_index_file_path);
        thread::sleep(Duration::from_secs(4));
        let final_file_size = get_file_size(&search_index_file_path);
        let expected_file_size_reduction = 282u64;
        assert_eq!(
            original_file_size - final_file_size,
            expected_file_size_reduction
        );
        let test_data = [
            ("f", vec![("fore", "span")]),
            ("fo", vec![("fore", "span")]),
            ("foo", vec![]),
            ("for", vec![("fore", "span")]),
            ("b", vec![("band", "nyoro")]),
            ("ba", vec![("band", "nyoro")]),
            ("bar", vec![]),
            ("ban", vec![("band", "nyoro")]),
            ("band", vec![("band", "nyoro")]),
            ("p", vec![("pig", "dan")]),
            ("pi", vec![("pig", "dan")]),
            ("pig", vec![("pig", "dan")]),
            ("pigg", vec![]),
            ("food", vec![]),
            ("bandana", vec![]),
            ("bare", vec![]),
        ];
        for (term, expected) in test_data {
            let expected: Vec<(Vec<u8>, Vec<u8>)> = expected
                .into_iter()
                .map(|(k, v)| (str_to_bytes!(k), str_to_bytes!(v)))
                .collect();
            let got = store
                .search(&str_to_bytes!(term), 0, 0)
                .unwrap_or_else(|e| panic!("search for {}: {:?}", term, e));
            assert_eq!(&expected, &got);
        }
        drop(store);
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    #[test]
    #[serial]
    fn get_does_not_err_for_empty_string_values() {
        let key = "foo".as_bytes().to_vec();
        let value = "".as_bytes().to_vec();
        let mut store =
            Store::new(STORE_PATH, None, None, None, Some(0), false).expect("create store");
        store.clear().expect("store failed to clear");
        store.set(&key, &value, None).expect("set key");
        let got = store.get(&key).expect("get key first time");
        assert_eq!(got, Some(value.clone()));
        let got = store.get(&key).expect("get key second time");
        assert_eq!(got, Some(value.clone()));
        // Clean up like every other test so no store folder is left behind.
        drop(store);
        fs::remove_dir_all(STORE_PATH).expect("delete store folder");
    }

    /// Deletes every key in `keys_to_delete`, panicking on the first failure.
    fn delete_keys(store: &mut Store, keys_to_delete: &[Vec<u8>]) {
        for k in keys_to_delete {
            store
                .delete(k)
                .unwrap_or_else(|e| panic!("delete key {:?}: {:?}", k, e));
        }
    }

    /// Returns the size in bytes of the file at `file_path`.
    fn get_file_size(file_path: &str) -> u64 {
        let mut file = OpenOptions::new()
            .read(true)
            .open(file_path)
            .unwrap_or_else(|e| panic!("open file {}: {:?}", file_path, e));
        file.seek(SeekFrom::End(0)).expect("get file size")
    }

    /// Calls `Store::get` for each key, keeping every result in key order.
    fn get_values_for_keys(
        store: &mut Store,
        keys: &[Vec<u8>],
    ) -> Vec<io::Result<Option<Vec<u8>>>> {
        keys.iter().map(|k| store.get(k)).collect()
    }

    /// Sets each key to the value at the same position (extra items on either
    /// side are ignored), panicking on the first failure.
    fn insert_test_data(
        store: &mut Store,
        keys: &[Vec<u8>],
        values: &[Vec<u8>],
        ttl: Option<u64>,
    ) {
        for (k, v) in keys.iter().zip(values) {
            store
                .set(k, v, ttl)
                .unwrap_or_else(|e| panic!("set key {:?}, value {:?}: {:?}", k, v, e));
        }
    }

    fn get_keys() -> Vec<Vec<u8>> {
        to_byte_arrays_vector!(["hey", "hi", "yoo-hoo", "bonjour", "oloota", "orirota"])
    }

    fn get_values() -> Vec<Vec<u8>> {
        to_byte_arrays_vector!([
            "English",
            "English",
            "Slang",
            "French",
            "Runyoro",
            "Runyakole",
        ])
    }

    /// Wraps each value as `Ok(Some(value))`, the shape returned by `Store::get`.
    fn wrap_values_in_result(values: &[Vec<u8>]) -> Vec<io::Result<Option<Vec<u8>>>> {
        values.iter().map(|v| Ok(Some(v.clone()))).collect()
    }
}