use std::path::PathBuf;
use tempest_core::test_utils::setup_tracing;
use tempest_io::VirtualIo;
use tempest_rt::block_on;
use std::rc::Rc;
use crate::{
base::{DefaultComparer, InternalKey, SeqNum},
config::SstWriteConfig,
iterator::{StorageIterator, mock::MockIterator},
};
use super::*;
type Mock = MockIterator<DefaultComparer>;
fn path() -> PathBuf {
PathBuf::from("/test.sst")
}
#[test]
fn test_write_single_block() {
setup_tracing();
block_on(VirtualIo::default(), async {
let source = Mock::new().add_items([(0, 10, "a"), (1, 11, "b"), (2, 12, "c")]);
let stats =
write::<VirtualIo, DefaultComparer, _>(path(), 3, SstWriteConfig::default(), source)
.await
.unwrap();
assert_eq!(stats.entry_count, 3);
assert!(stats.file_size > 0);
assert_eq!(stats.min_seqnum, SeqNum::new(10).unwrap());
assert_eq!(stats.max_seqnum, SeqNum::new(12).unwrap());
});
}
#[test]
fn test_write_seqnum_range() {
setup_tracing();
block_on(VirtualIo::default(), async {
let source =
Mock::new().add_items([(0, 20, "a"), (1, 10, "b"), (2, 30, "c"), (3, 15, "d")]);
let stats =
write::<VirtualIo, DefaultComparer, _>(path(), 4, SstWriteConfig::default(), source)
.await
.unwrap();
assert_eq!(stats.entry_count, 4);
assert_eq!(stats.min_seqnum, SeqNum::new(10).unwrap());
assert_eq!(stats.max_seqnum, SeqNum::new(30).unwrap());
});
}
#[test]
fn test_write_multi_block() {
setup_tracing();
let config = SstWriteConfig {
block_target_size: 32,
..Default::default()
};
block_on(VirtualIo::default(), async {
let source = Mock::new().add_items([
(0, 10, "value-a"),
(1, 11, "value-b"),
(2, 12, "value-c"),
(3, 13, "value-d"),
(4, 14, "value-e"),
(5, 15, "value-f"),
]);
let stats = write::<VirtualIo, DefaultComparer, _>(path(), 6, config, source)
.await
.unwrap();
assert_eq!(stats.entry_count, 6);
assert!(stats.file_size > 0);
assert_eq!(stats.min_seqnum, SeqNum::new(10).unwrap());
assert_eq!(stats.max_seqnum, SeqNum::new(15).unwrap());
});
}
#[test]
fn test_write_file_size_grows_with_entries() {
setup_tracing();
block_on(VirtualIo::default(), async {
let small = Mock::new().add_items([(0, 10, "a"), (1, 11, "b")]);
let stats_small = write::<VirtualIo, DefaultComparer, _>(
PathBuf::from("/small.sst"),
2,
SstWriteConfig::default(),
small,
)
.await
.unwrap();
let large = Mock::new().add_items([
(0, 10, "a"),
(1, 11, "b"),
(2, 12, "c"),
(3, 13, "d"),
(4, 14, "e"),
(5, 15, "f"),
(6, 16, "g"),
(7, 17, "h"),
]);
let stats_large = write::<VirtualIo, DefaultComparer, _>(
PathBuf::from("/large.sst"),
8,
SstWriteConfig::default(),
large,
)
.await
.unwrap();
assert!(stats_large.file_size > stats_small.file_size);
});
}
async fn write_and_load(
items: impl IntoIterator<Item = (u64, u64, &'static str)>,
) -> SstHandle<VirtualIo, DefaultComparer> {
let source = Mock::new().add_items(items.into_iter());
write::<VirtualIo, DefaultComparer, _>(path(), 0, SstWriteConfig::default(), source)
.await
.unwrap();
load::<VirtualIo, DefaultComparer>(path()).await.unwrap()
}
#[test]
fn test_load_valid() {
setup_tracing();
block_on(VirtualIo::default(), async {
let source = Mock::new().add_items([(0, 10, "a"), (1, 11, "b"), (2, 12, "c")]);
write::<VirtualIo, DefaultComparer, _>(path(), 3, SstWriteConfig::default(), source)
.await
.unwrap();
let handle = load::<VirtualIo, DefaultComparer>(path()).await.unwrap();
handle.close().await.unwrap();
});
}
#[test]
fn test_load_nonexistent() {
setup_tracing();
block_on(VirtualIo::default(), async {
let result = load::<VirtualIo, DefaultComparer>(path()).await;
assert!(result.is_err());
});
}
#[test]
fn test_load_invalid_magic() {
setup_tracing();
block_on(VirtualIo::default(), async {
let garbage = bytes::BytesMut::zeroed(256);
let fd = tempest_rt::open_file::<VirtualIo>(
path(),
tempest_io::OpenOptions::new().write(true).create(true),
)
.await
.unwrap();
tempest_rt::write_exact::<_, VirtualIo>(fd, garbage, 0)
.await
.0
.unwrap();
tempest_rt::close_file::<VirtualIo>(fd).await.unwrap();
let result = load::<VirtualIo, DefaultComparer>(path()).await;
assert!(result.is_err());
});
}
#[test]
fn test_get_existing_key() {
setup_tracing();
block_on(VirtualIo::default(), async {
let handle = write_and_load([(0, 10, "a"), (1, 11, "b"), (2, 12, "c")]).await;
let key = InternalKey::test_with_seqnum(1, 11);
let result = handle.get(&key).await.unwrap();
let (_, value) = result.expect("key should be found");
assert_eq!(value.as_ref(), b"b");
handle.close().await.unwrap();
});
}
#[test]
fn test_get_missing_key() {
setup_tracing();
block_on(VirtualIo::default(), async {
let handle = write_and_load([(0, 10, "a"), (1, 11, "b")]).await;
let key = InternalKey::test_with_seqnum(99, 10);
let result = handle.get(&key).await.unwrap();
assert!(result.is_none());
handle.close().await.unwrap();
});
}
#[test]
fn test_get_snapshot_visibility() {
setup_tracing();
block_on(VirtualIo::default(), async {
let handle = write_and_load([(0, 20, "a"), (1, 10, "b")]).await;
let key = InternalKey::test_with_seqnum(0, 10);
let result = handle.get(&key).await.unwrap();
assert!(
result.is_none(),
"should not see entry written after snapshot"
);
handle.close().await.unwrap();
});
}
type TestIter = SstIterator<VirtualIo, DefaultComparer>;
async fn next(iter: &mut TestIter) -> Option<(InternalKey<DefaultComparer>, bytes::Bytes)> {
<TestIter as StorageIterator<VirtualIo, DefaultComparer>>::next(iter)
.await
.unwrap()
}
async fn seek(iter: &mut TestIter, key_id: u64) {
<TestIter as StorageIterator<VirtualIo, DefaultComparer>>::seek(
iter,
InternalKey::test(key_id),
)
.await
.unwrap();
}
async fn make_iter(items: impl IntoIterator<Item = (u64, u64, &'static str)>) -> TestIter {
let handle = write_and_load(items).await;
SstIterator::new(Rc::new(handle))
}
#[test]
fn test_iter_full_scan() {
setup_tracing();
block_on(VirtualIo::default(), async {
let mut iter = make_iter([(0, 10, "a"), (1, 11, "b"), (2, 12, "c")]).await;
let (k, v) = next(&mut iter).await.unwrap();
assert_eq!(k.test_key_as_u64(), 0);
assert_eq!(v.as_ref(), b"a");
let (k, v) = next(&mut iter).await.unwrap();
assert_eq!(k.test_key_as_u64(), 1);
assert_eq!(v.as_ref(), b"b");
let (k, v) = next(&mut iter).await.unwrap();
assert_eq!(k.test_key_as_u64(), 2);
assert_eq!(v.as_ref(), b"c");
assert!(next(&mut iter).await.is_none());
Rc::try_unwrap(iter.into_handle())
.unwrap_or_else(|_| panic!("handle has extra refs"))
.close()
.await
.unwrap();
});
}
#[test]
fn test_iter_seek_forward() {
setup_tracing();
block_on(VirtualIo::default(), async {
let mut iter = make_iter([(0, 10, "a"), (1, 11, "b"), (2, 12, "c")]).await;
seek(&mut iter, 1).await;
let (k, v) = next(&mut iter).await.unwrap();
assert_eq!(k.test_key_as_u64(), 1);
assert_eq!(v.as_ref(), b"b");
let (k, _) = next(&mut iter).await.unwrap();
assert_eq!(k.test_key_as_u64(), 2);
assert!(next(&mut iter).await.is_none());
Rc::try_unwrap(iter.into_handle())
.unwrap_or_else(|_| panic!("handle has extra refs"))
.close()
.await
.unwrap();
});
}
#[test]
fn test_iter_seek_backward_noop() {
setup_tracing();
block_on(VirtualIo::default(), async {
let mut iter = make_iter([(0, 10, "a"), (1, 11, "b"), (2, 12, "c")]).await;
seek(&mut iter, 1).await;
next(&mut iter).await;
seek(&mut iter, 0).await;
let (k, _) = next(&mut iter).await.unwrap();
assert_eq!(k.test_key_as_u64(), 2);
assert!(next(&mut iter).await.is_none());
Rc::try_unwrap(iter.into_handle())
.unwrap_or_else(|_| panic!("handle has extra refs"))
.close()
.await
.unwrap();
});
}
#[test]
fn test_iter_across_block_boundary() {
setup_tracing();
let config = SstWriteConfig {
block_target_size: 32,
..Default::default()
};
block_on(VirtualIo::default(), async {
let items = [
(0, 10, "a"),
(1, 11, "b"),
(2, 12, "c"),
(3, 13, "d"),
(4, 14, "e"),
];
let source = Mock::new().add_items(items);
write::<VirtualIo, DefaultComparer, _>(path(), 0, config, source)
.await
.unwrap();
let handle = load::<VirtualIo, DefaultComparer>(path()).await.unwrap();
let mut iter = SstIterator::new(Rc::new(handle));
let mut keys = Vec::new();
while let Some((k, _)) = next(&mut iter).await {
keys.push(k.test_key_as_u64());
}
assert_eq!(keys, vec![0, 1, 2, 3, 4]);
Rc::try_unwrap(iter.into_handle())
.unwrap_or_else(|_| panic!("handle has extra refs"))
.close()
.await
.unwrap();
});
}