use bytes::Bytes;
use kip_db::kernel::io::{FileExtension, IoFactory, IoType};
use kip_db::kernel::lsm::storage::KipStorage;
use kip_db::kernel::KernelResult;
use kip_db::kernel::Storage;
use std::io::{Read, Seek, SeekFrom, Write};
use tempfile::TempDir;
use walkdir::WalkDir;
/// Runs the stored-value scenario against every storage backend that is compiled in.
#[test]
fn get_stored_value() -> KernelResult<()> {
    // The sled-backed run only exists when the `sled` feature is enabled.
    #[cfg(feature = "sled")]
    {
        get_stored_value_with_kv_store::<kip_db::kernel::sled_storage::SledStorage>()?;
    }

    get_stored_value_with_kv_store::<KipStorage>()
}
/// Checks that values written with `set` are returned by `get`, both on the live
/// store and after the store has been dropped and reopened on the same directory.
///
/// Keys and values are produced by `encode_key`, and the store is opened inside a
/// freshly created temporary directory.
///
/// # Errors
///
/// Propagates any error returned by `encode_key` or by the storage operations.
///
/// # Panics
///
/// Panics if the temporary directory cannot be created or if an assertion fails.
fn get_stored_value_with_kv_store<T: Storage>() -> KernelResult<()> {
    tokio_test::block_on(async move {
        let key1 = encode_key("key1")?;
        let key2 = encode_key("key2")?;
        let value1 = Bytes::from(encode_key("value1")?);
        let value2 = Bytes::from(encode_key("value2")?);
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");

        let kv_store = T::open(temp_dir.path()).await?;
        kv_store
            .set(Bytes::from(key1.clone()), value1.clone())
            .await?;
        kv_store
            .set(Bytes::from(key2.clone()), value2.clone())
            .await?;
        kv_store.flush().await?;

        // Assert on the live store too: discarding these results would let a store
        // that returns a wrong or missing value before the reopen go unnoticed.
        assert_eq!(kv_store.get(&key1).await?, Some(value1.clone()));
        assert_eq!(kv_store.get(&key2).await?, Some(value2.clone()));

        // Flush once more, release the handle, and open the same directory again.
        kv_store.flush().await?;
        drop(kv_store);
        let kv_store = T::open(temp_dir.path()).await?;

        assert_eq!(kv_store.get(&key1).await?, Some(value1));
        assert_eq!(kv_store.get(&key2).await?, Some(value2));
        Ok(())
    })
}
/// Runs the overwrite scenario against every storage backend that is compiled in.
#[test]
fn overwrite_value() -> KernelResult<()> {
    // The sled-backed run only exists when the `sled` feature is enabled.
    #[cfg(feature = "sled")]
    {
        overwrite_value_with_kv_store::<kip_db::kernel::sled_storage::SledStorage>()?;
    }

    overwrite_value_with_kv_store::<KipStorage>()
}
/// Checks that setting an existing key replaces its value.
///
/// The same key is written three times: twice on the first store handle and once
/// more after the store has been dropped and reopened. After every write (and
/// right after the reopen) the most recently written value must be returned.
fn overwrite_value_with_kv_store<T: Storage>() -> KernelResult<()> {
    tokio_test::block_on(async move {
        let key = encode_key("key1")?;
        let first = Bytes::from(encode_key("value1")?);
        let second = Bytes::from(encode_key("value2")?);
        let third = Bytes::from(encode_key("value3")?);
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");

        let kv_store = T::open(temp_dir.path()).await?;

        // Initial write.
        kv_store.set(Bytes::from(key.clone()), first.clone()).await?;
        kv_store.flush().await?;
        assert_eq!(kv_store.get(&key).await?, Some(first));

        // Overwrite on the same handle.
        kv_store
            .set(Bytes::from(key.clone()), second.clone())
            .await?;
        kv_store.flush().await?;
        assert_eq!(kv_store.get(&key).await?, Some(second.clone()));

        // Reopen the same directory: the overwritten value must still be there.
        drop(kv_store);
        let kv_store = T::open(temp_dir.path()).await?;
        assert_eq!(kv_store.get(&key).await?, Some(second));

        // Overwrite once more on the reopened handle.
        kv_store.set(Bytes::from(key.clone()), third.clone()).await?;
        kv_store.flush().await?;
        assert_eq!(kv_store.get(&key).await?, Some(third));

        Ok(())
    })
}
/// Runs the missing-key lookup scenario against every storage backend that is compiled in.
#[test]
fn get_non_existent_value() -> KernelResult<()> {
    // The sled-backed run only exists when the `sled` feature is enabled.
    #[cfg(feature = "sled")]
    {
        get_non_existent_value_with_kv_store::<kip_db::kernel::sled_storage::SledStorage>()?;
    }

    get_non_existent_value_with_kv_store::<KipStorage>()
}
/// Checks that looking up a key that was never written yields `None`.
///
/// One unrelated key is stored first; the lookup of the other key must return
/// `None` both on the live store and after the store is flushed, dropped and
/// reopened on the same directory.
fn get_non_existent_value_with_kv_store<T: Storage>() -> KernelResult<()> {
    tokio_test::block_on(async move {
        let stored_key = encode_key("key1")?;
        let absent_key = encode_key("key2")?;
        let stored_value = encode_key("value1")?;
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");

        let kv_store = T::open(temp_dir.path()).await?;
        kv_store
            .set(Bytes::from(stored_key), Bytes::from(stored_value))
            .await?;
        assert_eq!(kv_store.get(&absent_key).await?, None);

        // Same lookup again after a flush and a reopen of the directory.
        kv_store.flush().await?;
        drop(kv_store);
        let kv_store = T::open(temp_dir.path()).await?;
        assert_eq!(kv_store.get(&absent_key).await?, None);

        Ok(())
    })
}
/// Runs the remove-missing-key scenario against every storage backend that is compiled in.
#[test]
fn remove_non_existent_key() -> KernelResult<()> {
    // The sled-backed run only exists when the `sled` feature is enabled.
    #[cfg(feature = "sled")]
    {
        remove_non_existent_key_with_kv_store::<kip_db::kernel::sled_storage::SledStorage>()?;
    }

    remove_non_existent_key_with_kv_store::<KipStorage>()
}
/// Checks that removing a key from a freshly opened, empty store reports an error.
fn remove_non_existent_key_with_kv_store<T: Storage>() -> KernelResult<()> {
    let scenario = async move {
        let key1 = encode_key("key1")?;
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let kv_store = T::open(temp_dir.path()).await?;

        // Nothing has been written, so the removal is expected to fail.
        assert!(kv_store.remove(&key1).await.is_err());
        Ok(())
    };

    tokio_test::block_on(scenario)
}
/// Runs the remove-existing-key scenario against every storage backend that is compiled in.
#[test]
fn remove_key() -> KernelResult<()> {
    // The sled-backed run only exists when the `sled` feature is enabled.
    #[cfg(feature = "sled")]
    {
        remove_key_with_kv_store::<kip_db::kernel::sled_storage::SledStorage>()?;
    }

    remove_key_with_kv_store::<KipStorage>()
}
/// Checks that a stored key can be removed and is no longer returned afterwards.
fn remove_key_with_kv_store<T: Storage>() -> KernelResult<()> {
    let scenario = async move {
        let key1 = encode_key("key1")?;
        let stored_value = Bytes::from(encode_key("value1")?);
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let kv_store = T::open(temp_dir.path()).await?;

        kv_store.set(Bytes::from(key1.clone()), stored_value).await?;

        // Removal of the key just written must succeed, and a later lookup must miss.
        assert!(kv_store.remove(&key1).await.is_ok());
        assert_eq!(kv_store.get(&key1).await?, None);
        Ok(())
    };

    tokio_test::block_on(scenario)
}
/// Runs the compaction scenario against every storage backend that is compiled in.
#[test]
fn compaction() -> KernelResult<()> {
    // The sled-backed run only exists when the `sled` feature is enabled.
    #[cfg(feature = "sled")]
    {
        compaction_with_kv_store::<kip_db::kernel::sled_storage::SledStorage>()?;
    }

    compaction_with_kv_store::<KipStorage>()
}
/// Repeatedly overwrites the same 1000 keys and watches the size of the store's
/// directory to detect compaction.
///
/// Each round writes the round number as the value of every key and flushes. As
/// long as the directory keeps growing the next round is started. The first round
/// after which the directory did not grow is taken as evidence of compaction: the
/// store is reopened and every key must hold that round's value.
///
/// # Panics
///
/// Panics with "No compaction detected" if the directory grew after each of the
/// 1000 rounds, and on any failed assertion.
fn compaction_with_kv_store<T: Storage>() -> KernelResult<()> {
    tokio_test::block_on(async move {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let kv_store = T::open(temp_dir.path()).await?;

        // Sum of the metadata length of every entry found under the temporary directory.
        let dir_size = || {
            WalkDir::new(temp_dir.path())
                .into_iter()
                .map(|entry| {
                    entry
                        .and_then(|found| found.metadata())
                        .map(|meta| meta.len())
                })
                .sum::<walkdir::Result<u64>>()
                .expect("fail to get directory size")
        };

        let mut last_size = dir_size();
        for round in 0..1000 {
            let round_value = round.to_string();

            for key_id in 0..1000 {
                let key = format!("key{}", key_id);
                kv_store
                    .set(
                        Bytes::from(encode_key(&key)?),
                        Bytes::from(encode_key(&round_value)?),
                    )
                    .await?;
            }
            kv_store.flush().await?;

            let size_now = dir_size();
            if size_now <= last_size {
                // The directory did not grow this round: reopen the store and make
                // sure every key still carries the value written in this round.
                drop(kv_store);
                let reopened = T::open(temp_dir.path()).await?;
                let expected = Some(Bytes::from(encode_key(&round_value)?));
                for key_id in 0..1000 {
                    let key = encode_key(&format!("key{}", key_id))?;
                    assert_eq!(reopened.get(&key).await?, expected);
                }
                return Ok(());
            }
            last_size = size_now;
        }

        panic!("No compaction detected");
    })
}
/// Runs the writer/reader round-trip check once for `IoType::Buf` and once for
/// `IoType::Direct`, using a single factory rooted in a temporary directory.
#[test]
fn test_io() -> KernelResult<()> {
    let temp_dir = TempDir::new().expect("unable to create temporary working directory");
    let factory = IoFactory::new(temp_dir.path(), FileExtension::Log).unwrap();

    for io_type in [IoType::Buf, IoType::Direct] {
        io_type_test(&factory, io_type)?;
    }
    Ok(())
}
/// Exercises the writer and reader that `factory` hands out for id `1` with the
/// given `io_type`.
///
/// Two 3-byte chunks are written and flushed; the reader must then return the six
/// bytes in order. The write positions and lengths, the seek results, the reported
/// file size and the `exists`/`clean` behaviour of the factory are all checked.
///
/// # Errors
///
/// Propagates any error returned by the factory, writer or reader.
fn io_type_test(factory: &IoFactory, io_type: IoType) -> KernelResult<()> {
    let first_chunk: &[u8] = b"123";
    let second_chunk: &[u8] = b"456";

    // Write both chunks, recording the position before and the length of each write.
    let mut writer = factory.writer(1, io_type)?;
    let first_pos = writer.current_pos()?;
    let first_len = writer.write(first_chunk)?;
    let second_pos = writer.current_pos()?;
    let second_len = writer.write(second_chunk)?;
    writer.flush()?;

    // Read everything back through a reader for the same id and IO type.
    let mut reader = factory.reader(1, io_type)?;
    let mut read_back = [0; 6];
    reader.read_exact(&mut read_back)?;

    assert_eq!(*b"123456", read_back);
    assert_eq!(first_pos, 0);
    assert_eq!(second_pos, 3);
    assert_eq!(first_len, 3);
    assert_eq!(second_len, 3);

    // Seeking from the start, from the end and relative to the current position.
    assert_eq!(reader.seek(SeekFrom::Start(2))?, 2);
    assert_eq!(reader.seek(SeekFrom::End(-1))?, 5);
    assert_eq!(reader.seek(SeekFrom::Current(-1))?, 4);
    assert_eq!(reader.file_size()?, 6);

    // The file is reported as existing until it is cleaned.
    assert!(factory.exists(1)?);
    factory.clean(1)?;
    assert!(!factory.exists(1)?);

    Ok(())
}
/// Serializes `key` with `bincode::serialize` and returns the resulting bytes.
///
/// # Errors
///
/// Returns the serialization error, converted into the `KernelResult` error type.
fn encode_key(key: &str) -> KernelResult<Vec<u8>> {
    let encoded = bincode::serialize(key)?;
    Ok(encoded)
}