#[cfg(test)]
mod tests {
use crate::consts::{SIZE_OF_U32, SIZE_OF_U64, SIZE_OF_U8};
use crate::db::{DataStore, SizeUnit};
use crate::err::Error;
use crate::gc::garbage_collector::GC;
use crate::types::Key;
use std::sync::Arc;
use tempfile::tempdir;
use tokio::sync::RwLock;
async fn setup(
store: Arc<RwLock<DataStore<'static, Key>>>,
workload: &crate::tests::workload::Workload,
prepare_delete: bool,
) -> Result<(), Error> {
let _ = env_logger::builder().is_test(true).try_init();
let (_, data) = workload.generate_workload_data_as_vec();
workload.insert_parallel(&data, store.clone()).await?;
if prepare_delete {
let _ = store.write().await.put("test_key", "test").await?;
let _ = store.write().await.delete("test_key").await?;
}
Ok(())
}
#[tokio::test]
async fn datastore_gc_test_success() {
let root = tempdir().unwrap();
let path = root.path().join("gc_test_1");
let s_engine = DataStore::open_without_background("test", path.clone())
.await
.unwrap();
let store = Arc::new(RwLock::new(s_engine));
let workload_size = 5000;
let key_len = 5;
let val_len = 5;
let write_read_ratio = 0.5;
let workload =
crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio);
if let Err(err) = setup(store.clone(), &workload, true).await {
log::error!("Setup failed {}", err);
return;
}
let storage_reader = store.read().await;
let config = storage_reader.gc.config.clone();
#[allow(unused_variables)] let res = GC::gc_handler(
&config,
Arc::clone(&storage_reader.gc_table),
Arc::clone(&storage_reader.gc_log),
Arc::clone(&storage_reader.key_range),
Arc::clone(&storage_reader.read_only_memtables),
Arc::clone(&storage_reader.gc_updated_entries),
Arc::clone(&storage_reader.gc.punch_marker),
)
.await;
#[cfg(target_os = "linux")]
{
assert!(res.is_ok())
}
}
#[tokio::test]
async fn datastore_gc_test_unsupported_platform() {
let root = tempdir().unwrap();
let path = root.path().join("gc_test_2");
let s_engine = DataStore::open_without_background("test", path.clone())
.await
.unwrap();
let store = Arc::new(RwLock::new(s_engine));
let workload_size = 5000;
let key_len = 5;
let val_len = 5;
let write_read_ratio = 0.5;
let workload =
crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio);
if let Err(err) = setup(store.clone(), &workload, true).await {
log::error!("Setup failed {}", err);
return;
}
let storage_reader = store.read().await;
let config = storage_reader.gc.config.clone();
let _res = GC::gc_handler(
&config,
Arc::clone(&storage_reader.gc_table),
Arc::clone(&storage_reader.gc_log),
Arc::clone(&storage_reader.key_range),
Arc::clone(&storage_reader.read_only_memtables),
Arc::clone(&storage_reader.gc_updated_entries),
Arc::clone(&storage_reader.gc.punch_marker),
)
.await;
#[cfg(not(target_os = "linux"))]
{
assert!(_res.is_ok());
}
}
#[tokio::test]
async fn datastore_gc_test_tail_shifted() {
let root = tempdir().unwrap();
let path = root.path().join("gc_test_3");
let s_engine = DataStore::open_without_background("test", path.clone())
.await
.unwrap();
let store = Arc::new(RwLock::new(s_engine));
let workload_size = 5000;
let key_len = 5;
let val_len = 5;
let write_read_ratio = 0.5;
let workload =
crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio);
if let Err(err) = setup(store.clone(), &workload, true).await {
log::error!("Setup failed {}", err);
return;
}
let storage_reader = store.read().await;
let config = storage_reader.gc.config.clone();
let initial_tail_offset = storage_reader.gc_log.read().await.tail_offset;
let _ = GC::gc_handler(
&config,
Arc::clone(&storage_reader.gc_table),
Arc::clone(&storage_reader.gc_log),
Arc::clone(&storage_reader.key_range),
Arc::clone(&storage_reader.read_only_memtables),
Arc::clone(&storage_reader.gc_updated_entries),
Arc::clone(&storage_reader.gc.punch_marker),
)
.await;
drop(storage_reader);
let _ = store.write().await.put("test_key", "test_val").await;
assert!(store.read().await.gc.vlog.read().await.tail_offset >= initial_tail_offset);
}
#[tokio::test]
async fn datastore_gc_test_free_before_synchronization() {
let root = tempdir().unwrap();
let path = root.path().join("gc_test_free");
let s_engine = DataStore::open_without_background("test", path.clone())
.await
.unwrap();
let store = Arc::new(RwLock::new(s_engine));
let workload_size = 5000;
let key_len = 5;
let val_len = 5;
let write_read_ratio = 0.5;
let workload =
crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio);
if let Err(err) = setup(store.clone(), &workload, true).await {
log::error!("Setup failed {}", err);
return;
}
let storage_reader = store.read().await;
let config = storage_reader.gc.config.clone();
let initial_tail_offset = storage_reader.gc_log.read().await.tail_offset;
let _ = GC::gc_handler(
&config,
Arc::clone(&storage_reader.gc_table),
Arc::clone(&storage_reader.gc_log),
Arc::clone(&storage_reader.key_range),
Arc::clone(&storage_reader.read_only_memtables),
Arc::clone(&storage_reader.gc_updated_entries),
Arc::clone(&storage_reader.gc.punch_marker),
)
.await;
drop(storage_reader);
assert!(store.read().await.gc.vlog.read().await.tail_offset == initial_tail_offset);
}
#[tokio::test]
async fn datastore_gc_test_tail_shifted_to_correct_position() {
let bytes_to_scan_for_garbage_colection = SizeUnit::Bytes.as_bytes(100);
let root = tempdir().unwrap();
let path = root.path().join("gc_test_4");
let s_engine = DataStore::open_without_background("test", path.clone())
.await
.unwrap();
let store = Arc::new(RwLock::new(s_engine));
let workload_size = 5;
let key_len = 5;
let val_len = 5;
let write_read_ratio = 0.5;
let workload =
crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio);
if let Err(err) = setup(store.clone(), &workload, true).await {
log::error!("Setup failed {}", err);
return;
}
let string_length = 5;
let vaue_len = 3;
let storage_reader = store.read().await;
let mut config = storage_reader.gc.config.clone();
let initial_tail_offset = storage_reader.gc_log.read().await.tail_offset;
config.gc_chunk_size = bytes_to_scan_for_garbage_colection;
let _ = GC::gc_handler(
&config,
Arc::clone(&storage_reader.gc_table),
Arc::clone(&storage_reader.gc_log),
Arc::clone(&storage_reader.key_range),
Arc::clone(&storage_reader.read_only_memtables),
Arc::clone(&storage_reader.gc_updated_entries),
Arc::clone(&storage_reader.gc.punch_marker),
)
.await;
drop(storage_reader);
let _ = store.write().await.put("test_key", "test_val").await;
let max_extention_length = SIZE_OF_U32 +SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8 + string_length + vaue_len; assert!(
store.read().await.gc.vlog.read().await.tail_offset
<= initial_tail_offset + bytes_to_scan_for_garbage_colection + max_extention_length
);
}
#[tokio::test]
async fn datastore_gc_test_head_shifted() {
let bytes_to_scan_for_garbage_colection = SizeUnit::Bytes.as_bytes(100);
let root = tempdir().unwrap();
let path = root.path().join("gc_test_5");
let s_engine = DataStore::open_without_background("test", path.clone())
.await
.unwrap();
let store = Arc::new(RwLock::new(s_engine));
let _ = store.write().await.put("test_key", "test_val").await;
let _ = store.write().await.delete("test_key").await;
let workload_size = 2;
let key_len = 5;
let val_len = 5;
let write_read_ratio = 0.5;
let workload =
crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio);
if let Err(err) = setup(store.clone(), &workload, true).await {
log::error!("Setup failed {}", err);
return;
}
(store.write().await).gc.config.gc_chunk_size = bytes_to_scan_for_garbage_colection;
let storage_reader = store.read().await;
let initial_head_offset = storage_reader.gc_log.read().await.head_offset;
let _ = GC::gc_handler(
&storage_reader.gc.config.clone(),
Arc::clone(&storage_reader.gc_table),
Arc::clone(&storage_reader.gc_log),
Arc::clone(&storage_reader.key_range),
Arc::clone(&storage_reader.read_only_memtables),
Arc::clone(&storage_reader.gc_updated_entries),
Arc::clone(&storage_reader.gc.punch_marker),
)
.await;
drop(storage_reader);
let _ = store.write().await.put("test_key", "test_val").await;
assert!(store.read().await.gc.vlog.read().await.head_offset != initial_head_offset);
}
#[tokio::test]
async fn datastore_gc_test_no_entry_to_collect() {
let prepare_delete = false;
let root = tempdir().unwrap();
let path = root.path().join("gc_test_no_delete");
let s_engine = DataStore::open_without_background("test", path.clone())
.await
.unwrap();
let store = Arc::new(RwLock::new(s_engine));
let workload_size = 5000;
let key_len = 5;
let val_len = 5;
let write_read_ratio = 0.5;
let workload =
crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio);
if let Err(err) = setup(store.clone(), &workload, prepare_delete).await {
log::error!("Setup failed {}", err);
return;
}
let storage_reader = store.read().await;
let config = storage_reader.gc.config.clone();
let initial_tail_offset = storage_reader.gc_log.read().await.tail_offset;
let _ = GC::gc_handler(
&config,
Arc::clone(&storage_reader.gc_table),
Arc::clone(&storage_reader.gc_log),
Arc::clone(&storage_reader.key_range),
Arc::clone(&storage_reader.read_only_memtables),
Arc::clone(&storage_reader.gc_updated_entries),
Arc::clone(&storage_reader.gc.punch_marker),
)
.await;
drop(storage_reader);
assert!(store.read().await.gc.vlog.read().await.tail_offset == initial_tail_offset);
}
#[tokio::test]
async fn datastore_gc_test_punch_hole() {
#[cfg(target_os = "linux")]
{
use std::io::{Read, Seek, SeekFrom, Write};
use tempfile::NamedTempFile;
const PUNCH_START: i64 = 0;
const PUNCH_LENGTH: usize = 7;
let mut temp_file = NamedTempFile::new().unwrap();
let file_path = temp_file.path().to_path_buf();
writeln!(temp_file, "Sample1Sample2Sample3Sample4Sample5Sample6").unwrap();
temp_file.flush().unwrap();
temp_file.as_file_mut().seek(SeekFrom::Start(0)).unwrap();
let mut buffer = [0; PUNCH_LENGTH];
let bytes_read = temp_file.read(&mut buffer).unwrap();
assert_eq!(bytes_read, PUNCH_LENGTH);
assert_eq!(&buffer, b"Sample1");
let punch_res = GC::punch_holes(file_path, PUNCH_START, PUNCH_LENGTH as i64).await;
assert!(punch_res.is_ok());
let inner_file = temp_file.as_file_mut();
inner_file.seek(SeekFrom::Start(0)).unwrap();
let mut buffer = [0; PUNCH_LENGTH];
let bytes_read = inner_file.read(&mut buffer).unwrap();
assert_eq!(bytes_read, PUNCH_LENGTH);
assert_eq!(&buffer, &[0; 7]); }
}
}