#[cfg(test)]
mod tests {
use crate::tests::workload::{FilterWorkload, SSTContructor};
use crate::{
bucket::{Bucket, BucketMap},
consts::{BUCKET_HIGH, MIN_TRESHOLD},
err::Error,
};
use std::sync::Arc;
use tempfile::tempdir;
use tokio::fs;
use uuid::Uuid;
#[tokio::test]
async fn test_bucket_new() {
    // A freshly created bucket lives in a "bucket…"-suffixed subdirectory of
    // the supplied path and starts out with no size and no sstables.
    let tmp = tempdir().unwrap();
    let base_path = tmp.path().join("bucket_new");
    let created = Bucket::new(base_path.to_owned()).await;
    assert!(created.is_ok());
    let bucket = created.unwrap();
    let bucket_dir = bucket.dir.to_str().unwrap();
    // Everything before the final "bucket…" component (minus the path
    // separator) must equal the directory we passed in.
    let marker = bucket_dir.rfind("bucket").unwrap();
    assert_eq!(&bucket_dir[..marker - 1], base_path.to_str().unwrap());
    assert_eq!(bucket.size, 0);
    assert_eq!(bucket.avarage_size, 0);
    assert!(bucket.sstables.read().await.is_empty());
}
#[tokio::test]
async fn test_bucket_from_with_empty() {
    // Rehydrating a bucket from existing state with no sstables keeps the
    // id, directory and average size, and yields an empty sstable set.
    let tmp = tempdir().unwrap();
    let dir = tmp.path().join("bucket_empty");
    let bucket_id = Uuid::new_v4();
    let average_size = 0;
    let restored = Bucket::from(dir.to_owned(), bucket_id, Vec::new(), average_size).await;
    assert!(restored.is_ok());
    let bucket = restored.unwrap();
    assert_eq!(bucket.dir, dir);
    assert_eq!(bucket.avarage_size, average_size);
    assert_eq!(bucket.id, bucket_id);
    assert!(bucket.sstables.read().await.is_empty());
}
#[tokio::test]
async fn test_bucket_from_with_sstables() {
    // Rehydrating a bucket from existing sstables keeps the id, directory,
    // supplied average size and the full sstable set.
    let root = tempdir().unwrap();
    let path = root.path().join("bucket_with_sstables");
    let id = Uuid::new_v4();
    let sst_count = 3;
    let sst_samples = SSTContructor::generate_ssts(sst_count).await;
    // Sum the on-disk data-file sizes directly. Previously each metadata call
    // was wrapped in tokio::spawn and awaited sequentially (no concurrency
    // gained), and the task's JoinError — not the metadata error — was
    // mislabelled as Error::GetFileMetaData.
    let mut all_sstable_size = 0;
    for sst in sst_samples.iter() {
        let meta = fs::metadata(&sst.data_file.path)
            .await
            .map_err(|err| Error::GetFileMetaData(err.into()))
            .unwrap();
        all_sstable_size += meta.len() as usize;
    }
    let expected_avg = all_sstable_size / sst_count as usize;
    let res = Bucket::from(path.to_owned(), id, sst_samples, expected_avg).await;
    assert!(res.is_ok());
    let new_bucket = res.unwrap();
    assert_eq!(new_bucket.dir, path);
    assert_eq!(new_bucket.avarage_size, expected_avg);
    assert_eq!(new_bucket.id, id);
    assert_eq!(new_bucket.sstables.read().await.len(), sst_count as usize);
}
#[tokio::test]
async fn test_cal_average_size() {
    // cal_average_size must equal the sum of the sstables' data-file sizes
    // divided by the number of sstables.
    let sst_count = 3;
    let sst_samples = SSTContructor::generate_ssts(sst_count).await;
    // Read each data file's metadata directly instead of spawning a task per
    // file and awaiting it immediately (which also mislabelled the task's
    // JoinError as a file-metadata error).
    let mut all_sstable_size = 0;
    for sst in sst_samples.iter() {
        let meta = fs::metadata(&sst.data_file.path)
            .await
            .map_err(|err| Error::GetFileMetaData(err.into()))
            .unwrap();
        all_sstable_size += meta.len() as usize;
    }
    let expected_avg = all_sstable_size / sst_count as usize;
    let actual_avg = Bucket::cal_average_size(sst_samples).await;
    assert!(actual_avg.is_ok());
    assert_eq!(actual_avg.unwrap(), expected_avg);
}
#[tokio::test]
async fn test_sstcount_exceed_threshold() {
    // Five sstables put the bucket above the compaction threshold; after
    // clearing them, the bucket no longer exceeds it.
    let tmp = tempdir().unwrap();
    let bucket = Bucket::new(tmp.path().join("bucket_exceed_threshold"))
        .await
        .unwrap();
    for sst in SSTContructor::generate_ssts(5).await {
        bucket.sstables.write().await.push(sst)
    }
    assert!(bucket.sstable_count_exceeds_threshhold().await);
    bucket.sstables.write().await.clear();
    assert!(!bucket.sstable_count_exceeds_threshhold().await);
}
#[tokio::test]
async fn test_extract_sstable_to_compact() {
    // extract_sstables should hand back every sstable in the bucket together
    // with the recomputed average size.
    let root = tempdir().unwrap();
    let path = root.path().join("bucket_extract_sstable");
    let new_bucket = Bucket::new(path.to_owned()).await.unwrap();
    let sst_count = 5;
    let sst_samples = SSTContructor::generate_ssts(sst_count).await;
    // Sum on-disk sizes directly; the previous task-per-file spawn was
    // awaited sequentially anyway and mislabelled the JoinError as a
    // file-metadata error.
    let mut all_sstable_size = 0;
    for sst in sst_samples.iter() {
        let meta = fs::metadata(&sst.data_file.path)
            .await
            .map_err(|err| Error::GetFileMetaData(err.into()))
            .unwrap();
        all_sstable_size += meta.len() as usize;
    }
    for sst in sst_samples {
        new_bucket.sstables.write().await.push(sst)
    }
    let expected_avg = all_sstable_size / sst_count as usize;
    let extracted_ssts = new_bucket.extract_sstables().await;
    assert!(extracted_ssts.is_ok());
    let (ssts, avg) = extracted_ssts.unwrap();
    assert_eq!(avg, expected_avg);
    assert_eq!(ssts.len(), sst_count as usize);
}
#[tokio::test]
async fn table_fits_into_bucket() {
    // A table fits when its size is within BUCKET_HIGH of the bucket's
    // average size; inflating the table past that range excludes it, and
    // raising the average by the same factor re-admits it.
    let tmp = tempdir().unwrap();
    let mut bucket = Bucket::new(tmp.path().join("bucket_fits")).await.unwrap();
    for sst in SSTContructor::generate_ssts(2).await {
        bucket.sstables.write().await.push(sst)
    }
    let mut candidate = SSTContructor::generate_ssts(1).await[0].to_owned();
    // Pin the average to the candidate's size so it fits exactly.
    bucket.avarage_size = candidate.size();
    assert!(bucket.fits_into_bucket(Arc::new(Box::new(candidate.to_owned()))));
    // Inflate the candidate well past the admissible range.
    candidate.size = ((bucket.avarage_size as f64 * BUCKET_HIGH) * 2.0) as usize;
    assert!(!bucket.fits_into_bucket(Arc::new(Box::new(candidate.to_owned()))));
    // Growing the bucket average by the same factor makes it fit again.
    bucket.avarage_size = ((bucket.avarage_size as f64 * BUCKET_HIGH) * 2.0) as usize;
    assert!(bucket.fits_into_bucket(Arc::new(Box::new(candidate.to_owned()))));
}
#[tokio::test]
async fn test_bucket_map_new() {
    // A new bucket map points at the requested directory and holds no buckets.
    let tmp = tempdir().unwrap();
    let dir = tmp.path().join("bucket_map_new");
    let created = BucketMap::new(dir.to_owned()).await;
    assert!(created.is_ok());
    let map = created.unwrap();
    assert_eq!(map.dir, dir);
    assert_eq!(map.buckets.len(), 0);
}
#[tokio::test]
async fn test_bucket_map_extract_imbalanced_buckets() {
    // Buckets whose sstable count exceeds MIN_TRESHOLD are reported as
    // imbalanced, together with the sstables scheduled for on-disk removal.
    let root = tempdir().unwrap();
    let path = root.path().join("imbalanced_bucket");
    let sst_count = 6;
    let sst_samples = SSTContructor::generate_ssts(sst_count).await;
    // Build four identical over-full buckets in a loop (the setup was
    // previously copy-pasted four times).
    let mut overfull_buckets = Vec::with_capacity(4);
    for _ in 0..4 {
        let bucket = Bucket::new(path.to_owned()).await.unwrap();
        for s in sst_samples.iter().cloned() {
            bucket.sstables.write().await.push(s)
        }
        overfull_buckets.push(bucket);
    }
    let map_root = tempdir().unwrap();
    let map_path = map_root.path().join("buket_map_extract");
    let mut bucket_map = BucketMap::new(map_path).await.unwrap();
    for bucket in overfull_buckets.iter() {
        bucket_map.buckets.insert(bucket.id, bucket.to_owned());
    }
    let imbalanced_buckets = bucket_map.extract_imbalanced_buckets().await;
    assert!(imbalanced_buckets.is_ok());
    let (buckets, ssts_to_remove) = imbalanced_buckets.unwrap();
    assert_eq!(buckets.len(), 4);
    // Borrow the extracted buckets instead of cloning each one just to
    // count its sstables.
    let mut expected_ssts_to_remove_in_buckets = 0;
    for bucket in buckets.iter() {
        let sst_len = bucket.sstables.read().await.len();
        assert_eq!(sst_len, sst_count as usize);
        assert!(sst_len > MIN_TRESHOLD);
        expected_ssts_to_remove_in_buckets += sst_len;
    }
    // Every sstable flagged for removal from disk must be accounted for in
    // the extracted buckets.
    let mut expected_ssts_to_remove_from_file = 0;
    for (_id, ssts) in ssts_to_remove {
        expected_ssts_to_remove_from_file += ssts.len();
    }
    assert_eq!(
        expected_ssts_to_remove_from_file,
        expected_ssts_to_remove_in_buckets
    );
    // An empty map has nothing to extract.
    bucket_map.buckets.clear();
    let imbalanced_buckets = bucket_map.extract_imbalanced_buckets().await;
    assert!(imbalanced_buckets.is_ok());
    let (buckets, sst_to_remove) = imbalanced_buckets.unwrap();
    assert_eq!(buckets.len(), 0);
    assert_eq!(sst_to_remove.len(), 0);
    // A bucket holding a single sstable stays below the threshold and is
    // not considered imbalanced.
    let small_bucket = overfull_buckets.remove(0);
    small_bucket.sstables.write().await.clear();
    small_bucket.sstables.write().await.push(sst_samples[0].to_owned());
    bucket_map.buckets.insert(small_bucket.id, small_bucket);
    let imbalanced_buckets = bucket_map.extract_imbalanced_buckets().await;
    assert!(imbalanced_buckets.is_ok());
    let (buckets, sst_to_remove) = imbalanced_buckets.unwrap();
    assert_eq!(buckets.len(), 0);
    assert_eq!(sst_to_remove.len(), 0);
}
#[tokio::test]
async fn test_bucket_map_is_balanced() {
    // A map is balanced only when no bucket holds more sstables than the
    // compaction threshold allows.
    let root = tempdir().unwrap();
    let path = root.path().join("bucket_map_is_balanced");
    let sst_count = 6;
    let sst_samples = SSTContructor::generate_ssts(sst_count).await;
    // Build four identical over-full buckets in a loop (the setup was
    // previously copy-pasted four times).
    let mut overfull_buckets = Vec::with_capacity(4);
    for _ in 0..4 {
        let bucket = Bucket::new(path.to_owned()).await.unwrap();
        for s in sst_samples.iter().cloned() {
            bucket.sstables.write().await.push(s)
        }
        overfull_buckets.push(bucket);
    }
    let map_root = tempdir().unwrap();
    let map_path = map_root.path().join("bucket_map_is_balanced");
    let mut bucket_map = BucketMap::new(map_path).await.unwrap();
    for bucket in overfull_buckets.iter() {
        bucket_map.buckets.insert(bucket.id, bucket.to_owned());
    }
    // Over-full buckets => imbalanced.
    let is_balanced = bucket_map.is_balanced().await;
    assert!(!is_balanced);
    // No buckets at all => trivially balanced.
    bucket_map.buckets.clear();
    let is_balanced = bucket_map.is_balanced().await;
    assert!(is_balanced);
    // A single bucket with one sstable stays within the threshold.
    let small_bucket = overfull_buckets.remove(0);
    small_bucket.sstables.write().await.clear();
    small_bucket.sstables.write().await.push(sst_samples[0].to_owned());
    bucket_map.buckets.insert(small_bucket.id, small_bucket);
    let is_balanced = bucket_map.is_balanced().await;
    assert!(is_balanced);
}
#[tokio::test]
async fn table_insert_to_appropriate_bucket() {
    // A similarly-sized table is inserted into the existing bucket, while a
    // table far outside the size range forces a new bucket to be created.
    let tmp = tempdir().unwrap();
    let mut bucket_map = BucketMap::new(tmp.path().join("bucket_map_appropriate_bucket"))
        .await
        .unwrap();
    let false_pos = 0.1;
    let mut table = SSTContructor::generate_ssts(1).await[0].to_owned();
    table.load_entries_from_file().await.unwrap();
    table.filter = Some(FilterWorkload::from(false_pos, table.entries.to_owned()));
    // First insert creates the initial bucket.
    let first_insert = bucket_map
        .insert_to_appropriate_bucket(Arc::new(Box::new(table.to_owned())))
        .await;
    assert!(first_insert.is_ok());
    assert_eq!(bucket_map.buckets.len(), 1);
    // Same-sized table lands in the same bucket.
    let second_insert = bucket_map
        .insert_to_appropriate_bucket(Arc::new(Box::new(table.to_owned())))
        .await;
    assert!(second_insert.is_ok());
    assert_eq!(bucket_map.buckets.len(), 1);
    // An oversized table no longer fits and spawns a second bucket.
    table.size = ((table.size as f64 * BUCKET_HIGH) * 2.0) as usize;
    let third_insert = bucket_map
        .insert_to_appropriate_bucket(Arc::new(Box::new(table.to_owned())))
        .await;
    assert!(third_insert.is_ok());
    assert_eq!(bucket_map.buckets.len(), 2);
}
#[tokio::test]
async fn test_delete_sstables() {
    // delete_ssts must remove the flagged sstables and drop the now-empty
    // buckets from the map.
    let root = tempdir().unwrap();
    let path = root.path().join("bucket_map_delete_sstables");
    let sst_count = 6;
    let sst_samples = SSTContructor::generate_ssts(sst_count).await;
    // BUG FIX: the original bound `tempdir().unwrap().path().to_owned()`,
    // which drops the TempDir guard at the end of the statement and deletes
    // the directory immediately — the sst dirs never existed on disk when
    // delete_ssts ran. Keep the guards alive for the whole test instead.
    let sst_dirs: Vec<_> = (0..sst_samples.len()).map(|_| tempdir().unwrap()).collect();
    // Build five identical over-full buckets in a loop (the setup was
    // previously copy-pasted five times).
    let mut overfull_buckets = Vec::with_capacity(5);
    for _ in 0..5 {
        let bucket = Bucket::new(path.to_owned()).await.unwrap();
        for (idx, mut s) in sst_samples.iter().cloned().enumerate() {
            s.dir = sst_dirs[idx].path().to_owned();
            bucket.sstables.write().await.push(s)
        }
        overfull_buckets.push(bucket);
    }
    let map_root = tempdir().unwrap();
    let map_path = map_root.path().join(".");
    let mut bucket_map = BucketMap::new(map_path).await.unwrap();
    for bucket in overfull_buckets {
        bucket_map.buckets.insert(bucket.id, bucket);
    }
    let imbalanced_buckets = bucket_map.extract_imbalanced_buckets().await;
    assert!(imbalanced_buckets.is_ok());
    let (buckets, ssts_to_remove) = imbalanced_buckets.unwrap();
    assert_eq!(buckets.len(), 5);
    let delete_res = bucket_map.delete_ssts(&ssts_to_remove).await;
    assert!(delete_res.is_ok());
    // Every bucket was fully drained, so the map should now be empty.
    assert_eq!(bucket_map.buckets.len(), 0);
}
}