#![allow(unused_attributes)]
use anyhow::Result;
use chrono::Local;
use env_logger::fmt::Color;
use futures::{future, stream::futures_unordered::FuturesUnordered, FutureExt, StreamExt};
use log::Level;
use rand::Rng;
use std::{
env, fs,
io::{Seek, SeekFrom, Write},
path::Path,
path::PathBuf,
time::Duration,
};
use pearl::{Builder, Key, RefKey, Storage};
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct KeyTest(Vec<u8>);
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct RefKeyTest<'a>(&'a [u8]);
pub const MIN_DEFER_TIME: Duration = Duration::from_millis(100);
pub const MAX_DEFER_TIME: Duration = Duration::from_millis(600);
impl AsRef<[u8]> for KeyTest {
fn as_ref(&self) -> &[u8] {
&self.0
}
}
impl AsRef<KeyTest> for KeyTest {
fn as_ref(&self) -> &KeyTest {
self
}
}
impl<'a> From<&'a [u8]> for RefKeyTest<'a> {
fn from(v: &'a [u8]) -> Self {
Self(v)
}
}
impl<'a> RefKey<'a> for RefKeyTest<'a> {}
impl<'a> Key<'a> for KeyTest {
const LEN: u16 = 4;
const MEM_SIZE: usize = 4 + std::mem::size_of::<Vec<u8>>();
type Ref = RefKeyTest<'a>;
}
impl Default for KeyTest {
fn default() -> Self {
Self(vec![0; 4])
}
}
impl<'a> From<&'a [u8]> for KeyTest {
fn from(a: &[u8]) -> Self {
let data = a.try_into().expect("key size mismatch");
Self(data)
}
}
impl From<Vec<u8>> for KeyTest {
fn from(mut v: Vec<u8>) -> Self {
v.resize(KeyTest::LEN as usize, 0);
Self(v)
}
}
impl KeyTest {
pub fn new(inner: u32) -> Self {
Self(inner.to_be_bytes().to_vec())
}
}
pub fn init(dir_name: &str) -> PathBuf {
env_logger::builder()
.format(|buf, record: &log::Record| {
let mut style = buf.style();
let color = match record.level() {
Level::Error => Color::Red,
Level::Warn => Color::Yellow,
Level::Info => Color::Green,
Level::Debug => Color::Cyan,
Level::Trace => Color::White,
};
style.set_color(color);
writeln!(
buf,
"[{} {:>24}:{:^4} {:^5}] - {}",
Local::now().format("%Y-%m-%dT%H:%M:%S"),
record.module_path().unwrap_or(""),
record.line().unwrap_or(0),
style.value(record.level()),
record.args(),
)
})
.try_init()
.unwrap_or(());
env::temp_dir().join(format!(
"pearl_test/{}/{}",
std::time::UNIX_EPOCH.elapsed().unwrap().as_secs() % 1_563_100_000,
dir_name
))
}
pub async fn default_test_storage_in(
dir_name: impl AsRef<Path>,
) -> Result<Storage<KeyTest>, String> {
create_test_storage(dir_name, 10_000).await
}
pub async fn create_test_storage(
dir_name: impl AsRef<Path>,
max_blob_size: u64,
) -> Result<Storage<KeyTest>, String> {
create_custom_test_storage(dir_name, |b| b.max_blob_size(max_blob_size)).await
}
pub async fn create_custom_test_storage(
dir_name: impl AsRef<Path>,
configurator: impl FnOnce(Builder) -> Builder,
) -> Result<Storage<KeyTest>, String> {
let path = env::temp_dir().join(dir_name);
let iodriver = pearl::IoDriver::new();
let builder = configurator(
Builder::new()
.work_dir(&path)
.set_io_driver(iodriver)
.blob_file_name_prefix("test")
.max_blob_size(1_000_000)
.max_data_in_blob(100_000)
.set_filter_config(Default::default())
.set_deferred_index_dump_times(MIN_DEFER_TIME, MAX_DEFER_TIME)
.allow_duplicates());
let mut storage = builder.build().unwrap();
storage.init().await.map_err(|e| e.to_string())?;
Ok(storage)
}
pub fn create_indexes(threads: usize, writes: usize) -> Vec<Vec<usize>> {
(0..threads)
.map(|i| (0..writes).map(|j| i * threads + j).collect())
.collect()
}
pub async fn clean(storage: Storage<KeyTest>, path: impl AsRef<Path>) {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
storage.close().await.expect("Storage closing error");
fs::remove_dir_all(path).expect("Test directory cleaning error");
}
pub async fn close_storage(storage: Storage<KeyTest>, expected_files: &[&PathBuf]) -> Result<()> {
let _ = wait_for(|| expected_files.iter().all(|p| p.exists()));
storage.close().await?;
Ok(())
}
pub async fn check_all_written(storage: &Storage<KeyTest>, keys: Vec<u32>) -> Result<(), String> {
let mut read_futures: FuturesUnordered<_> = keys
.iter()
.map(|key| {
storage
.read(KeyTest::new(*key))
.then(move |res| future::ready((res, *key)))
})
.collect();
let mut ok_count: usize = 0;
while let Some((res, key)) = read_futures.next().await {
match res {
Ok(_) => ok_count += 1,
Err(e) => println!("[{}] error reading {}", key, e),
}
}
if ok_count == keys.len() {
Ok(())
} else {
Err("Failed to read all keys".to_string())
}
}
pub fn generate_records(count: usize, avg_size: usize) -> Vec<(u32, Vec<u8>)> {
let mut gen = rand::thread_rng();
(0..count)
.map(|_i| {
let diff = gen.gen::<i32>() % (avg_size / 10) as i32;
let size = avg_size as i32 + diff;
let mut buf = vec![0; size as usize];
gen.fill(buf.as_mut_slice());
(gen.gen(), buf)
})
.collect()
}
pub enum CorruptionType {
ZeroedAtBegin(u64),
ZeroedAt(u64, u64),
}
pub fn corrupt_file(path: impl AsRef<Path>, corruption_type: CorruptionType) -> Result<()> {
let mut file = std::fs::OpenOptions::new()
.create(false)
.write(true)
.truncate(false)
.open(path)?;
let size = file.metadata()?.len();
match corruption_type {
CorruptionType::ZeroedAtBegin(zeroed_size) => {
let write_size = zeroed_size.min(size);
file.seek(SeekFrom::Start(0))?;
file.write_all(&vec![0u8; write_size as usize])?;
}
CorruptionType::ZeroedAt(start, len) => {
let write_size = len.min(size);
file.seek(SeekFrom::Start(start))?;
file.write_all(&vec![0u8; write_size as usize])?;
}
}
file.sync_all()?;
Ok(())
}
const MAX_WAIT_CYCLES: usize = 10;
const WAIT_DELAY: std::time::Duration = std::time::Duration::from_millis(20);
pub fn wait_for(condition: impl Fn() -> bool) -> bool {
(0..MAX_WAIT_CYCLES).fold(condition(), |evaluated, _| {
if !evaluated {
std::thread::sleep(WAIT_DELAY);
condition()
} else {
evaluated
}
})
}
pub fn build_rep_data(data_size: usize, slice: &mut [u8], records_amount: usize) -> Vec<Vec<u8>> {
let mut res = Vec::new();
for i in 0..records_amount {
slice[0] = (i % 256) as u8;
let mut data = Vec::new();
for _ in 0..(data_size / slice.len()) {
data.extend(slice.iter());
}
res.push(data);
}
res
}
pub fn cmp_records_collections(mut got: Vec<Vec<u8>>, expected: &[Vec<u8>]) -> bool {
if got.len() != expected.len() {
false
} else {
got.sort();
got.iter()
.zip(expected.iter())
.map(|(a, b)| a == b)
.filter(|b| *b)
.count()
== got.len()
}
}