use futures::stream::StreamExt;
use tempfile::TempDir;
use lsm::{Database, Params, StartMode, WriteBatch, WriteOptions};
const SM: StartMode = StartMode::CreateOrOverride;
#[cfg(feature = "async-io")]
use tokio_uring_executor::test as async_test;
#[cfg(not(feature = "async-io"))]
use tokio::test as async_test;
async fn test_init() -> (TempDir, Database) {
let _ = env_logger::builder().is_test(true).try_init();
let tmp_dir = tempfile::Builder::new()
.prefix("lsm-async-test-")
.tempdir()
.unwrap();
let mut db_path = tmp_dir.path().to_path_buf();
db_path.push("storage.lsm");
let params = Params {
db_path,
..Default::default()
};
let database = Database::new_with_params(SM, params)
.await
.expect("Failed to create database instance");
(tmp_dir, database)
}
#[async_test]
async fn get_put() {
let (_tmpdir, database) = test_init().await;
let key1 = String::from("Foo").into_bytes();
let key2 = String::from("Foz").into_bytes();
let value1 = String::from("Bar").into_bytes();
let value2 = String::from("Baz").into_bytes();
assert!(database.get(&key1).await.unwrap().is_none());
assert!(database.get(&key2).await.unwrap().is_none());
database.put(key1.clone(), value1.clone()).await.unwrap();
assert_eq!(
database.get(&key1).await.unwrap().unwrap().get_value(),
value1
);
assert!(database.get(&key2).await.unwrap().is_none());
database.put(key1.clone(), value2.clone()).await.unwrap();
assert_eq!(
database.get(&key1).await.unwrap().unwrap().get_value(),
value2
);
database.stop().await.unwrap();
}
#[async_test]
async fn iterate() {
const COUNT: u64 = 2500;
let (_tmpdir, database) = test_init().await;
let options = WriteOptions { sync: false };
for pos in 0..COUNT {
let key = format!("key_{pos:05}").into_bytes();
let value = format!("some_string_{pos}").into_bytes();
database.put_opts(key, value, &options).await.unwrap();
}
let mut pos = 0;
let mut iter = database.iter().await;
while let Some((key, val)) = iter.next().await {
let expected_key = format!("key_{pos:05}").into_bytes();
let expected_val = format!("some_string_{pos}").into_bytes();
assert_eq!(expected_key, key);
assert_eq!(expected_val, val.get_value());
pos += 1;
}
assert_eq!(pos, COUNT);
database.stop().await.unwrap();
}
#[async_test]
async fn range_iterate() {
const COUNT: u64 = 25_000;
let (_tmpdir, database) = test_init().await;
let options = WriteOptions { sync: false };
for pos in 0..COUNT {
let key = format!("key_{pos:05}").into_bytes();
let val = format!("some_string_{pos}").into_bytes();
database.put_opts(key, val, &options).await.unwrap();
}
let mut pos = 0;
let start = "key_00300".to_string().into_bytes();
let end = "key_10150".to_string().into_bytes();
let mut iter = database.range_iter(&start, &end).await;
while let Some((key, val)) = iter.next().await {
let real_pos = pos + 300;
let expected_key = format!("key_{real_pos:05}").into_bytes();
let expected_val = format!("some_string_{real_pos}").into_bytes();
assert_eq!(expected_key, key);
assert_eq!(expected_val, val.get_value());
pos += 1;
}
assert_eq!(pos, 9850);
database.stop().await.unwrap();
}
#[async_test]
async fn range_iterate_reverse() {
const COUNT: u64 = 25_000;
let (_tmpdir, database) = test_init().await;
let options = WriteOptions { sync: false };
for pos in 0..COUNT {
let key = format!("key_{pos:05}").into_bytes();
let value = format!("some_string_{pos}").into_bytes();
database.put_opts(key, value, &options).await.unwrap();
}
let mut pos = 0;
let start = "key_10150".to_string().into_bytes();
let end = "key_00300".to_string().into_bytes();
let mut iter = database.reverse_range_iter(&start, &end).await;
while let Some((key, val)) = iter.next().await {
let real_pos = 10150 - pos;
let expected_key = format!("key_{real_pos:05}").into_bytes();
let expected_val = format!("some_string_{real_pos}").into_bytes();
assert_eq!(expected_key, key);
assert_eq!(expected_val, val.get_value());
pos += 1;
}
assert_eq!(pos, 9850);
database.stop().await.unwrap();
}
#[async_test]
async fn range_iterate_empty() {
let (_tmpdir, database) = test_init().await;
const COUNT: u64 = 500;
let options = WriteOptions { sync: false };
for pos in 0..COUNT {
let key = format!("key_{pos:05}").into_bytes();
let value = format!("some_string_{pos}").into_bytes();
database.put_opts(key, value, &options).await.unwrap();
}
let start = "key_05300".to_string().into_bytes();
let end = "key_10150".to_string().into_bytes();
let mut iter = database.range_iter(&start, &end).await;
if iter.next().await.is_some() {
panic!("Found a key where there should be none");
}
database.stop().await.unwrap();
}
#[async_test]
async fn get_put_many() {
const COUNT: u64 = 1_000;
let (_tmpdir, database) = test_init().await;
let options = WriteOptions { sync: false };
for pos in 0..COUNT {
let key = format!("key_{pos}").into_bytes();
let value = format!("some_string_{pos}").into_bytes();
database.put_opts(key, value, &options).await.unwrap();
}
for pos in 0..COUNT {
let key = format!("key_{pos}").into_bytes();
assert_eq!(
database.get(&key).await.unwrap().unwrap().get_value(),
format!("some_string_{pos}").into_bytes(),
);
}
database.stop().await.unwrap();
}
#[async_test(flavor = "multi_thread", worker_threads = 4)]
async fn get_put_delete_large_entry() {
const SIZE: usize = 1000;
let (_tmpdir, database) = test_init().await;
let options = WriteOptions { sync: true };
for _ in 0..10 {
let key = "key_424245".to_string().into_bytes();
let mut value = Vec::new();
value.resize(SIZE, b'a');
database
.put_opts(key.clone(), value.clone(), &options)
.await
.unwrap();
assert_eq!(
database.get(&key).await.unwrap().unwrap().get_value(),
value
);
database.delete(key.clone()).await.unwrap();
assert!(database.get(&key).await.unwrap().is_none());
}
database.stop().await.unwrap();
}
#[async_test(flavor = "multi_thread", worker_threads = 4)]
async fn get_put_delete_many() {
const COUNT: u64 = 1_003;
let (_tmpdir, database) = test_init().await;
let options = WriteOptions { sync: false };
for pos in 0..COUNT {
let key = format!("key_{pos}").into_bytes();
let value = format!("some_string_{pos}").into_bytes();
database.put_opts(key, value, &options).await.unwrap();
}
for pos in 0..COUNT {
let key = format!("key_{pos}").into_bytes();
database.delete(key).await.unwrap();
}
for pos in 0..COUNT {
let key = format!("key_{pos}").into_bytes();
assert!(database.get(&key).await.unwrap().is_none());
}
database.stop().await.unwrap();
}
#[async_test]
async fn override_some() {
const COUNT: u64 = 1_000;
let (_tmpdir, database) = test_init().await;
let options = WriteOptions { sync: false };
for pos in 0..COUNT {
let key = format!("key_{pos}").into_bytes();
let value = format!("some_string_{pos}").into_bytes();
database.put_opts(key, value, &options).await.unwrap();
}
for pos in 0..COUNT {
let key = format!("key_{pos}").into_bytes();
let value = format!("some_other_string_{pos}").into_bytes();
database.put_opts(key, value, &options).await.unwrap();
}
for pos in 0..COUNT {
let key = format!("key_{pos}").into_bytes();
let value = format!("some_other_string_{pos}").into_bytes();
assert_eq!(
database.get(&key).await.unwrap().unwrap().get_value(),
value,
);
}
database.stop().await.unwrap();
}
#[async_test]
async fn override_many() {
const NCOUNT: u64 = 2_000;
const COUNT: u64 = 501;
let (_tmpdir, database) = test_init().await;
let options = WriteOptions { sync: false };
for pos in 0..NCOUNT {
let key = format!("key_{pos}").into_bytes();
let value = format!("some_string_{pos}").into_bytes();
database.put_opts(key, value, &options).await.unwrap();
}
for pos in 0..COUNT {
let key = format!("key_{pos}").into_bytes();
let value = format!("some_other_string_{pos}").into_bytes();
database.put_opts(key, value, &options).await.unwrap();
}
for pos in 0..COUNT {
let key = format!("key_{pos}").into_bytes();
let value = format!("some_other_string_{pos}").into_bytes();
assert_eq!(
database.get(&key).await.unwrap().unwrap().get_value(),
value,
);
}
for pos in COUNT..NCOUNT {
let key = format!("key_{pos}").into_bytes();
let value = format!("some_string_{pos}").into_bytes();
assert_eq!(
database.get(&key).await.unwrap().unwrap().get_value(),
value,
);
}
database.stop().await.unwrap();
}
#[async_test]
async fn batched_write() {
const COUNT: u64 = 1000;
let (_tmpdir, database) = test_init().await;
let mut batch = WriteBatch::new();
for pos in 0..COUNT {
let key = format!("key{pos}").into_bytes();
let value = format!("value{pos}").into_bytes();
batch.put(key, value);
}
database.write(batch).await.unwrap();
for pos in 0..COUNT {
let key = format!("key{pos}").into_bytes();
let value = format!("value{pos}").into_bytes();
assert_eq!(
database.get(&key).await.unwrap().unwrap().get_value(),
value
);
}
database.stop().await.unwrap();
}