use std::{ops::Bound, path::PathBuf};
use bytes::Bytes;
use futures::TryStreamExt;
use tempest_core::test_utils::setup_tracing;
use tempest_io::VirtualIo;
use tempest_rt::{JoinHandle, block_on};
use crate::{DefaultStrategy, Storage, StorageHandle, batch::WriteBatch, config::StorageConfig};
fn storage() -> (StorageHandle, JoinHandle<()>) {
Storage::<VirtualIo, DefaultStrategy>::init(
PathBuf::from("/storage"),
StorageConfig::for_testing(),
)
}
async fn shutdown(storage: StorageHandle, join: JoinHandle<()>) {
drop(storage);
join.await;
}
fn b(s: &'static str) -> Bytes {
Bytes::from_static(s.as_bytes())
}
#[test]
fn test_storage_write_empty_batch() {
setup_tracing();
block_on(VirtualIo::default(), async {
let (storage, join) = storage();
storage.write(WriteBatch::new()).await.unwrap();
shutdown(storage, join).await;
});
}
#[test]
fn test_storage_write_single_put() {
setup_tracing();
block_on(VirtualIo::default(), async {
let (storage, join) = storage();
let mut batch = WriteBatch::new();
batch.put(b"apple", b"red");
storage.write(batch).await.unwrap();
shutdown(storage, join).await;
});
}
#[test]
fn test_storage_write_multiple_puts() {
setup_tracing();
block_on(VirtualIo::default(), async {
let (storage, join) = storage();
let mut batch = WriteBatch::new();
batch.put(b"apple", b"red");
batch.put(b"banana", b"yellow");
batch.put(b"cherry", b"red");
storage.write(batch).await.unwrap();
shutdown(storage, join).await;
});
}
#[test]
fn test_storage_write_sequential_batches() {
setup_tracing();
block_on(VirtualIo::default(), async {
let (storage, join) = storage();
let mut batch = WriteBatch::new();
batch.put(b"a", b"1");
storage.write(batch).await.unwrap();
let mut batch = WriteBatch::new();
batch.put(b"b", b"2");
storage.write(batch).await.unwrap();
shutdown(storage, join).await;
});
}
#[test]
fn test_storage_get_missing_key() {
setup_tracing();
block_on(VirtualIo::default(), async {
let (storage, join) = storage();
let result = storage.get(b("apple")).await.unwrap();
assert_eq!(result, None);
shutdown(storage, join).await;
});
}
#[test]
fn test_storage_get_existing_key() {
setup_tracing();
block_on(VirtualIo::default(), async {
let (storage, join) = storage();
let mut batch = WriteBatch::new();
batch.put(b"apple", b"red");
storage.write(batch).await.unwrap();
let result = storage.get(b("apple")).await.unwrap();
assert_eq!(result, Some(b("red")));
shutdown(storage, join).await;
});
}
#[test]
fn test_storage_get_overwritten_key() {
setup_tracing();
block_on(VirtualIo::default(), async {
let (storage, join) = storage();
let mut batch = WriteBatch::new();
batch.put(b"apple", b"old");
storage.write(batch).await.unwrap();
let mut batch = WriteBatch::new();
batch.put(b"apple", b"new");
storage.write(batch).await.unwrap();
let result = storage.get(b("apple")).await.unwrap();
assert_eq!(result, Some(b("new")));
shutdown(storage, join).await;
});
}
#[test]
fn test_storage_get_deleted_key() {
setup_tracing();
block_on(VirtualIo::default(), async {
let (storage, join) = storage();
let mut batch = WriteBatch::new();
batch.put(b"apple", b"red");
storage.write(batch).await.unwrap();
let mut batch = WriteBatch::new();
batch.delete(b"apple");
storage.write(batch).await.unwrap();
let result = storage.get(b("apple")).await.unwrap();
assert_eq!(result, None);
shutdown(storage, join).await;
});
}
#[test]
fn test_storage_get_unrelated_key_after_delete() {
setup_tracing();
block_on(VirtualIo::default(), async {
let (storage, join) = storage();
let mut batch = WriteBatch::new();
batch.put(b"apple", b"red");
batch.put(b"banana", b"yellow");
storage.write(batch).await.unwrap();
let mut batch = WriteBatch::new();
batch.delete(b"apple");
storage.write(batch).await.unwrap();
assert_eq!(storage.get(b("apple")).await.unwrap(), None);
assert_eq!(storage.get(b("banana")).await.unwrap(), Some(b("yellow")));
shutdown(storage, join).await;
});
}
#[test]
fn test_storage_scan_unbounded() {
setup_tracing();
block_on(VirtualIo::default(), async {
let (storage, join) = storage();
let mut batch = WriteBatch::new();
batch.put(b"a", b"1");
batch.put(b"b", b"2");
batch.put(b"c", b"3");
storage.write(batch).await.unwrap();
let scan = storage
.scan(Bound::Unbounded, Bound::Unbounded)
.await
.unwrap();
let results: Vec<_> = scan.try_collect().await.unwrap();
assert_eq!(
results,
vec![(b("a"), b("1")), (b("b"), b("2")), (b("c"), b("3"))]
);
shutdown(storage, join).await;
});
}
#[test]
fn test_storage_scan_included_start() {
setup_tracing();
block_on(VirtualIo::default(), async {
let (storage, join) = storage();
let mut batch = WriteBatch::new();
batch.put(b"a", b"1");
batch.put(b"b", b"2");
batch.put(b"c", b"3");
storage.write(batch).await.unwrap();
let scan = storage
.scan(Bound::Included(b("b")), Bound::Unbounded)
.await
.unwrap();
let results: Vec<_> = scan.try_collect().await.unwrap();
assert_eq!(results, vec![(b("b"), b("2")), (b("c"), b("3"))]);
shutdown(storage, join).await;
});
}
#[test]
fn test_storage_scan_excluded_end() {
setup_tracing();
block_on(VirtualIo::default(), async {
let (storage, join) = storage();
let mut batch = WriteBatch::new();
batch.put(b"a", b"1");
batch.put(b"b", b"2");
batch.put(b"c", b"3");
storage.write(batch).await.unwrap();
let scan = storage
.scan(Bound::Unbounded, Bound::Excluded(b("c")))
.await
.unwrap();
let results: Vec<_> = scan.try_collect().await.unwrap();
assert_eq!(results, vec![(b("a"), b("1")), (b("b"), b("2"))]);
shutdown(storage, join).await;
});
}
#[test]
fn test_storage_scan_both_bounds() {
setup_tracing();
block_on(VirtualIo::default(), async {
let (storage, join) = storage();
let mut batch = WriteBatch::new();
batch.put(b"apple", b"red");
batch.put(b"banana", b"yellow");
batch.put(b"cherry", b"red");
batch.put(b"date", b"brown");
storage.write(batch).await.unwrap();
let scan = storage
.scan(Bound::Included(b("banana")), Bound::Excluded(b("date")))
.await
.unwrap();
let results: Vec<_> = scan.try_collect().await.unwrap();
assert_eq!(
results,
vec![(b("banana"), b("yellow")), (b("cherry"), b("red"))]
);
shutdown(storage, join).await;
});
}
#[test]
fn test_storage_scan_empty_range() {
setup_tracing();
block_on(VirtualIo::default(), async {
let (storage, join) = storage();
let mut batch = WriteBatch::new();
batch.put(b"a", b"1");
batch.put(b"z", b"26");
storage.write(batch).await.unwrap();
let scan = storage
.scan(Bound::Included(b("m")), Bound::Excluded(b("n")))
.await
.unwrap();
let results: Vec<_> = scan.try_collect().await.unwrap();
assert!(results.is_empty());
shutdown(storage, join).await;
});
}
#[test]
fn test_storage_scan_snapshot_isolation() {
setup_tracing();
block_on(VirtualIo::default(), async {
let (storage, join) = storage();
let mut batch = WriteBatch::new();
batch.put(b"banana", b"yellow");
batch.put(b"kiwi", b"green");
batch.put(b"lemon", b"yellow");
storage.write(batch).await.unwrap();
let scan = storage
.scan(Bound::Included(b("banana")), Bound::Excluded(b("orange")))
.await
.unwrap();
let mut batch = WriteBatch::new();
batch.put(b"banana", b"wrong color");
batch.put(b"cherry", b"red");
batch.delete(b"lemon");
storage.write(batch).await.unwrap();
let results: Vec<_> = scan.try_collect().await.unwrap();
assert_eq!(
results,
vec![
(b("banana"), b("yellow")),
(b("kiwi"), b("green")),
(b("lemon"), b("yellow")),
]
);
shutdown(storage, join).await;
});
}
#[test]
fn test_storage_scan_dedup_latest_version() {
setup_tracing();
block_on(VirtualIo::default(), async {
let (storage, join) = storage();
let mut batch = WriteBatch::new();
batch.put(b"apple", b"old");
storage.write(batch).await.unwrap();
let mut batch = WriteBatch::new();
batch.put(b"apple", b"new");
storage.write(batch).await.unwrap();
let scan = storage
.scan(Bound::Unbounded, Bound::Unbounded)
.await
.unwrap();
let results: Vec<_> = scan.try_collect().await.unwrap();
assert_eq!(results, vec![(b("apple"), b("new"))]);
shutdown(storage, join).await;
});
}
#[test]
fn test_storage_scan_deleted_key_absent() {
setup_tracing();
block_on(VirtualIo::default(), async {
let (storage, join) = storage();
let mut batch = WriteBatch::new();
batch.put(b"a", b"1");
batch.put(b"b", b"2");
batch.put(b"c", b"3");
storage.write(batch).await.unwrap();
let mut batch = WriteBatch::new();
batch.delete(b"b");
storage.write(batch).await.unwrap();
let scan = storage
.scan(Bound::Unbounded, Bound::Unbounded)
.await
.unwrap();
let results: Vec<_> = scan.try_collect().await.unwrap();
assert_eq!(results, vec![(b("a"), b("1")), (b("c"), b("3"))]);
shutdown(storage, join).await;
});
}