use std::ops::RangeBounds;
use std::sync::Arc;
use bytes::Bytes;
use fail_parallel::FailPointRegistry;
use object_store::path::Path;
use object_store::prefix::PrefixStore;
use object_store::registry::{DefaultObjectStoreRegistry, ObjectStoreRegistry};
use object_store::ObjectStore;
use crate::compactor::COMPACTOR_TASK_NAME;
use crate::db_transaction::DbTransaction;
use crate::dispatcher::MessageHandlerExecutor;
use crate::garbage_collector::GC_TASK_NAME;
use crate::transaction_manager::IsolationLevel;
use parking_lot::RwLock;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use crate::batch::WriteBatch;
use crate::batch_write::{WriteBatchMessage, WRITE_BATCH_TASK_NAME};
use crate::bytes_range::BytesRange;
use crate::cached_object_store::CachedObjectStore;
use crate::clock::MonotonicClock;
use crate::clock::{LogicalClock, SystemClock};
use crate::config::{
FlushOptions, FlushType, MergeOptions, PreloadLevel, PutOptions, ReadOptions, ScanOptions,
Settings, WriteOptions,
};
use crate::db_iter::DbIterator;
use crate::db_read::DbRead;
use crate::db_snapshot::DbSnapshot;
use crate::db_state::{DbState, SsTableId};
use crate::db_stats::DbStats;
use crate::error::SlateDBError;
use crate::manifest::store::FenceableManifest;
use crate::mem_table::WritableKVTable;
use crate::mem_table_flush::{MemtableFlushMsg, MEMTABLE_FLUSHER_TASK_NAME};
use crate::oracle::{DbOracle, Oracle};
use crate::paths::PathResolver;
use crate::rand::DbRand;
use crate::reader::Reader;
use crate::sst_iter::SstIteratorOptions;
use crate::stats::StatRegistry;
use crate::tablestore::TableStore;
use crate::transaction_manager::TransactionManager;
use crate::utils::{format_bytes_si, MonotonicSeq, SendSafely};
use crate::wal_buffer::{WalBufferManager, WAL_BUFFER_TASK_NAME};
use crate::wal_replay::{WalReplayIterator, WalReplayOptions};
use log::{info, trace, warn};
pub mod builder;
use crate::manifest::Manifest;
use crate::transactional_object::DirtyObject;
pub use builder::DbBuilder;
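/// Internal state shared between the public `Db` handle and the background
/// tasks spawned for it (write batcher, memtable flusher, WAL buffer).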
pub(crate) struct DbInner {
pub(crate) state: Arc<RwLock<DbState>>,
pub(crate) settings: Settings,
pub(crate) table_store: Arc<TableStore>,
pub(crate) memtable_flush_notifier: UnboundedSender<MemtableFlushMsg>,
pub(crate) write_notifier: UnboundedSender<WriteBatchMessage>,
pub(crate) db_stats: DbStats,
pub(crate) stat_registry: Arc<StatRegistry>,
#[allow(dead_code)]
pub(crate) fp_registry: Arc<FailPointRegistry>,
pub(crate) mono_clock: Arc<MonotonicClock>,
pub(crate) system_clock: Arc<dyn SystemClock>,
pub(crate) rand: Arc<DbRand>,
pub(crate) oracle: Arc<DbOracle>,
pub(crate) reader: Reader,
pub(crate) wal_buffer: Arc<WalBufferManager>,
pub(crate) wal_enabled: bool,
pub(crate) txn_manager: Arc<TransactionManager>,
}
impl DbInner {
pub async fn new(
settings: Settings,
logical_clock: Arc<dyn LogicalClock>,
system_clock: Arc<dyn SystemClock>,
rand: Arc<DbRand>,
table_store: Arc<TableStore>,
manifest: DirtyObject<Manifest>,
memtable_flush_notifier: UnboundedSender<MemtableFlushMsg>,
write_notifier: UnboundedSender<WriteBatchMessage>,
stat_registry: Arc<StatRegistry>,
fp_registry: Arc<FailPointRegistry>,
merge_operator: Option<crate::merge_operator::MergeOperatorType>,
) -> Result<Self, SlateDBError> {
let last_l0_seq = manifest.core().last_l0_seq;
let last_seq = MonotonicSeq::new(last_l0_seq);
let last_committed_seq = MonotonicSeq::new(last_l0_seq);
let last_remote_persisted_seq = MonotonicSeq::new(last_l0_seq);
let oracle = Arc::new(DbOracle::new(
last_seq,
last_committed_seq,
last_remote_persisted_seq,
));
let mono_clock = Arc::new(MonotonicClock::new(
logical_clock,
manifest.core().last_l0_clock_tick,
));
let state = Arc::new(RwLock::new(DbState::new(manifest)));
let db_stats = DbStats::new(stat_registry.as_ref());
let wal_enabled = DbInner::wal_enabled_in_options(&settings);
let reader = Reader {
table_store: table_store.clone(),
db_stats: db_stats.clone(),
mono_clock: mono_clock.clone(),
oracle: oracle.clone(),
merge_operator,
};
let recent_flushed_wal_id = state.read().state().core().replay_after_wal_id;
let wal_buffer = Arc::new(WalBufferManager::new(
state.clone(),
state.clone(),
db_stats.clone(),
recent_flushed_wal_id,
oracle.clone(),
table_store.clone(),
mono_clock.clone(),
settings.l0_sst_size_bytes,
settings.flush_interval,
));
let txn_manager = Arc::new(TransactionManager::new(rand.clone()));
let db_inner = Self {
state,
settings,
oracle,
wal_enabled,
table_store,
memtable_flush_notifier,
wal_buffer,
write_notifier,
db_stats,
mono_clock,
system_clock,
rand,
stat_registry,
fp_registry,
reader,
txn_manager,
};
Ok(db_inner)
}
pub async fn get_with_options<K: AsRef<[u8]>>(
&self,
key: K,
options: &ReadOptions,
) -> Result<Option<Bytes>, SlateDBError> {
self.db_stats.get_requests.inc();
self.check_closed()?;
let db_state = self.state.read().view();
self.reader
.get_with_options(key, options, &db_state, None, None)
.await
}
pub async fn scan_with_options(
&self,
range: BytesRange,
options: &ScanOptions,
) -> Result<DbIterator, SlateDBError> {
self.db_stats.scan_requests.inc();
self.check_closed()?;
let db_state = self.state.read().view();
self.reader
.scan_with_options(range, options, &db_state, None, None, None)
.await
}
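/// Fences older writers by writing an empty WAL SST starting at `next_wal_id`.
/// If the write fails with `SlateDBError::Fenced`, the WAL id was already
/// taken by another writer, so the remote manifest is refreshed and merged
/// into local state and the write is retried with the next WAL id until it
/// succeeds.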
async fn fence_writers(
&self,
manifest: &mut FenceableManifest,
next_wal_id: u64,
) -> Result<(), SlateDBError> {
let mut empty_wal_id = next_wal_id;
loop {
let empty_wal = WritableKVTable::new();
match self
.flush_imm_table(
&SsTableId::Wal(empty_wal_id),
empty_wal.table().clone(),
false,
)
.await
{
Ok(_) => {
return Ok(());
}
Err(SlateDBError::Fenced) => {
manifest.refresh().await?;
self.state
.write()
.merge_remote_manifest(manifest.prepare_dirty()?);
empty_wal_id += 1;
}
Err(e) => {
return Err(e);
}
}
}
}
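/// Returns whether the WAL is enabled for the given settings. The WAL can
/// only be turned off when the crate is built with the `wal_disable` feature;
/// otherwise this always returns `true`.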
#[allow(unused_variables)]
pub(crate) fn wal_enabled_in_options(settings: &Settings) -> bool {
#[cfg(feature = "wal_disable")]
return settings.wal_enabled;
#[cfg(not(feature = "wal_disable"))]
return true;
}
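/// Applies a write batch: applies backpressure if too many bytes are
/// unflushed, hands the batch to the write task, and, when
/// `WriteOptions::await_durable` is set, waits for the batch to become
/// durable before returning.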
pub async fn write_with_options(
&self,
batch: WriteBatch,
options: &WriteOptions,
) -> Result<(), SlateDBError> {
self.check_closed()?;
if batch.ops.is_empty() {
return Ok(());
}
self.db_stats.write_batch_count.inc();
self.db_stats.write_ops.add(batch.ops.len() as u64);
let (tx, rx) = tokio::sync::oneshot::channel();
let batch_msg = WriteBatchMessage {
batch,
options: options.clone(),
done: tx,
};
self.maybe_apply_backpressure().await?;
self.write_notifier
.send_safely(self.state.read().closed_result_reader(), batch_msg)?;
let mut durable_watcher = rx.await??;
if options.await_durable {
durable_watcher.await_value().await?;
}
Ok(())
}
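/// Blocks writers while the estimated unflushed bytes (WAL buffer plus
/// immutable memtables) exceed `Settings::max_unflushed_bytes`. While over
/// the limit, this triggers a flush of immutable memtables and waits for the
/// oldest unflushed memtable or WAL to flush, re-checking after a 30 second
/// timeout.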
#[inline]
pub(crate) async fn maybe_apply_backpressure(&self) -> Result<(), SlateDBError> {
loop {
let (wal_size_bytes, imm_memtable_size_bytes) = {
let wal_size_bytes = self.wal_buffer.estimated_bytes()?;
let imm_memtable_size_bytes = {
let guard = self.state.read();
guard
.state()
.imm_memtable
.iter()
.map(|imm| {
let metadata = imm.table().metadata();
self.table_store.estimate_encoded_size(
metadata.entry_num,
metadata.entries_size_in_bytes,
)
})
.sum::<usize>()
};
(wal_size_bytes, imm_memtable_size_bytes)
};
let total_mem_size_bytes = wal_size_bytes + imm_memtable_size_bytes;
self.db_stats
.total_mem_size_bytes
.set(total_mem_size_bytes as i64);
trace!(
"checking backpressure [total_mem_size_bytes={}, wal_size_bytes={}, imm_memtable_size_bytes={}, max_unflushed_bytes={}]",
format_bytes_si(total_mem_size_bytes as u64),
format_bytes_si(wal_size_bytes as u64),
format_bytes_si(imm_memtable_size_bytes as u64),
format_bytes_si(self.settings.max_unflushed_bytes as u64),
);
if total_mem_size_bytes >= self.settings.max_unflushed_bytes {
self.db_stats.backpressure_count.inc();
warn!(
"unflushed memtable size exceeds max_unflushed_bytes. applying backpressure. [total_mem_size_bytes={}, wal_size_bytes={}, imm_memtable_size_bytes={}, max_unflushed_bytes={}]",
format_bytes_si(total_mem_size_bytes as u64),
format_bytes_si(wal_size_bytes as u64),
format_bytes_si(imm_memtable_size_bytes as u64),
format_bytes_si(self.settings.max_unflushed_bytes as u64),
);
let maybe_oldest_unflushed_memtable = {
let guard = self.state.read();
guard.state().imm_memtable.back().cloned()
};
let maybe_oldest_unflushed_wal = self.wal_buffer.oldest_unflushed_wal();
if maybe_oldest_unflushed_memtable.is_none() && maybe_oldest_unflushed_wal.is_none()
{
continue;
}
let await_flush_memtable = async {
if let Some(oldest_unflushed_memtable) = maybe_oldest_unflushed_memtable {
oldest_unflushed_memtable.await_flush_to_l0().await
} else {
std::future::pending().await
}
};
let await_flush_wal = async {
if let Some(oldest_unflushed_wal) = maybe_oldest_unflushed_wal {
oldest_unflushed_wal.await_durable().await
} else {
std::future::pending().await
}
};
self.flush_immutable_memtables().await?;
let timeout_fut = self.system_clock.sleep(Duration::from_secs(30));
tokio::select! {
result = await_flush_memtable => result?,
result = await_flush_wal => result?,
_ = timeout_fut => {
warn!("backpressure timeout: waited 30s, no memtable/WAL flushed yet");
}
};
} else {
break;
}
}
Ok(())
}
pub(crate) async fn flush_wals(&self) -> Result<(), SlateDBError> {
self.wal_buffer.flush().await
}
async fn flush_immutable_memtables(&self) -> Result<(), SlateDBError> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.memtable_flush_notifier.send_safely(
self.state.read().closed_result_reader(),
MemtableFlushMsg::FlushImmutableMemtables { sender: Some(tx) },
)?;
rx.await?
}
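/// Freezes the active memtable (if non-empty) and flushes all immutable
/// memtables to L0.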
pub(crate) async fn flush_memtables(&self) -> Result<(), SlateDBError> {
{
let last_flushed_wal_id = self.wal_buffer.recent_flushed_wal_id();
let mut guard = self.state.write();
if !guard.memtable().is_empty() {
guard.freeze_memtable(last_flushed_wal_id)?;
}
}
self.flush_immutable_memtables().await
}
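/// Replays WAL SSTs into fresh memtables on startup, applying backpressure
/// between batches, then advances `last_remote_persisted_seq` up to
/// `last_committed_seq`, since everything replayed came from durable WALs.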
async fn replay_wal(&self) -> Result<(), SlateDBError> {
let sst_iter_options = SstIteratorOptions {
max_fetch_tasks: 1,
blocks_to_fetch: 256,
cache_blocks: false,
eager_spawn: true,
};
let replay_options = WalReplayOptions {
sst_batch_size: 4,
min_memtable_bytes: self.settings.l0_sst_size_bytes,
max_memtable_bytes: usize::MAX,
sst_iter_options,
};
let db_state = self.state.read().state().core().clone();
let mut replay_iter =
WalReplayIterator::new(&db_state, replay_options, Arc::clone(&self.table_store))
.await?;
while let Some(replayed_table) = replay_iter.next().await? {
self.maybe_apply_backpressure().await?;
self.replay_memtable(replayed_table)?;
}
assert!(
self.oracle.last_remote_persisted_seq.load() <= self.oracle.last_committed_seq.load()
);
self.oracle
.last_remote_persisted_seq
.store(self.oracle.last_committed_seq());
Ok(())
}
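/// Warms the on-disk object store cache at startup according to
/// `object_store_cache_options.preload_disk_cache_on_startup`: all SSTs (L0
/// plus compacted levels), only L0 SSTs, or nothing.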
async fn preload_cache(
&self,
cached_obj_store: &CachedObjectStore,
path_resolver: &PathResolver,
) -> Result<(), SlateDBError> {
let current_state = self.state.read().state().clone();
let max_cache_size = self
.settings
.object_store_cache_options
.max_cache_size_bytes
.unwrap_or(usize::MAX);
match self
.settings
.object_store_cache_options
.preload_disk_cache_on_startup
{
Some(PreloadLevel::AllSst) => {
let l0_count = current_state.manifest.core().l0.len();
let compacted_count: usize = current_state
.manifest
.core()
.compacted
.iter()
.map(|level| level.ssts.len())
.sum();
let total_capacity = l0_count + compacted_count;
let mut all_sst_paths: Vec<object_store::path::Path> =
Vec::with_capacity(total_capacity);
all_sst_paths.extend(
current_state
.manifest
.core()
.l0
.iter()
.map(|sst_handle| path_resolver.table_path(&sst_handle.id)),
);
all_sst_paths.extend(
current_state
.manifest
.core()
.compacted
.iter()
.flat_map(|level| &level.ssts)
.map(|sst_handle| path_resolver.table_path(&sst_handle.id)),
);
if !all_sst_paths.is_empty() {
if let Err(e) = cached_obj_store
.load_files_to_cache(all_sst_paths, max_cache_size)
.await
{
warn!("Failed to preload all SSTs to cache: {:?}", e);
}
}
}
Some(PreloadLevel::L0Sst) => {
let l0_sst_paths: Vec<object_store::path::Path> = current_state
.manifest
.core()
.l0
.iter()
.map(|sst_handle| path_resolver.table_path(&sst_handle.id))
.collect();
if !l0_sst_paths.is_empty() {
if let Err(e) = cached_obj_store
.load_files_to_cache(l0_sst_paths, max_cache_size)
.await
{
warn!("failed to preload L0 SSTs to cache [error={:?}]", e);
}
}
}
None => {}
}
Ok(())
}
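/// Returns an error if the database has been closed: `SlateDBError::Closed`
/// after a clean close, or the recorded fatal error otherwise.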
pub(crate) fn check_closed(&self) -> Result<(), SlateDBError> {
let closed_result_reader = {
let state = self.state.read();
state.closed_result_reader()
};
if let Some(result) = closed_result_reader.read() {
return match result {
Ok(()) => Err(SlateDBError::Closed),
Err(e) => Err(e.clone()),
};
}
Ok(())
}
}
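/// A handle to a SlateDB database stored in an object store.
///
/// `Db` is cheaply cloneable; all clones share the same `DbInner` and
/// background tasks. A minimal usage sketch (this assumes the crate is
/// published as `slatedb`, so the example is marked `ignore`):
///
/// ```ignore
/// use std::sync::Arc;
/// use object_store::memory::InMemory;
/// use slatedb::Db;
///
/// async fn example() -> Result<(), slatedb::Error> {
///     // An in-memory object store is enough for local experimentation.
///     let object_store = Arc::new(InMemory::new());
///     let db = Db::open("/tmp/example_db", object_store).await?;
///     db.put(b"key", b"value").await?;
///     assert!(db.get(b"key").await?.is_some());
///     db.close().await?;
///     Ok(())
/// }
/// ```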
#[derive(Clone)]
pub struct Db {
pub(crate) inner: Arc<DbInner>,
task_executor: Arc<MessageHandlerExecutor>,
}
impl Db {
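/// Opens a database at `path` in the given object store using default
/// settings. Use [`Db::builder`] to customize settings, clocks, or
/// compaction.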
pub async fn open<P: Into<Path>>(
path: P,
object_store: Arc<dyn ObjectStore>,
) -> Result<Self, crate::Error> {
Self::builder(path, object_store).build().await
}
pub fn builder<P: Into<Path>>(path: P, object_store: Arc<dyn ObjectStore>) -> DbBuilder<P> {
DbBuilder::new(path, object_store)
}
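/// Shuts down the background tasks (compactor, garbage collector, writer,
/// WAL buffer, memtable flusher) and marks the database as closed; subsequent
/// reads and writes fail with a closed error.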
pub async fn close(&self) -> Result<(), crate::Error> {
if let Err(e) = self.task_executor.shutdown_task(COMPACTOR_TASK_NAME).await {
warn!("failed to shutdown compactor task [error={:?}]", e);
}
if let Err(e) = self.task_executor.shutdown_task(GC_TASK_NAME).await {
warn!("failed to shutdown garbage collector task [error={:?}]", e);
}
if let Err(e) = self
.task_executor
.shutdown_task(WRITE_BATCH_TASK_NAME)
.await
{
warn!("failed to shutdown writer task [error={:?}]", e);
}
if let Err(e) = self.task_executor.shutdown_task(WAL_BUFFER_TASK_NAME).await {
warn!("failed to shutdown wal writer task [error={:?}]", e);
}
if let Err(e) = self
.task_executor
.shutdown_task(MEMTABLE_FLUSHER_TASK_NAME)
.await
{
warn!("failed to shutdown memtable writer task [error={:?}]", e);
}
self.inner.state.write().closed_result().write(Ok(()));
info!("db closed");
Ok(())
}
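/// Creates a read-only snapshot pinned at the current last committed
/// sequence number.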
pub async fn snapshot(&self) -> Result<Arc<DbSnapshot>, crate::Error> {
self.inner.check_closed()?;
let seq = self.inner.oracle.last_committed_seq();
let snapshot = DbSnapshot::new(self.inner.clone(), self.inner.txn_manager.clone(), seq);
Ok(snapshot)
}
pub async fn get<K: AsRef<[u8]> + Send>(&self, key: K) -> Result<Option<Bytes>, crate::Error> {
self.get_with_options(key, &ReadOptions::default()).await
}
pub async fn get_with_options<K: AsRef<[u8]> + Send>(
&self,
key: K,
options: &ReadOptions,
) -> Result<Option<Bytes>, crate::Error> {
self.inner
.get_with_options(key, options)
.await
.map_err(Into::into)
}
pub async fn scan<K, T>(&self, range: T) -> Result<DbIterator, crate::Error>
where
K: AsRef<[u8]> + Send,
T: RangeBounds<K> + Send,
{
self.scan_with_options(range, &ScanOptions::default()).await
}
pub async fn scan_with_options<K, T>(
&self,
range: T,
options: &ScanOptions,
) -> Result<DbIterator, crate::Error>
where
K: AsRef<[u8]> + Send,
T: RangeBounds<K> + Send,
{
let start = range
.start_bound()
.map(|b| Bytes::copy_from_slice(b.as_ref()));
let end = range
.end_bound()
.map(|b| Bytes::copy_from_slice(b.as_ref()));
let range = (start, end);
self.inner
.scan_with_options(BytesRange::from(range), options)
.await
.map_err(Into::into)
}
pub async fn scan_prefix<P>(&self, prefix: P) -> Result<DbIterator, crate::Error>
where
P: AsRef<[u8]> + Send,
{
self.scan_prefix_with_options(prefix, &ScanOptions::default())
.await
}
pub async fn scan_prefix_with_options<P>(
&self,
prefix: P,
options: &ScanOptions,
) -> Result<DbIterator, crate::Error>
where
P: AsRef<[u8]> + Send,
{
let range = BytesRange::from_prefix(prefix.as_ref());
self.inner
.scan_with_options(range, options)
.await
.map_err(Into::into)
}
pub async fn put<K, V>(&self, key: K, value: V) -> Result<(), crate::Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let mut batch = WriteBatch::new();
batch.put(key, value);
self.write(batch).await
}
pub async fn put_with_options<K, V>(
&self,
key: K,
value: V,
put_opts: &PutOptions,
write_opts: &WriteOptions,
) -> Result<(), crate::Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let mut batch = WriteBatch::new();
batch.put_with_options(key, value, put_opts);
self.write_with_options(batch, write_opts).await
}
pub async fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), crate::Error> {
let mut batch = WriteBatch::new();
batch.delete(key.as_ref());
self.write(batch).await
}
pub async fn delete_with_options<K: AsRef<[u8]>>(
&self,
key: K,
options: &WriteOptions,
) -> Result<(), crate::Error> {
let mut batch = WriteBatch::new();
batch.delete(key);
self.write_with_options(batch, options).await
}
pub async fn merge<K, V>(&self, key: K, value: V) -> Result<(), crate::Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let mut batch = WriteBatch::new();
batch.merge(key, value);
self.write(batch).await
}
pub async fn merge_with_options<K, V>(
&self,
key: K,
value: V,
merge_opts: &MergeOptions,
write_opts: &WriteOptions,
) -> Result<(), crate::Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let mut batch = WriteBatch::new();
batch.merge_with_options(key, value, merge_opts);
self.write_with_options(batch, write_opts).await
}
pub async fn write(&self, batch: WriteBatch) -> Result<(), crate::Error> {
self.write_with_options(batch, &WriteOptions::default())
.await
}
pub async fn write_with_options(
&self,
batch: WriteBatch,
options: &WriteOptions,
) -> Result<(), crate::Error> {
self.inner
.write_with_options(batch, options)
.await
.map_err(Into::into)
}
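/// Flushes buffered writes: flushes the WAL when it is enabled, otherwise
/// flushes memtables to L0. Use [`Db::flush_with_options`] to choose the
/// flush type explicitly.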
pub async fn flush(&self) -> Result<(), crate::Error> {
if self.inner.wal_enabled {
self.flush_with_options(FlushOptions {
flush_type: FlushType::Wal,
})
.await
} else {
self.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
}
}
pub async fn flush_with_options(&self, options: FlushOptions) -> Result<(), crate::Error> {
match options.flush_type {
FlushType::Wal => {
if self.inner.wal_enabled {
self.inner.flush_wals().await
} else {
Err(SlateDBError::WalDisabled)
}
}
FlushType::MemTable => self.inner.flush_memtables().await,
}
.map_err(Into::into)
}
pub fn metrics(&self) -> Arc<StatRegistry> {
self.inner.stat_registry.clone()
}
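/// Starts a transaction at the current last committed sequence number with
/// the requested isolation level.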
pub async fn begin(
&self,
isolation_level: IsolationLevel,
) -> Result<DbTransaction, crate::Error> {
self.inner.check_closed()?;
let seq = self.inner.oracle.last_committed_seq();
let txn = DbTransaction::new(
self.inner.clone(),
self.inner.txn_manager.clone(),
seq,
isolation_level,
);
Ok(txn)
}
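/// Resolves an object store from a URL (for example `file:///tmp/db`),
/// wrapping it in a `PrefixStore` when the URL carries a path prefix.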
pub fn resolve_object_store(url: &str) -> Result<Arc<dyn ObjectStore>, crate::Error> {
let registry = DefaultObjectStoreRegistry::new();
let url = url
.try_into()
.map_err(|e| SlateDBError::InvalidObjectStoreURL(url.to_string(), e))?;
let (object_store, path) = registry.resolve(&url).map_err(SlateDBError::from)?;
let object_store: Arc<dyn ObjectStore> = if path.as_ref().is_empty() {
object_store
} else {
Arc::new(PrefixStore::new(object_store, path))
};
Ok(object_store)
}
}
#[async_trait::async_trait]
impl DbRead for Db {
async fn get_with_options<K: AsRef<[u8]> + Send>(
&self,
key: K,
options: &ReadOptions,
) -> Result<Option<Bytes>, crate::Error> {
self.get_with_options(key, options).await
}
async fn scan_with_options<K, T>(
&self,
range: T,
options: &ScanOptions,
) -> Result<DbIterator, crate::Error>
where
K: AsRef<[u8]> + Send,
T: RangeBounds<K> + Send,
{
self.scan_with_options(range, options).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cached_object_store::stats::{
OBJECT_STORE_CACHE_PART_ACCESS, OBJECT_STORE_CACHE_PART_HITS,
};
use crate::cached_object_store::{CachedObjectStore, FsCacheStorage};
use crate::cached_object_store_stats::CachedObjectStoreStats;
use crate::clock::DefaultSystemClock;
#[cfg(feature = "test-util")]
use crate::clock::MockSystemClock;
use crate::config::DurabilityLevel::{Memory, Remote};
use crate::config::{
CompactorOptions, GarbageCollectorDirectoryOptions, GarbageCollectorOptions,
ObjectStoreCacheOptions, Settings, SizeTieredCompactionSchedulerOptions, Ttl,
};
use crate::db::builder::GarbageCollectorBuilder;
use crate::db_state::CoreDbState;
use crate::db_stats::IMMUTABLE_MEMTABLE_FLUSHES;
use crate::iter::KeyValueIterator;
use crate::manifest::store::{ManifestStore, StoredManifest};
use crate::object_stores::ObjectStores;
use crate::proptest_util::arbitrary;
use crate::proptest_util::sample;
use crate::rand::DbRand;
#[cfg(feature = "test-util")]
use crate::seq_tracker::FindOption;
use crate::size_tiered_compaction::SizeTieredCompactionSchedulerSupplier;
use crate::sst::SsTableFormat;
use crate::sst_iter::{SstIterator, SstIteratorOptions};
use crate::test_utils::{
assert_iterator, OnDemandCompactionSchedulerSupplier, StringConcatMergeOperator, TestClock,
};
use crate::types::RowEntry;
use crate::{proptest_util, test_utils, CloseReason, KeyValue};
use async_trait::async_trait;
use chrono::TimeDelta;
#[cfg(feature = "test-util")]
use chrono::{TimeZone, Utc};
use fail_parallel::FailPointRegistry;
use futures::{future, future::join_all, StreamExt};
use object_store::memory::InMemory;
use object_store::ObjectStore;
use proptest::test_runner::{TestRng, TestRunner};
use std::collections::BTreeMap;
use std::collections::Bound::Included;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::runtime::Runtime;
use tracing::info;
#[tokio::test]
async fn test_put_get_delete() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let kv_store = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(test_db_options(0, 1024, None))
.build()
.await
.unwrap();
let key = b"test_key";
let value = b"test_value";
kv_store.put(key, value).await.unwrap();
kv_store.flush().await.unwrap();
assert_eq!(
kv_store.get(key).await.unwrap(),
Some(Bytes::from_static(value))
);
kv_store.delete(key).await.unwrap();
assert_eq!(None, kv_store.get(key).await.unwrap());
kv_store.close().await.unwrap();
}
#[tokio::test]
async fn test_scan_prefix_returns_matching_keys() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let kv_store = Db::builder("/tmp/test_scan_prefix", object_store)
.with_settings(test_db_options(0, 1024, None))
.build()
.await
.unwrap();
kv_store.put(b"ab", b"v0").await.unwrap();
kv_store.put(b"aba", b"v1").await.unwrap();
kv_store.put(b"abb", b"v2").await.unwrap();
kv_store.put(b"ac", b"v3").await.unwrap();
let mut iter = kv_store.scan_prefix(b"ab").await.unwrap();
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"ab");
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"aba");
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"abb");
assert_eq!(iter.next().await.unwrap(), None);
kv_store.close().await.unwrap();
}
#[tokio::test]
async fn test_scan_prefix_with_options_handles_unbounded_end() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let kv_store = Db::builder("/tmp/test_scan_prefix_unbounded", object_store)
.with_settings(test_db_options(0, 1024, None))
.build()
.await
.unwrap();
kv_store.put(&[0xff, 0xff], b"v0").await.unwrap();
kv_store.put(&[0xff, 0xff, 0x00], b"v1").await.unwrap();
kv_store.put(&[0xff, 0xff, 0x10], b"v2").await.unwrap();
kv_store.put(&[0xff, 0xff, 0xff], b"v4").await.unwrap();
kv_store.put(&[0xff, 0xfe], b"v3").await.unwrap();
let scan_options = ScanOptions {
cache_blocks: false,
..ScanOptions::default()
};
let mut iter = kv_store
.scan_prefix_with_options(&[0xff, 0xff], &scan_options)
.await
.unwrap();
assert_eq!(
iter.next().await.unwrap().unwrap().key.as_ref(),
&[0xff, 0xff]
);
assert_eq!(
iter.next().await.unwrap().unwrap().key.as_ref(),
&[0xff, 0xff, 0x00]
);
assert_eq!(
iter.next().await.unwrap().unwrap().key.as_ref(),
&[0xff, 0xff, 0x10]
);
assert_eq!(
iter.next().await.unwrap().unwrap().key.as_ref(),
&[0xff, 0xff, 0xff]
);
assert_eq!(iter.next().await.unwrap(), None);
kv_store.close().await.unwrap();
}
#[tokio::test]
async fn test_resolve_object_store_local_prefix_store_writes_to_path() {
let temp_dir = tempfile::tempdir().unwrap();
let prefix_path = temp_dir.path().join("prefix-store");
let url = format!("file://{}", prefix_path.display());
let object_store = Db::resolve_object_store(&url).unwrap();
let location = object_store::path::Path::from("nested/file.txt");
let payload = Bytes::from_static(b"local prefix payload");
object_store
.put(&location, payload.clone().into())
.await
.unwrap();
let expected_path = prefix_path.join("nested").join("file.txt");
let stored = tokio::fs::read(&expected_path).await.unwrap();
assert_eq!(stored, payload.to_vec());
}
#[test]
fn test_get_after_put() {
let mut runner = new_proptest_runner(None);
let runtime = Runtime::new().unwrap();
let table = sample::table(runner.rng(), 1000, 10);
let db_options = test_db_options(0, 1024, None);
let db = runtime.block_on(build_database_from_table(&table, db_options, true));
runner
.run(
&(arbitrary::bytes(100), arbitrary::bytes(100)),
|(key, value)| {
runtime.block_on(async {
if !key.is_empty() {
db.put_with_options(
&key,
&value,
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
assert_eq!(
Some(value),
db.get_with_options(
&key,
&ReadOptions {
durability_filter: Memory,
dirty: false,
cache_blocks: true,
}
)
.await
.unwrap()
);
}
});
Ok(())
},
)
.unwrap();
}
#[tokio::test]
async fn test_no_flush_interval() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let db_options_no_flush_interval = {
let mut db_options = test_db_options(0, 1024, None);
db_options.flush_interval = None;
db_options
};
let kv_store = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(db_options_no_flush_interval)
.build()
.await
.unwrap();
let key = b"test_key";
let value = b"test_value";
kv_store
.put_with_options(
key,
value,
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
assert_ne!(kv_store.inner.wal_buffer.estimated_bytes().unwrap(), 0);
kv_store.flush().await.unwrap();
assert_eq!(kv_store.inner.wal_buffer.estimated_bytes().unwrap(), 0);
}
#[tokio::test]
async fn test_get_with_default_ttl_and_read_uncommitted() {
let clock = Arc::new(TestClock::new());
let ttl = 100;
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let kv_store = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(test_db_options_with_ttl(0, 1024, None, Some(ttl)))
.with_logical_clock(clock.clone())
.build()
.await
.unwrap();
let key = b"test_key";
let value = b"test_value";
kv_store.put(key, value).await.unwrap();
clock.ticker.store(99, Ordering::SeqCst);
assert_eq!(
Some(Bytes::from_static(value)),
kv_store
.get_with_options(key, &ReadOptions::new().with_durability_filter(Memory))
.await
.unwrap(),
);
clock.ticker.store(100, Ordering::SeqCst);
assert_eq!(
None,
kv_store
.get_with_options(key, &ReadOptions::new().with_durability_filter(Memory))
.await
.unwrap(),
);
kv_store.close().await.unwrap();
}
#[tokio::test]
async fn test_get_with_row_override_ttl_and_read_uncommitted() {
let clock = Arc::new(TestClock::new());
let default_ttl = 100;
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let kv_store = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(test_db_options_with_ttl(0, 1024, None, Some(default_ttl)))
.with_logical_clock(clock.clone())
.build()
.await
.unwrap();
let key = b"test_key";
let value = b"test_value";
kv_store
.put_with_options(
key,
value,
&PutOptions {
ttl: Ttl::ExpireAfter(50),
},
&WriteOptions::default(),
)
.await
.unwrap();
clock.ticker.store(49, Ordering::SeqCst);
assert_eq!(
Some(Bytes::from_static(value)),
kv_store
.get_with_options(key, &ReadOptions::new().with_durability_filter(Memory))
.await
.unwrap(),
);
clock.ticker.store(50, Ordering::SeqCst);
assert_eq!(
None,
kv_store
.get_with_options(key, &ReadOptions::new().with_durability_filter(Memory))
.await
.unwrap(),
);
kv_store.close().await.unwrap();
}
#[tokio::test]
async fn test_get_with_default_ttl_and_read_committed() {
let clock = Arc::new(TestClock::new());
let ttl = 100;
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let kv_store = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(test_db_options_with_ttl(0, 1024, None, Some(ttl)))
.with_logical_clock(clock.clone())
.build()
.await
.unwrap();
let key = b"test_key";
let key_other = b"time_advancing_key";
let value = b"test_value";
kv_store.put(key, value).await.unwrap();
clock.ticker.store(99, Ordering::SeqCst);
// Write and flush an unrelated key so the latest clock tick becomes durable
// and visible to committed (Remote) reads.
kv_store.put(key_other, value).await.unwrap();
kv_store.flush().await.unwrap();
assert_eq!(
Some(Bytes::from_static(value)),
kv_store
.get_with_options(key, &ReadOptions::new().with_durability_filter(Remote))
.await
.unwrap(),
);
clock.ticker.store(100, Ordering::SeqCst);
assert_eq!(
Some(Bytes::from_static(value)),
kv_store
.get_with_options(key, &ReadOptions::new().with_durability_filter(Remote))
.await
.unwrap(),
);
kv_store.put(key_other, value).await.unwrap();
kv_store.flush().await.unwrap();
assert_eq!(
None,
kv_store
.get_with_options(key, &ReadOptions::new().with_durability_filter(Remote))
.await
.unwrap(),
);
kv_store.close().await.unwrap();
}
#[tokio::test]
#[cfg(feature = "wal_disable")]
async fn test_get_with_durability_level_when_wal_disabled() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let mut options = test_db_options(0, 1024 * 1024, None);
options.wal_enabled = false;
let db = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(options)
.build()
.await
.unwrap();
let put_options = PutOptions::default();
let write_options = WriteOptions {
await_durable: false,
};
let get_memory_options = ReadOptions::new().with_durability_filter(Memory);
let get_remote_options = ReadOptions::new().with_durability_filter(Remote);
db.put_with_options(b"foo", b"bar", &put_options, &write_options)
.await
.unwrap();
let val_bytes = Bytes::copy_from_slice(b"bar");
assert_eq!(
None,
db.get_with_options(b"foo", &get_remote_options)
.await
.unwrap()
);
assert_eq!(
Some(val_bytes.clone()),
db.get_with_options(b"foo", &get_memory_options)
.await
.unwrap()
);
db.flush().await.unwrap();
assert_eq!(
Some(val_bytes.clone()),
db.get_with_options(b"foo", &get_remote_options)
.await
.unwrap()
);
assert_eq!(
Some(val_bytes.clone()),
db.get_with_options(b"foo", &get_memory_options)
.await
.unwrap()
);
}
#[tokio::test]
#[cfg(feature = "wal_disable")]
async fn test_find_with_multiple_repeated_keys() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let mut options = test_db_options(0, 1024 * 1024, None);
options.wal_enabled = false;
let db = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(options)
.build()
.await
.unwrap();
let mut last_val: String = "foo".to_string();
for x in 0..4096 {
let val = format!("val{}", x);
db.put_with_options(
b"key",
val.as_bytes(),
&PutOptions {
ttl: Default::default(),
},
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
last_val = val;
if db
.inner
.state
.write()
.memtable()
.metadata()
.entries_size_in_bytes
> (SsTableFormat::default().block_size * 3)
{
break;
}
}
assert_eq!(
Some(Bytes::copy_from_slice(last_val.as_bytes())),
db.get_with_options(b"key", &ReadOptions::new().with_durability_filter(Memory))
.await
.unwrap()
);
db.flush().await.unwrap();
let state = db.inner.state.read().view();
assert_eq!(1, state.state.manifest.core().l0.len());
let sst = state.state.manifest.core().l0.front().unwrap();
let index = db.inner.table_store.read_index(sst, true).await.unwrap();
assert!(!index.borrow().block_meta().is_empty());
assert_eq!(
Some(Bytes::copy_from_slice(last_val.as_bytes())),
db.get(b"key").await.unwrap()
);
db.close().await.unwrap();
}
#[tokio::test]
async fn test_get_with_row_override_ttl_and_read_committed() {
let clock = Arc::new(TestClock::new());
let ttl = 100;
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let kv_store = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(test_db_options_with_ttl(0, 1024, None, Some(ttl)))
.with_logical_clock(clock.clone())
.build()
.await
.unwrap();
let key = b"test_key";
let key_other = b"time_advancing_key";
let value = b"test_value";
kv_store
.put_with_options(
key,
value,
&PutOptions {
ttl: Ttl::ExpireAfter(50),
},
&WriteOptions::default(),
)
.await
.unwrap();
clock.ticker.store(49, Ordering::SeqCst);
kv_store.put(key_other, value).await.unwrap();
kv_store.flush().await.unwrap();
assert_eq!(
Some(Bytes::from_static(value)),
kv_store
.get_with_options(key, &ReadOptions::new().with_durability_filter(Remote))
.await
.unwrap(),
);
clock.ticker.store(50, Ordering::SeqCst);
assert_eq!(
Some(Bytes::from_static(value)),
kv_store
.get_with_options(key, &ReadOptions::new().with_durability_filter(Remote))
.await
.unwrap(),
);
kv_store.put(key_other, value).await.unwrap();
kv_store.flush().await.unwrap();
assert_eq!(
None,
kv_store
.get_with_options(key, &ReadOptions::new().with_durability_filter(Remote))
.await
.unwrap(),
);
kv_store.close().await.unwrap();
}
#[tokio::test]
async fn test_scan_with_default_ttl_and_read_uncommitted() {
let clock = Arc::new(TestClock::new());
let ttl = 100;
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let kv_store = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(test_db_options_with_ttl(0, 1024, None, Some(ttl)))
.with_logical_clock(clock.clone())
.build()
.await
.unwrap();
kv_store.put(b"key1", b"value1").await.unwrap();
kv_store.put(b"key2", b"value2").await.unwrap();
kv_store.put(b"key3", b"value3").await.unwrap();
clock.ticker.store(99, Ordering::SeqCst);
let mut iter = kv_store
.scan_with_options(
&b"key1"[..]..&b"key4"[..],
&ScanOptions::new().with_durability_filter(Memory),
)
.await
.unwrap();
let kv = iter.next().await.unwrap().unwrap();
assert_eq!(kv.key.as_ref(), b"key1");
assert_eq!(kv.value.as_ref(), b"value1");
let kv = iter.next().await.unwrap().unwrap();
assert_eq!(kv.key.as_ref(), b"key2");
assert_eq!(kv.value.as_ref(), b"value2");
let kv = iter.next().await.unwrap().unwrap();
assert_eq!(kv.key.as_ref(), b"key3");
assert_eq!(kv.value.as_ref(), b"value3");
assert_eq!(iter.next().await.unwrap(), None);
clock.ticker.store(100, Ordering::SeqCst);
let mut iter = kv_store
.scan_with_options(
&b"key1"[..]..&b"key4"[..],
&ScanOptions::new().with_durability_filter(Memory),
)
.await
.unwrap();
assert_eq!(iter.next().await.unwrap(), None);
kv_store.close().await.unwrap();
}
#[tokio::test]
async fn test_scan_with_row_override_ttl_and_read_uncommitted() {
let clock = Arc::new(TestClock::new());
let default_ttl = 100;
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let kv_store = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(test_db_options_with_ttl(0, 1024, None, Some(default_ttl)))
.with_logical_clock(clock.clone())
.build()
.await
.unwrap();
kv_store
.put_with_options(
b"key1",
b"value1",
&PutOptions {
ttl: Ttl::ExpireAfter(50),
},
&WriteOptions::default(),
)
.await
.unwrap();
kv_store
.put_with_options(
b"key2",
b"value2",
&PutOptions {
ttl: Ttl::ExpireAfter(75),
},
&WriteOptions::default(),
)
.await
.unwrap();
kv_store
.put_with_options(
b"key3",
b"value3",
&PutOptions {
ttl: Ttl::ExpireAfter(100),
},
&WriteOptions::default(),
)
.await
.unwrap();
clock.ticker.store(49, Ordering::SeqCst);
let mut iter = kv_store
.scan_with_options(
&b"key1"[..]..&b"key4"[..],
&ScanOptions::new().with_durability_filter(Memory),
)
.await
.unwrap();
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"key1");
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"key2");
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"key3");
assert_eq!(iter.next().await.unwrap(), None);
clock.ticker.store(50, Ordering::SeqCst);
let mut iter = kv_store
.scan_with_options(
&b"key1"[..]..&b"key4"[..],
&ScanOptions::new().with_durability_filter(Memory),
)
.await
.unwrap();
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"key2");
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"key3");
assert_eq!(iter.next().await.unwrap(), None);
clock.ticker.store(75, Ordering::SeqCst);
let mut iter = kv_store
.scan_with_options(
&b"key1"[..]..&b"key4"[..],
&ScanOptions::new().with_durability_filter(Memory),
)
.await
.unwrap();
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"key3");
assert_eq!(iter.next().await.unwrap(), None);
clock.ticker.store(100, Ordering::SeqCst);
let mut iter = kv_store
.scan_with_options(
&b"key1"[..]..&b"key4"[..],
&ScanOptions::new().with_durability_filter(Memory),
)
.await
.unwrap();
assert_eq!(iter.next().await.unwrap(), None);
kv_store.close().await.unwrap();
}
#[tokio::test]
async fn test_scan_with_default_ttl_and_read_committed() {
let clock = Arc::new(TestClock::new());
let ttl = 100;
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let kv_store = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(test_db_options_with_ttl(0, 1024, None, Some(ttl)))
.with_logical_clock(clock.clone())
.build()
.await
.unwrap();
let key_other = b"time_advancing_key";
kv_store.put(b"key1", b"value1").await.unwrap();
kv_store.put(b"key2", b"value2").await.unwrap();
kv_store.put(b"key3", b"value3").await.unwrap();
clock.ticker.store(99, Ordering::SeqCst);
kv_store.put(key_other, b"value").await.unwrap();
kv_store.flush().await.unwrap();
let mut iter = kv_store
.scan_with_options(
&b"key1"[..]..&b"key4"[..],
&ScanOptions::new().with_durability_filter(Remote),
)
.await
.unwrap();
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"key1");
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"key2");
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"key3");
assert_eq!(iter.next().await.unwrap(), None);
clock.ticker.store(100, Ordering::SeqCst);
let mut iter = kv_store
.scan_with_options(
&b"key1"[..]..&b"key4"[..],
&ScanOptions::new().with_durability_filter(Remote),
)
.await
.unwrap();
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"key1");
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"key2");
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"key3");
assert_eq!(iter.next().await.unwrap(), None);
kv_store.put(key_other, b"value").await.unwrap();
kv_store.flush().await.unwrap();
let mut iter = kv_store
.scan_with_options(
&b"key1"[..]..&b"key4"[..],
&ScanOptions::new().with_durability_filter(Remote),
)
.await
.unwrap();
assert_eq!(iter.next().await.unwrap(), None);
kv_store.close().await.unwrap();
}
#[tokio::test]
async fn test_scan_with_row_override_ttl_and_read_committed() {
let clock = Arc::new(TestClock::new());
let default_ttl = 100;
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let kv_store = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(test_db_options_with_ttl(0, 1024, None, Some(default_ttl)))
.with_logical_clock(clock.clone())
.build()
.await
.unwrap();
let key_other = b"time_advancing_key";
kv_store
.put_with_options(
b"key1",
b"value1",
&PutOptions {
ttl: Ttl::ExpireAfter(50),
},
&WriteOptions::default(),
)
.await
.unwrap();
kv_store
.put_with_options(
b"key2",
b"value2",
&PutOptions {
ttl: Ttl::ExpireAfter(75),
},
&WriteOptions::default(),
)
.await
.unwrap();
clock.ticker.store(49, Ordering::SeqCst);
kv_store.put(key_other, b"value").await.unwrap();
kv_store.flush().await.unwrap();
let mut iter = kv_store
.scan_with_options(
&b"key1"[..]..&b"key3"[..],
&ScanOptions::new().with_durability_filter(Remote),
)
.await
.unwrap();
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"key1");
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"key2");
assert_eq!(iter.next().await.unwrap(), None);
clock.ticker.store(50, Ordering::SeqCst);
let mut iter = kv_store
.scan_with_options(
&b"key1"[..]..&b"key3"[..],
&ScanOptions::new().with_durability_filter(Remote),
)
.await
.unwrap();
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"key1");
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"key2");
assert_eq!(iter.next().await.unwrap(), None);
kv_store.put(key_other, b"value").await.unwrap();
kv_store.flush().await.unwrap();
let mut iter = kv_store
.scan_with_options(
&b"key1"[..]..&b"key3"[..],
&ScanOptions::new().with_durability_filter(Remote),
)
.await
.unwrap();
assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"key2");
assert_eq!(iter.next().await.unwrap(), None);
clock.ticker.store(75, Ordering::SeqCst);
kv_store.put(key_other, b"value").await.unwrap();
kv_store.flush().await.unwrap();
let mut iter = kv_store
.scan_with_options(
&b"key1"[..]..&b"key3"[..],
&ScanOptions::new().with_durability_filter(Remote),
)
.await
.unwrap();
assert_eq!(iter.next().await.unwrap(), None);
kv_store.close().await.unwrap();
}
#[tokio::test]
async fn test_get_with_object_store_cache_metrics() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let mut opts = test_db_options(0, 1024, None);
let temp_dir = tempfile::Builder::new()
.prefix("objstore_cache_test_")
.tempdir()
.unwrap();
opts.object_store_cache_options.root_folder = Some(temp_dir.keep());
opts.object_store_cache_options.part_size_bytes = 1024;
let kv_store = Db::builder(
"/tmp/test_kv_store_with_cache_metrics",
object_store.clone(),
)
.with_settings(opts)
.build()
.await
.unwrap();
let access_count0 = kv_store
.metrics()
.lookup(OBJECT_STORE_CACHE_PART_ACCESS)
.unwrap()
.get();
let key = b"test_key";
let value = b"test_value";
kv_store.put(key, value).await.unwrap();
kv_store.flush().await.unwrap();
let got = kv_store.get(key).await.unwrap();
let access_count1 = kv_store
.metrics()
.lookup(OBJECT_STORE_CACHE_PART_ACCESS)
.unwrap()
.get();
assert_eq!(got, Some(Bytes::from_static(value)));
assert!(access_count1 > 0);
assert!(access_count1 >= access_count0);
assert!(
kv_store
.metrics()
.lookup(OBJECT_STORE_CACHE_PART_HITS)
.unwrap()
.get()
>= 1
);
}
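// Builds a Db backed by a CachedObjectStore with a 1 KiB part size, writes a
// single key and flushes, then asserts how many parts of each listed object
// path ended up in the on-disk cache.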
async fn test_object_store_cache_helper(
cache_puts_enabled: bool,
db_path: &str,
expected_cache_parts: Vec<(&str, usize)>,
) -> (Arc<CachedObjectStore>, Db) {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let mut opts = test_db_options(0, 1024, None);
opts.manifest_poll_interval = Duration::from_millis(u64::MAX);
let temp_dir = tempfile::Builder::new()
.prefix("objstore_cache_test_")
.tempdir()
.unwrap();
let stats_registry = StatRegistry::new();
let cache_stats = Arc::new(CachedObjectStoreStats::new(&stats_registry));
let part_size = 1024;
info!("temp_dir: {:?}", temp_dir.path());
let cache_storage = Arc::new(FsCacheStorage::new(
temp_dir.keep(),
None,
None,
cache_stats.clone(),
Arc::new(DefaultSystemClock::new()),
Arc::new(DbRand::default()),
));
let cached_object_store = CachedObjectStore::new(
object_store.clone(),
cache_storage,
part_size,
cache_puts_enabled,
cache_stats.clone(),
)
.unwrap();
let kv_store = Db::builder(db_path, cached_object_store.clone())
.with_settings(opts)
.build()
.await
.unwrap();
let key = b"test_key";
let value = b"test_value";
kv_store.put(key, value).await.unwrap();
kv_store.flush().await.unwrap();
for (path, expected_parts) in expected_cache_parts {
let entry = cached_object_store
.cache_storage
.entry(&object_store::path::Path::from(path), part_size);
assert_eq!(
entry.cached_parts().await.unwrap().len(),
expected_parts,
"Path: {}",
path
);
}
(cached_object_store, kv_store)
}
#[tokio::test]
async fn test_get_with_object_store_cache_stored_files() {
let expected_cache_parts =
vec![
("tmp/test_kv_store_with_cache_stored_files/manifest/00000000000000000001.manifest", 0),
("tmp/test_kv_store_with_cache_stored_files/manifest/00000000000000000002.manifest", 0),
("tmp/test_kv_store_with_cache_stored_files/wal/00000000000000000001.sst", 1),
("tmp/test_kv_store_with_cache_stored_files/wal/00000000000000000002.sst", 0),
];
let (_cached_object_store, kv_store) = test_object_store_cache_helper(
false,
"/tmp/test_kv_store_with_cache_stored_files",
expected_cache_parts,
)
.await;
kv_store.close().await.unwrap();
}
#[tokio::test]
async fn test_get_with_object_store_cache_put_caching_enabled() {
let expected_cache_parts =
vec![
("tmp/test_kv_store_with_put_cache_enabled/manifest/00000000000000000001.manifest", 1),
("tmp/test_kv_store_with_put_cache_enabled/manifest/00000000000000000002.manifest", 1),
("tmp/test_kv_store_with_put_cache_enabled/wal/00000000000000000001.sst", 1),
("tmp/test_kv_store_with_put_cache_enabled/wal/00000000000000000002.sst", 1),
];
let (_cached_object_store, kv_store) = test_object_store_cache_helper(
true,
"/tmp/test_kv_store_with_put_cache_enabled",
expected_cache_parts,
)
.await;
kv_store.close().await.unwrap();
}
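// Seeds a fresh in-memory Db with the given key/value table, optionally
// flushing so the data is durable before the test body runs.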
async fn build_database_from_table(
table: &BTreeMap<Bytes, Bytes>,
db_options: Settings,
await_durable: bool,
) -> Db {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let db = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(db_options)
.build()
.await
.unwrap();
test_utils::seed_database(&db, table, false).await.unwrap();
if await_durable {
db.flush().await.unwrap();
}
db
}
#[tokio::test]
async fn test_should_allow_iterating_behind_box_dyn() {
#[async_trait]
trait IteratorSupplier {
async fn iterator(&self) -> Box<dyn IteratorTrait>;
}
struct DbHolder {
db: Db,
}
#[async_trait]
impl IteratorSupplier for DbHolder {
async fn iterator(&self) -> Box<dyn IteratorTrait> {
let range = BytesRange::new_empty();
let iter = self
.db
.inner
.scan_with_options(range, &ScanOptions::default())
.await
.unwrap();
Box::new(iter)
}
}
#[async_trait]
trait IteratorTrait {
async fn next(&mut self) -> Result<Option<KeyValue>, crate::Error>;
}
#[async_trait]
impl IteratorTrait for DbIterator {
async fn next(&mut self) -> Result<Option<KeyValue>, crate::Error> {
DbIterator::next(self).await
}
}
let db_options = test_db_options(0, 1024, None);
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let db = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(db_options)
.build()
.await
.unwrap();
let db_holder = DbHolder { db };
let mut boxed = db_holder.iterator().await;
let next = boxed.next().await;
assert_eq!(next.unwrap(), None);
}
async fn assert_records_in_range(
table: &BTreeMap<Bytes, Bytes>,
db: &Db,
scan_options: &ScanOptions,
range: BytesRange,
) {
let mut iter = db
.inner
.scan_with_options(range.clone(), scan_options)
.await
.unwrap();
test_utils::assert_ranged_db_scan(table, range, &mut iter).await;
}
#[test]
fn test_scan_returns_records_in_range() {
let mut runner = new_proptest_runner(None);
let table = sample::table(runner.rng(), 1000, 5);
let runtime = Runtime::new().unwrap();
let db_options = test_db_options(0, 1024, None);
let db = runtime.block_on(build_database_from_table(&table, db_options, true));
runner
.run(&arbitrary::nonempty_range(10), |range| {
runtime.block_on(assert_records_in_range(
&table,
&db,
&ScanOptions::default(),
range,
));
Ok(())
})
.unwrap();
}
fn new_proptest_runner(rng_seed: Option<[u8; 32]>) -> TestRunner {
proptest_util::runner::new(file!(), rng_seed)
}
#[test]
fn test_scan_returns_uncommitted_records_if_read_level_uncommitted() {
let mut runner = new_proptest_runner(None);
let table = sample::table(runner.rng(), 1000, 5);
let runtime = Runtime::new().unwrap();
let mut db_options = test_db_options(0, 1024, None);
db_options.flush_interval = Some(Duration::from_secs(5));
let db = runtime.block_on(build_database_from_table(&table, db_options, false));
runner
.run(&arbitrary::nonempty_range(10), |range| {
let scan_options = ScanOptions {
durability_filter: Memory,
..ScanOptions::default()
};
runtime.block_on(assert_records_in_range(&table, &db, &scan_options, range));
Ok(())
})
.unwrap();
}
#[test]
fn test_seek_outside_of_range_returns_invalid_argument() {
let mut runner = new_proptest_runner(None);
let table = sample::table(runner.rng(), 1000, 10);
let runtime = Runtime::new().unwrap();
let db_options = test_db_options(0, 1024, None);
let db = runtime.block_on(build_database_from_table(&table, db_options, true));
runner
.run(
&(arbitrary::nonempty_bytes(10), arbitrary::rng()),
|(arbitrary_key, mut rng)| {
runtime.block_on(assert_out_of_bound_seek_returns_invalid_argument(
&db,
&mut rng,
arbitrary_key,
));
Ok(())
},
)
.unwrap();
async fn assert_out_of_bound_seek_returns_invalid_argument(
db: &Db,
rng: &mut TestRng,
arbitrary_key: Bytes,
) {
let mut iter = db
.scan_with_options(..arbitrary_key.clone(), &ScanOptions::default())
.await
.unwrap();
let lower_bounded_range = BytesRange::from(arbitrary_key.clone()..);
let value = sample::bytes_in_range(rng, &lower_bounded_range);
let err = iter.seek(value.clone()).await.unwrap_err();
assert!(
err.to_string()
.contains("cannot seek to a key outside the iterator range"),
"{}",
err
);
let mut iter = db
.scan_with_options(arbitrary_key.clone().., &ScanOptions::default())
.await
.unwrap();
let upper_bounded_range = BytesRange::from(..arbitrary_key.clone());
let value = sample::bytes_in_range(rng, &upper_bounded_range);
let err = iter.seek(value.clone()).await.unwrap_err();
assert!(
err.to_string()
.contains("cannot seek to a key outside the iterator range"),
"{}",
err
);
}
}
#[test]
fn test_seek_fast_forwards_iterator() {
let mut runner = new_proptest_runner(None);
let table = sample::table(runner.rng(), 1000, 10);
let runtime = Runtime::new().unwrap();
let db_options = test_db_options(0, 1024, None);
let db = runtime.block_on(build_database_from_table(&table, db_options, true));
runner
.run(
&(arbitrary::nonempty_range(5), arbitrary::rng()),
|(range, mut rng)| {
runtime.block_on(assert_seek_fast_forwards_iterator(
&table, &db, &range, &mut rng,
));
Ok(())
},
)
.unwrap();
async fn assert_seek_fast_forwards_iterator(
table: &BTreeMap<Bytes, Bytes>,
db: &Db,
scan_range: &BytesRange,
rng: &mut TestRng,
) {
let mut iter = db
.inner
.scan_with_options(scan_range.clone(), &ScanOptions::default())
.await
.unwrap();
let seek_key = sample::bytes_in_range(rng, scan_range);
iter.seek(seek_key.clone()).await.unwrap();
let seek_range = BytesRange::new(Included(seek_key), scan_range.end_bound().cloned());
test_utils::assert_ranged_db_scan(table, seek_range, &mut iter).await;
}
}
#[tokio::test]
async fn test_write_batch() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let kv_store = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(test_db_options(0, 1024, None))
.build()
.await
.unwrap();
let mut batch = WriteBatch::new();
batch.put(b"key1", b"value1");
batch.put(b"key2", b"value2");
batch.delete(b"key1");
kv_store.write(batch).await.expect("write batch failed");
assert_eq!(kv_store.get(b"key1").await.unwrap(), None);
assert_eq!(
kv_store.get(b"key2").await.unwrap(),
Some(Bytes::from_static(b"value2"))
);
kv_store.close().await.unwrap();
}
#[cfg(feature = "wal_disable")]
#[tokio::test]
async fn test_write_batch_without_wal() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let mut options = test_db_options(0, 8, None);
options.wal_enabled = false;
let kv_store = Db::builder("/tmp/test_kv_store_without_wal", object_store.clone())
.with_settings(options)
.build()
.await
.unwrap();
let mut batch = WriteBatch::new();
batch.put(b"key1", b"value1");
batch.put(b"key2", b"value2");
batch.delete(b"key1");
kv_store.write(batch).await.expect("write batch failed");
assert_eq!(kv_store.get(b"key1").await.unwrap(), None);
assert_eq!(kv_store.get(b"key2").await.unwrap(), Some("value2".into()));
kv_store.close().await.unwrap();
}
#[tokio::test]
async fn test_write_batch_with_empty_key() {
let mut batch = WriteBatch::new();
let result = std::panic::catch_unwind(move || {
batch.put(b"", b"value");
});
assert!(
result.is_err(),
"Expected panic when using empty key in put operation"
);
let mut batch = WriteBatch::new();
let result = std::panic::catch_unwind(move || {
batch.delete(b"");
});
assert!(
result.is_err(),
"Expected panic when using empty key in delete operation"
);
}
#[tokio::test]
async fn test_concurrent_batch_writes_consistency() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let compaction_scheduler = Arc::new(SizeTieredCompactionSchedulerSupplier::new(
SizeTieredCompactionSchedulerOptions::default(),
));
let kv_store = Arc::new(
Db::builder("/tmp/test_concurrent_kv_store", object_store)
.with_settings(test_db_options(
0,
1024,
Some(CompactorOptions {
poll_interval: Duration::from_millis(100),
max_sst_size: 256,
max_concurrent_compactions: 1,
manifest_update_timeout: Duration::from_secs(300),
}),
))
.with_compaction_scheduler_supplier(compaction_scheduler)
.build()
.await
.unwrap(),
);
const NUM_KEYS: usize = 100;
const NUM_ROUNDS: usize = 20;
for _ in 0..NUM_ROUNDS {
let task1 = {
let store = kv_store.clone();
tokio::spawn(async move {
let mut batch = WriteBatch::new();
for key in 1..=NUM_KEYS {
batch.put(key.to_be_bytes(), key.to_be_bytes());
}
store.write(batch).await.expect("write batch failed");
})
};
let task2 = {
let store = kv_store.clone();
tokio::spawn(async move {
let mut batch = WriteBatch::new();
for key in 1..=NUM_KEYS {
let value = (key * 2).to_be_bytes();
batch.put(key.to_be_bytes(), value);
}
store.write(batch).await.expect("write batch failed");
})
};
join_all(vec![task1, task2]).await;
let mut all_key = true;
let mut all_key2 = true;
for key in 1..=NUM_KEYS {
let value = kv_store.get(key.to_be_bytes()).await.unwrap();
let value = value.expect("Value should exist");
if value.as_ref() != key.to_be_bytes() {
all_key = false;
}
if value.as_ref() != (key * 2).to_be_bytes() {
all_key2 = false;
}
}
assert!(
all_key || all_key2,
"Inconsistent state: not all values match either key or key * 2"
);
}
kv_store.close().await.unwrap();
}
#[tokio::test]
#[cfg(feature = "wal_disable")]
async fn test_disable_wal_after_wal_enabled() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let options = test_db_options(0, 32, None);
let db = Db::builder(path, object_store.clone())
.with_settings(options)
.build()
.await
.unwrap();
db.put(&[b'a'; 4], &[b'j'; 4]).await.unwrap();
db.put(&[b'b'; 4], &[b'k'; 4]).await.unwrap();
db.close().await.unwrap();
let mut options = test_db_options(0, 32, None);
options.wal_enabled = false;
let db = Db::builder(path, object_store.clone())
.with_settings(options.clone())
.build()
.await
.unwrap();
db.delete_with_options(
&[b'b'; 4],
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.put(&[b'a'; 4], &[b'z'; 64]).await.unwrap();
db.close().await.unwrap();
let db = Db::builder(path, object_store.clone())
.with_settings(options.clone())
.build()
.await
.unwrap();
let val = db.get(&[b'a'; 4]).await.unwrap();
assert_eq!(val.unwrap(), Bytes::copy_from_slice(&[b'z'; 64]));
let val = db.get(&[b'b'; 4]).await.unwrap();
assert!(val.is_none());
}
#[cfg(feature = "wal_disable")]
#[tokio::test]
async fn test_wal_disabled() {
use crate::{test_utils::assert_iterator, types::RowEntry};
let clock = Arc::new(TestClock::new());
let mut options = test_db_options(0, 256, None);
options.wal_enabled = false;
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = Path::from("/tmp/test_kv_store");
let sst_format = SsTableFormat::default();
let table_store = Arc::new(TableStore::new(
ObjectStores::new(object_store.clone(), None),
sst_format,
path.clone(),
None,
));
let db = Db::builder(path.clone(), object_store.clone())
.with_settings(options)
.with_logical_clock(clock.clone())
.build()
.await
.unwrap();
let manifest_store = Arc::new(ManifestStore::new(&path, object_store.clone()));
let mut stored_manifest =
StoredManifest::load(manifest_store.clone(), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let write_options = WriteOptions {
await_durable: false,
};
db.put_with_options(
&[b'a'; 32],
&[b'j'; 32],
&PutOptions::default(),
&write_options,
)
.await
.unwrap();
db.delete_with_options(&[b'b'; 31], &write_options)
.await
.unwrap();
let write_options = WriteOptions {
await_durable: true,
};
clock.ticker.store(10, Ordering::SeqCst);
db.put_with_options(
&[b'c'; 32],
&[b'l'; 32],
&PutOptions::default(),
&write_options,
)
.await
.unwrap();
let state = wait_for_manifest_condition(
&mut stored_manifest,
|s| !s.l0.is_empty(),
Duration::from_secs(30),
)
.await;
assert_eq!(state.l0.len(), 1);
let l0 = state.l0.front().unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
l0,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
assert_iterator(
&mut iter,
vec![
RowEntry::new_value(&[b'a'; 32], &[b'j'; 32], 1).with_create_ts(0),
RowEntry::new_tombstone(&[b'b'; 31], 2).with_create_ts(0),
RowEntry::new_value(&[b'c'; 32], &[b'l'; 32], 3).with_create_ts(10),
],
)
.await;
}
#[tokio::test]
async fn test_put_flushes_memtable() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let kv_store = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 256, None))
.build()
.await
.unwrap();
let manifest_store = Arc::new(ManifestStore::new(&Path::from(path), object_store.clone()));
let mut stored_manifest =
StoredManifest::load(manifest_store.clone(), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let sst_format = SsTableFormat {
min_filter_keys: 10,
..SsTableFormat::default()
};
let table_store = Arc::new(TableStore::new(
ObjectStores::new(object_store.clone(), None),
sst_format,
path,
None,
));
let mut last_wal_id = 0;
for i in 0..3 {
let key = [b'a' + i; 16];
let value = [b'b' + i; 50];
kv_store.put(&key, &value).await.unwrap();
let key = [b'j' + i; 16];
let value = [b'k' + i; 50];
kv_store.put(&key, &value).await.unwrap();
let db_state = wait_for_manifest_condition(
&mut stored_manifest,
|s| s.replay_after_wal_id > last_wal_id,
Duration::from_secs(30),
)
.await;
assert_eq!(db_state.replay_after_wal_id, (i as u64) * 2 + 2);
last_wal_id = db_state.replay_after_wal_id
}
let manifest = stored_manifest.refresh().await.unwrap();
let l0 = &manifest.core.l0;
assert_eq!(l0.len(), 3);
let sst_iter_options = SstIteratorOptions::default();
for i in 0u8..3u8 {
let sst1 = l0.get(2 - i as usize).unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
sst1,
table_store.clone(),
sst_iter_options,
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
let kv = iter.next().await.unwrap().unwrap();
assert_eq!(kv.key.as_ref(), [b'a' + i; 16]);
assert_eq!(kv.value.as_ref(), [b'b' + i; 50]);
let kv = iter.next().await.unwrap().unwrap();
assert_eq!(kv.key.as_ref(), [b'j' + i; 16]);
assert_eq!(kv.value.as_ref(), [b'k' + i; 50]);
let kv = iter.next().await.unwrap();
assert!(kv.is_none());
}
assert!(
kv_store
.metrics()
.lookup(IMMUTABLE_MEMTABLE_FLUSHES)
.unwrap()
.get()
> 0
);
}
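// flush_with_options(FlushType::MemTable) should push the memtable to L0 even
// though the background flush interval is effectively infinite. The test checks
// that a new L0 SST shows up in the manifest, the flush counter advances, the
// WAL buffer's flushed id stays at 0, and both keys are readable from the new SST.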
#[tokio::test]
async fn test_flush_memtable_with_wal_enabled() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_flush_with_options";
let mut options = test_db_options(0, 256, None);
options.flush_interval = Some(Duration::from_secs(u64::MAX));
let kv_store = Db::builder(path, object_store.clone())
.with_settings(options)
.build()
.await
.unwrap();
let manifest_store = Arc::new(ManifestStore::new(&Path::from(path), object_store.clone()));
let mut stored_manifest =
StoredManifest::load(manifest_store.clone(), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let sst_format = SsTableFormat {
min_filter_keys: 10,
..SsTableFormat::default()
};
let table_store = Arc::new(TableStore::new(
ObjectStores::new(object_store.clone(), None),
sst_format,
path,
None,
));
let key1 = b"test_key_1";
let value1 = b"test_value_1";
kv_store
.put_with_options(
key1,
value1,
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
let key2 = b"test_key_2";
let value2 = b"test_value_2";
kv_store
.put_with_options(
key2,
value2,
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
let initial_manifest = stored_manifest.refresh().await.unwrap();
let initial_l0_count = initial_manifest.core.l0.len();
let initial_flush_count = kv_store
.metrics()
.lookup(IMMUTABLE_MEMTABLE_FLUSHES)
.unwrap()
.get();
kv_store
.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
.unwrap();
let db_state = wait_for_manifest_condition(
&mut stored_manifest,
|s| s.l0.len() > initial_l0_count,
Duration::from_secs(30),
)
.await;
assert_eq!(db_state.l0.len(), initial_l0_count + 1);
let final_flush_count = kv_store
.metrics()
.lookup(IMMUTABLE_MEMTABLE_FLUSHES)
.unwrap()
.get();
assert!(final_flush_count > initial_flush_count);
let recent_flushed_wal_id = kv_store.inner.wal_buffer.recent_flushed_wal_id();
assert_eq!(recent_flushed_wal_id, 0);
let retrieved_value1 = kv_store.get(key1).await.unwrap().unwrap();
assert_eq!(retrieved_value1.as_ref(), value1);
let retrieved_value2 = kv_store.get(key2).await.unwrap().unwrap();
assert_eq!(retrieved_value2.as_ref(), value2);
let latest_sst = db_state.l0.back().unwrap();
let sst_iter_options = SstIteratorOptions::default();
let mut iter = SstIterator::new_borrowed_initialized(
..,
latest_sst,
table_store.clone(),
sst_iter_options,
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
let mut found_keys = std::collections::HashSet::new();
while let Some(kv) = iter.next().await.unwrap() {
found_keys.insert(kv.key.to_vec());
}
assert!(found_keys.contains(key1.as_slice()));
assert!(found_keys.contains(key2.as_slice()));
}
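// Shared body for the sequence-tracker persistence tests below: writes three keys
// at logical/system timestamps 0s, 60s, and 120s, forces a memtable flush, and
// checks that the tracker persisted in the manifest matches the in-memory one,
// maps each seq to its timestamp, and is identical after reopening the DB.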
#[cfg(feature = "test-util")]
async fn test_sequence_tracker_persisted_across_flush_and_reload_impl(wal_enabled: bool) {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_sequence_tracker_flush";
let mut settings = test_db_options(0, 256, None);
settings.flush_interval = None;
#[cfg(feature = "wal_disable")]
{
settings.wal_enabled = wal_enabled;
}
#[cfg(not(feature = "wal_disable"))]
let _ = wal_enabled;
let logical_clock = Arc::new(TestClock::new());
let system_clock = Arc::new(MockSystemClock::new());
let kv_store = Db::builder(path, object_store.clone())
.with_settings(settings.clone())
.with_system_clock(system_clock.clone())
.with_logical_clock(logical_clock.clone())
.build()
.await
.unwrap();
let manifest_store = Arc::new(ManifestStore::new(&Path::from(path), object_store.clone()));
let mut stored_manifest =
StoredManifest::load(manifest_store.clone(), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let write_options = WriteOptions {
await_durable: false,
};
let put_options = PutOptions::default();
let timestamps_ms = [0_i64, 60_000, 120_000];
for (idx, ts) in timestamps_ms.iter().enumerate() {
logical_clock.ticker.store(*ts, Ordering::SeqCst);
system_clock.set(*ts);
let key = format!("key-{idx}").into_bytes();
let value = format!("value-{idx}").into_bytes();
kv_store
.put_with_options(&key, &value, &put_options, &write_options)
.await
.unwrap();
}
kv_store
.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
.unwrap();
let target_ts = Utc.timestamp_opt(120, 0).single().unwrap();
let persisted_state = wait_for_manifest_condition(
&mut stored_manifest,
move |core| {
core.sequence_tracker
.find_seq(target_ts, FindOption::RoundDown)
== Some(3)
},
Duration::from_secs(5),
)
.await;
let tracker = persisted_state.sequence_tracker.clone();
let live_tracker = kv_store
.inner
.state
.read()
.state()
.core()
.sequence_tracker
.clone();
assert_eq!(tracker, live_tracker);
let seq1_ts = tracker.find_ts(1, FindOption::RoundDown).unwrap();
assert_eq!(seq1_ts.timestamp(), 0);
let seq2_ts = tracker.find_ts(2, FindOption::RoundDown).unwrap();
assert_eq!(seq2_ts.timestamp(), 60);
let seq3_ts = tracker.find_ts(3, FindOption::RoundDown).unwrap();
assert_eq!(seq3_ts.timestamp(), 120);
let ts_lookup = Utc.timestamp_opt(60, 0).single().unwrap();
assert_eq!(tracker.find_seq(ts_lookup, FindOption::RoundDown), Some(2));
kv_store.close().await.unwrap();
let reopened = Db::builder(path, object_store.clone())
.with_settings(settings)
.with_system_clock(system_clock.clone())
.with_logical_clock(logical_clock.clone())
.build()
.await
.unwrap();
let reopened_tracker = reopened
.inner
.state
.read()
.state()
.core()
.sequence_tracker
.clone();
assert_eq!(tracker, reopened_tracker);
reopened.close().await.unwrap();
}
#[tokio::test]
#[cfg(feature = "test-util")]
async fn test_sequence_tracker_persisted_across_flush_and_reload_wal_enabled() {
test_sequence_tracker_persisted_across_flush_and_reload_impl(true).await;
}
#[tokio::test]
#[cfg(all(feature = "test-util", feature = "wal_disable"))]
async fn test_sequence_tracker_persisted_across_flush_and_reload_wal_disabled() {
test_sequence_tracker_persisted_across_flush_and_reload_impl(false).await;
}
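// Races a memtable flush against concurrent writes: the "flush-memtable-to-l0"
// failpoint pauses the flush after the memtable is frozen, two more writes go to
// a fresh memtable, and after the flush completes the persisted sequence tracker
// must not reference any seq beyond last_l0_seq.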
#[cfg(feature = "test-util")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_sequence_tracker_not_ahead_of_last_l0_seq_when_flush_races_with_writes() {
let fp_registry = Arc::new(FailPointRegistry::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let mut settings = test_db_options(0, 512, None);
settings.flush_interval = None;
let system_clock = Arc::new(MockSystemClock::new());
let db = Db::builder(
"/tmp/test_sequence_tracker_flush_race",
object_store.clone(),
)
.with_settings(settings)
.with_system_clock(system_clock.clone())
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
let write_options = WriteOptions {
await_durable: false,
};
async fn put_with_timestamp(
db: &Db,
system_clock: &Arc<MockSystemClock>,
idx: usize,
write_options: &WriteOptions,
) {
system_clock.set((idx as i64) * 60_000);
let key = format!("race-key-{idx}").into_bytes();
let value = format!("race-value-{idx}").into_bytes();
db.put_with_options(&key, &value, &PutOptions::default(), write_options)
.await
.unwrap();
}
put_with_timestamp(&db, &system_clock, 0, &write_options).await;
put_with_timestamp(&db, &system_clock, 1, &write_options).await;
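// Pause the memtable-to-L0 write so the spawned flush freezes the memtable but
// cannot finish persisting it while two more writes land in a new memtable.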
fail_parallel::cfg(fp_registry.clone(), "flush-memtable-to-l0", "pause").unwrap();
let flush_handle = {
let inner = Arc::clone(&db.inner);
tokio::spawn(async move { inner.flush_memtables().await })
};
let mut froze_memtable = false;
for _ in 0..6000 {
{
let guard = db.inner.state.read();
if !guard.state().imm_memtable.is_empty() {
froze_memtable = true;
break;
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert!(froze_memtable, "memtable was not frozen before flush");
put_with_timestamp(&db, &system_clock, 2, &write_options).await;
put_with_timestamp(&db, &system_clock, 3, &write_options).await;
fail_parallel::cfg(fp_registry.clone(), "flush-memtable-to-l0", "off").unwrap();
flush_handle.await.unwrap().unwrap();
{
let guard = db.inner.state.read();
assert!(guard.state().imm_memtable.is_empty());
}
let manifest_state = {
let guard = db.inner.state.read();
guard.state().manifest.core().clone()
};
let last_l0_seq = manifest_state.last_l0_seq;
assert!(
last_l0_seq >= 2,
"expected flushed memtable to advance last_l0_seq"
);
assert!(
manifest_state
.sequence_tracker
.find_ts(last_l0_seq + 1, FindOption::RoundUp)
.is_none(),
"sequence tracker should not advance beyond last_l0_seq (last_l0_seq={})",
last_l0_seq
);
db.close().await.unwrap();
}
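// FlushType::Wal flushes the buffered WAL without requiring a memtable flush:
// even with compacted-SST writes failing via failpoint, the WAL flush succeeds,
// both keys stay readable, and the flushed WAL id never moves backwards.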
#[tokio::test]
async fn test_flush_with_options_wal() {
let fp_registry = Arc::new(FailPointRegistry::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_flush_with_options_wal";
let mut options = test_db_options(0, 1024, None);
options.flush_interval = Some(Duration::from_secs(u64::MAX));
fail_parallel::cfg(
fp_registry.clone(),
"write-compacted-sst-io-error",
"return",
)
.unwrap();
let kv_store = Db::builder(path, object_store.clone())
.with_settings(options)
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
let key1 = b"wal_test_key_1";
let value1 = b"wal_test_value_1";
kv_store
.put_with_options(
key1,
value1,
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
let key2 = b"wal_test_key_2";
let value2 = b"wal_test_value_2";
kv_store
.put_with_options(
key2,
value2,
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
let initial_wal_id = kv_store.inner.wal_buffer.recent_flushed_wal_id();
let flush_result = kv_store
.flush_with_options(FlushOptions {
flush_type: FlushType::Wal,
})
.await;
assert!(flush_result.is_ok(), "WAL flush should succeed");
let retrieved_value1 = kv_store.get(key1).await.unwrap().unwrap();
assert_eq!(retrieved_value1.as_ref(), value1);
let retrieved_value2 = kv_store.get(key2).await.unwrap().unwrap();
assert_eq!(retrieved_value2.as_ref(), value2);
let final_wal_id = kv_store.inner.wal_buffer.recent_flushed_wal_id();
assert!(
final_wal_id >= initial_wal_id,
"WAL ID should not decrease after flush"
);
assert!(
kv_store.inner.check_closed().is_ok(),
"DB should not have an error state"
);
}
#[tokio::test]
#[cfg(feature = "wal_disable")]
async fn test_flush_with_options_wal_disabled_error() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_flush_with_options_wal_disabled";
let mut options = test_db_options(0, 1024, None);
options.wal_enabled = false;
let kv_store = Db::builder(path, object_store.clone())
.with_settings(options)
.build()
.await
.unwrap();
let key1 = b"test_key_1";
let value1 = b"test_value_1";
kv_store
.put_with_options(
key1,
value1,
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
let flush_result = kv_store
.flush_with_options(FlushOptions {
flush_type: FlushType::Wal,
})
.await;
assert!(flush_result.is_err(), "Expected WalDisabled error");
let error = flush_result.unwrap_err();
assert!(
error
.to_string()
.contains("attempted a WAL operation when the WAL is disabled"),
"Expected WalDisabled error message, got: {}",
error
);
let memtable_flush_result = kv_store
.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await;
assert!(
memtable_flush_result.is_ok(),
"Memtable flush should work even when WAL is disabled"
);
let retrieved_value1 = kv_store.get(key1).await.unwrap().unwrap();
assert_eq!(retrieved_value1.as_ref(), value1);
}
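// With max_unflushed_bytes set to 1 and WAL SST writes paused via failpoint, the
// first write stays buffered and the second should block on write backpressure;
// the test waits for the backpressure counter to tick before releasing the failpoint.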
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_apply_wal_memory_backpressure() {
let fp_registry = Arc::new(FailPointRegistry::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = Path::from("/tmp/test_kv_store");
let mut options = test_db_options(0, 1, None);
options.max_unflushed_bytes = 1;
let db = Db::builder(path, object_store.clone())
.with_settings(options)
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
let db_stats = db.inner.db_stats.clone();
let write_opts = WriteOptions {
await_durable: false,
};
fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "pause").unwrap();
let wait_for = async move |condition: Box<dyn Fn() -> bool>| {
for _ in 0..3000 {
if condition() {
return;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
};
db.put_with_options(b"key1", b"val1", &PutOptions::default(), &write_opts)
.await
.unwrap();
let this_wal_buffer = db.inner.wal_buffer.clone();
wait_for(Box::new(move || {
this_wal_buffer.buffered_wal_entries_count() > 0
}))
.await;
assert_eq!(db.inner.wal_buffer.buffered_wal_entries_count(), 1);
let join_handle = tokio::spawn(async move {
db.put_with_options(b"key2", b"val2", &PutOptions::default(), &write_opts)
.await
.unwrap();
});
let this_stats = db_stats.clone();
wait_for(Box::new(move || {
this_stats.backpressure_count.value.load(Ordering::SeqCst) > 0
}))
.await;
assert!(db_stats.backpressure_count.value.load(Ordering::SeqCst) >= 1);
fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "off").unwrap();
join_handle.abort();
let _ = join_handle.await;
}
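// With a tiny memtable and l0_max_ssts = 4, repeated puts should cause memtable
// flushes to back off once L0 is full, leaving an immutable memtable still queued
// after flush().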
#[tokio::test]
async fn test_apply_backpressure_to_memtable_flush() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let mut options = test_db_options(0, 1, None);
options.l0_max_ssts = 4;
let db = Db::builder("/tmp/test_kv_store", object_store.clone())
.with_settings(options)
.build()
.await
.unwrap();
db.put(b"key1", b"val1").await.unwrap();
db.put(b"key2", b"val2").await.unwrap();
db.put(b"key3", b"val3").await.unwrap();
db.put(b"key4", b"val4").await.unwrap();
db.put(b"key5", b"val5").await.unwrap();
db.flush().await.unwrap();
let db_state = db.inner.state.read().view();
assert_eq!(db_state.state.imm_memtable.len(), 1);
}
#[tokio::test]
async fn test_put_empty_value() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let kv_store = Db::builder("/tmp/test_kv_store", object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.build()
.await
.unwrap();
let key = b"test_key";
let value = b"";
kv_store.put(key, value).await.unwrap();
kv_store.flush().await.unwrap();
assert_eq!(
kv_store.get(key).await.unwrap(),
Some(Bytes::from_static(value))
);
}
#[tokio::test]
async fn test_flush_while_iterating() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let kv_store = Db::builder("/tmp/test_kv_store", object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.with_logical_clock(Arc::new(TestClock::new()))
.build()
.await
.unwrap();
let memtable = {
let lock = kv_store.inner.state.read();
lock.memtable()
.put(RowEntry::new_value(b"abc1111", b"value1111", 1));
lock.memtable()
.put(RowEntry::new_value(b"abc2222", b"value2222", 2));
lock.memtable()
.put(RowEntry::new_value(b"abc3333", b"value3333", 3));
lock.memtable().table().clone()
};
let mut iter = memtable.iter();
let kv = iter.next().await.unwrap().unwrap();
assert_eq!(kv.key, b"abc1111".as_slice());
kv_store.flush().await.unwrap();
let kv = iter.next().await.unwrap().unwrap();
assert_eq!(kv.key, b"abc2222".as_slice());
let kv = iter.next().await.unwrap().unwrap();
assert_eq!(kv.key, b"abc3333".as_slice());
}
#[tokio::test]
async fn test_basic_restore() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let mut next_wal_id = 1;
let kv_store = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.with_logical_clock(Arc::new(TestClock::new()))
.build()
.await
.unwrap();
next_wal_id += 1;
let l0_count: u64 = 3;
for i in 0..l0_count {
kv_store
.put(&[b'a' + i as u8; 16], &[b'b' + i as u8; 48])
.await
.unwrap();
kv_store
.put(&[b'j' + i as u8; 16], &[b'k' + i as u8; 48])
.await
.unwrap();
next_wal_id += 2;
}
let sst_count: u64 = 5;
for i in 0..sst_count {
kv_store
.put(&i.to_be_bytes(), &i.to_be_bytes())
.await
.unwrap();
kv_store.flush().await.unwrap();
next_wal_id += 1;
}
kv_store.close().await.unwrap();
let kv_store_restored = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.with_logical_clock(Arc::new(TestClock::new()))
.build()
.await
.unwrap();
next_wal_id += 1;
for i in 0..l0_count {
let val = kv_store_restored.get([b'a' + i as u8; 16]).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&[b'b' + i as u8; 48])));
let val = kv_store_restored.get([b'j' + i as u8; 16]).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&[b'k' + i as u8; 48])));
}
for i in 0..sst_count {
let val = kv_store_restored.get(i.to_be_bytes()).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&i.to_be_bytes())));
}
kv_store_restored.close().await.unwrap();
let manifest_store = Arc::new(ManifestStore::new(&Path::from(path), object_store.clone()));
let stored_manifest =
StoredManifest::load(manifest_store, Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let db_state = stored_manifest.db_state();
assert_eq!(db_state.next_wal_sst_id, next_wal_id);
}
#[tokio::test]
#[allow(clippy::await_holding_lock)]
async fn test_restore_seq_number() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 256, None))
.with_logical_clock(Arc::new(TestClock::new()))
.build()
.await
.unwrap();
db.put(b"key1", b"val1").await.unwrap();
db.put(b"key2", b"val2").await.unwrap();
db.put(b"key3", b"val3").await.unwrap();
db.flush().await.unwrap();
db.close().await.unwrap();
let db_restored = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 256, None))
.with_logical_clock(Arc::new(TestClock::new()))
.build()
.await
.unwrap();
let state = db_restored.inner.state.read();
let memtable = state.memtable();
let mut iter = memtable.table().iter();
assert_iterator(
&mut iter,
vec![
RowEntry::new_value(b"key1", b"val1", 1).with_create_ts(0),
RowEntry::new_value(b"key2", b"val2", 2).with_create_ts(0),
RowEntry::new_value(b"key3", b"val3", 3).with_create_ts(0),
],
)
.await;
}
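// A snapshot taken before later merge operands are written must keep returning
// the old merged value even after the L0 SSTs it references are compacted away.
// The on-demand compaction scheduler is toggled manually so the test controls
// exactly when compaction runs.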
#[tokio::test]
async fn test_read_merges_from_snapshot_across_compaction() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/testdb";
let should_compact_l0 = Arc::new(AtomicBool::new(false));
let this_should_compact_l0 = should_compact_l0.clone();
let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
move |_state| this_should_compact_l0.swap(false, Ordering::SeqCst),
)));
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 1024 * 1024, None))
.with_merge_operator(Arc::new(StringConcatMergeOperator {}))
.with_compaction_scheduler_supplier(compaction_scheduler)
.build()
.await
.unwrap();
let db = Arc::new(db);
db.merge(b"foo", b"0").await.unwrap();
let snapshot = db.snapshot().await.unwrap();
db.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
.unwrap();
db.merge(b"foo", b"1").await.unwrap();
db.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
.unwrap();
should_compact_l0.store(true, Ordering::SeqCst);
let db_poll = db.clone();
tokio::time::timeout(Duration::from_secs(10), async move {
loop {
{
let db_state = db_poll.inner.state.read();
if !db_state.state().core().compacted.is_empty() {
return;
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
let result = snapshot.get(b"foo").await.unwrap();
assert_eq!(result, Some(Bytes::copy_from_slice(b"0")));
let result = db.get(b"foo").await.unwrap();
assert_eq!(result, Some(Bytes::copy_from_slice(b"01")));
}
#[tokio::test]
async fn test_all_kv_seq_num_are_greater_than_0() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store_seq_num";
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 1024 * 1024, None))
.build()
.await
.unwrap();
db.put(b"key1", b"value1").await.unwrap();
let val = db.get(b"key1").await.unwrap();
assert_eq!(val, Some(Bytes::from_static(b"value1")));
let state = db.inner.state.read();
let memtable = state.memtable();
assert_eq!(memtable.table().last_seq(), Some(1));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_should_read_uncommitted_data_if_read_level_uncommitted() {
let fp_registry = Arc::new(FailPointRegistry::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let kv_store = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "pause").unwrap();
kv_store
.put_with_options(
"foo".as_bytes(),
"bar".as_bytes(),
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
let val = kv_store
.get_with_options(
"foo".as_bytes(),
&ReadOptions::new().with_durability_filter(Memory),
)
.await
.unwrap();
assert_eq!(val, Some("bar".into()));
let val = kv_store
.get_with_options(
"foo".as_bytes(),
&ReadOptions::new().with_durability_filter(Remote),
)
.await
.unwrap();
assert_eq!(val, None);
fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "off").unwrap();
kv_store.close().await.unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_should_read_only_committed_data() {
let fp_registry = Arc::new(FailPointRegistry::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let kv_store = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
kv_store
.put("foo".as_bytes(), "bar".as_bytes())
.await
.unwrap();
fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "pause").unwrap();
kv_store
.put_with_options(
"foo".as_bytes(),
"bla".as_bytes(),
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
let val = kv_store
.get_with_options(
"foo".as_bytes(),
&ReadOptions::new().with_durability_filter(Remote),
)
.await
.unwrap();
assert_eq!(val, Some("bar".into()));
let val = kv_store
.get_with_options(
"foo".as_bytes(),
&ReadOptions::new().with_durability_filter(Memory),
)
.await
.unwrap();
assert_eq!(val, Some("bla".into()));
fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "off").unwrap();
kv_store.close().await.unwrap();
}
#[tokio::test]
async fn test_should_delete_without_awaiting_flush() {
let fp_registry = Arc::new(FailPointRegistry::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let kv_store = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
kv_store
.put("foo".as_bytes(), "bar".as_bytes())
.await
.unwrap();
fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "pause").unwrap();
kv_store
.delete_with_options(
"foo".as_bytes(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
let val = kv_store
.get_with_options(
"foo".as_bytes(),
&ReadOptions::new().with_durability_filter(Remote),
)
.await
.unwrap();
assert_eq!(val, Some("bar".into()));
let val = kv_store
.get_with_options(
"foo".as_bytes(),
&ReadOptions::new().with_durability_filter(Memory),
)
.await
.unwrap();
assert_eq!(val, None);
fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "off").unwrap();
kv_store.close().await.unwrap();
}
#[tokio::test]
async fn test_scan_should_read_only_committed_data() {
let fp_registry = Arc::new(FailPointRegistry::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let kv_store = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
kv_store
.put("key1".as_bytes(), "committed1".as_bytes())
.await
.unwrap();
kv_store
.put("key2".as_bytes(), "committed2".as_bytes())
.await
.unwrap();
kv_store
.put("key3".as_bytes(), "committed3".as_bytes())
.await
.unwrap();
fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "pause").unwrap();
kv_store
.put_with_options(
"key2".as_bytes(),
"uncommitted2".as_bytes(),
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
kv_store
.put_with_options(
"key4".as_bytes(),
"uncommitted4".as_bytes(),
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
let mut iter = kv_store
.scan_with_options(
"key1".as_bytes().."key5".as_bytes(),
&ScanOptions::new().with_durability_filter(Remote),
)
.await
.unwrap();
let kv = iter.next().await.unwrap().unwrap();
assert_eq!(kv.key.as_ref(), b"key1");
assert_eq!(kv.value.as_ref(), b"committed1");
let kv = iter.next().await.unwrap().unwrap();
assert_eq!(kv.key.as_ref(), b"key2");
assert_eq!(kv.value.as_ref(), b"committed2");
let kv = iter.next().await.unwrap().unwrap();
assert_eq!(kv.key.as_ref(), b"key3");
assert_eq!(kv.value.as_ref(), b"committed3");
assert_eq!(iter.next().await.unwrap(), None);
let mut iter = kv_store
.scan_with_options(
"key1".as_bytes().."key5".as_bytes(),
&ScanOptions::new().with_durability_filter(Memory),
)
.await
.unwrap();
let kv = iter.next().await.unwrap().unwrap();
assert_eq!(kv.key.as_ref(), b"key1");
assert_eq!(kv.value.as_ref(), b"committed1");
let kv = iter.next().await.unwrap().unwrap();
assert_eq!(kv.key.as_ref(), b"key2");
assert_eq!(kv.value.as_ref(), b"uncommitted2");
let kv = iter.next().await.unwrap().unwrap();
assert_eq!(kv.key.as_ref(), b"key3");
assert_eq!(kv.value.as_ref(), b"committed3");
let kv = iter.next().await.unwrap().unwrap();
assert_eq!(kv.key.as_ref(), b"key4");
assert_eq!(kv.value.as_ref(), b"uncommitted4");
assert_eq!(iter.next().await.unwrap(), None);
fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "off").unwrap();
kv_store.close().await.unwrap();
}
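// With compacted-SST writes paused via failpoint, the two puts are durable only
// in the WAL. A second instance opened on the same path must replay those WAL
// SSTs into immutable memtables and serve reads from them.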
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_should_recover_imm_from_wal() {
let fp_registry = Arc::new(FailPointRegistry::new());
fail_parallel::cfg(fp_registry.clone(), "write-compacted-sst-io-error", "pause").unwrap();
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let mut next_wal_id = 1;
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
next_wal_id += 1;
let key1 = [b'a'; 32];
let value1 = [b'b'; 96];
db.put(key1, value1).await.unwrap();
next_wal_id += 1;
let key2 = [b'c'; 32];
let value2 = [b'd'; 96];
db.put(key2, value2).await.unwrap();
next_wal_id += 1;
let reader = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
next_wal_id += 1;
let db_state = reader.inner.state.read().view();
assert_eq!(db_state.state.imm_memtable.len(), 2);
assert_eq!(
db_state
.state
.imm_memtable
.front()
.unwrap()
.recent_flushed_wal_id(),
1 + 2
);
assert_eq!(
db_state
.state
.imm_memtable
.get(1)
.unwrap()
.recent_flushed_wal_id(),
2
);
assert_eq!(db_state.state.core().next_wal_sst_id, next_wal_id);
assert_eq!(
reader.get(key1).await.unwrap(),
Some(Bytes::copy_from_slice(&value1))
);
assert_eq!(
reader.get(key2).await.unwrap(),
Some(Bytes::copy_from_slice(&value2))
);
fail_parallel::cfg(fp_registry.clone(), "write-compacted-sst-io-error", "off").unwrap();
db.close().await.unwrap();
reader.close().await.unwrap();
}
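// If a memtable flush fails with an I/O error, the data must still be
// recoverable from the WAL on reopen: the immutable memtable is rebuilt, L0 and
// compacted remain empty, and the key stays readable.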
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_should_recover_imm_from_wal_after_flush_error() {
let fp_registry = Arc::new(FailPointRegistry::new());
fail_parallel::cfg(
fp_registry.clone(),
"write-compacted-sst-io-error",
"return",
)
.unwrap();
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 4096, None))
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
let key1 = [b'a'; 32];
let value1 = [b'b'; 96];
let result = db.put(&key1, &value1).await;
assert!(result.is_ok(), "Failed to write key1");
assert_eq!(db.inner.wal_buffer.recent_flushed_wal_id(), 2);
let flush_result = db.inner.flush_memtables().await;
assert!(flush_result.is_err());
db.close().await.unwrap();
fail_parallel::cfg(fp_registry.clone(), "write-compacted-sst-io-error", "pause").unwrap();
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
let db_state = db.inner.state.read().view();
fail_parallel::cfg(fp_registry.clone(), "write-compacted-sst-io-error", "off").unwrap();
assert_eq!(db_state.state.imm_memtable.len(), 1);
assert_eq!(db_state.state.core().l0.len(), 0);
assert_eq!(db_state.state.core().compacted.len(), 0);
assert_eq!(
db_state
.state
.imm_memtable
.front()
.unwrap()
.recent_flushed_wal_id(),
1 + 1
);
assert!(db_state.state.imm_memtable.get(1).is_none());
assert_eq!(db_state.state.core().next_wal_sst_id, 4);
assert_eq!(
db.get(key1).await.unwrap(),
Some(Bytes::copy_from_slice(&value1))
);
}
#[tokio::test]
async fn test_should_fail_write_if_wal_flush_task_panics() {
let fp_registry = Arc::new(FailPointRegistry::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let db = Arc::new(
Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap(),
);
fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "panic").unwrap();
let result = db.put(b"foo", b"bar").await.unwrap_err();
assert!(result.to_string().contains("background task panicked"));
}
#[tokio::test]
async fn test_wal_id_last_seen_should_exist_even_if_wal_write_fails() {
let fp_registry = Arc::new(FailPointRegistry::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let db = Arc::new(
Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap(),
);
fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "panic").unwrap();
let result = db.put(b"foo", b"bar").await.unwrap_err();
assert_eq!(result.kind(), crate::ErrorKind::Closed(CloseReason::Panic));
assert!(result
.to_string()
.contains("background task panicked. name=`wal_writer`"));
db.close().await.unwrap();
let manifest_store = ManifestStore::new(&Path::from(path), object_store.clone());
let table_store = Arc::new(TableStore::new(
ObjectStores::new(object_store.clone(), None),
SsTableFormat::default(),
path,
None,
));
let next_wal_sst_id = table_store.next_wal_sst_id(0).await.unwrap();
let (_, manifest) = manifest_store.read_latest_manifest().await.unwrap();
assert!(manifest.core.next_wal_sst_id > next_wal_sst_id);
}
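// Shared body for the compacted-read tests below: fills L0, triggers on-demand
// compactions until L0 drains into compacted runs, then verifies point lookups
// against both freshly written and compacted data, plus a negative lookup.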
async fn do_test_should_read_compacted_db(options: Settings) {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let should_compact_l0 = Arc::new(AtomicBool::new(false));
let this_should_compact_l0 = should_compact_l0.clone();
let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
move |_state| this_should_compact_l0.swap(false, Ordering::SeqCst),
)));
let db = Db::builder(path, object_store.clone())
.with_settings(options)
.with_compaction_scheduler_supplier(compaction_scheduler.clone())
.build()
.await
.unwrap();
let ms = ManifestStore::new(&Path::from(path), object_store.clone());
let mut sm = StoredManifest::load(Arc::new(ms), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
for i in 0..4 {
db.put(&[b'a' + i; 32], &[1u8 + i; 32]).await.unwrap();
db.put(&[b'm' + i; 32], &[13u8 + i; 32]).await.unwrap();
}
wait_for_manifest_condition(
&mut sm,
|s| {
should_compact_l0.store(true, Ordering::SeqCst);
s.l0_last_compacted.is_some() && s.l0.is_empty()
},
Duration::from_secs(10),
)
.await;
info!(
"1 l0: {} {}",
db.inner.state.read().state().core().l0.len(),
db.inner.state.read().state().core().compacted.len()
);
for i in 0..4 {
db.put(&[b'f' + i; 32], &[6u8 + i; 32]).await.unwrap();
db.put(&[b's' + i; 32], &[19u8 + i; 32]).await.unwrap();
}
wait_for_manifest_condition(
&mut sm,
|s| {
should_compact_l0.store(true, Ordering::SeqCst);
s.l0_last_compacted.is_some() && s.l0.is_empty()
},
Duration::from_secs(10),
)
.await;
info!(
"2 l0: {} {}",
db.inner.state.read().state().core().l0.len(),
db.inner.state.read().state().core().compacted.len()
);
db.put(&[b'a'; 32], &[128u8; 32]).await.unwrap();
db.put(&[b'm'; 32], &[129u8; 32]).await.unwrap();
let val = db.get([b'a'; 32]).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&[128u8; 32])));
let val = db.get([b'm'; 32]).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&[129u8; 32])));
for i in 1..4 {
info!(
"3 l0: {} {}",
db.inner.state.read().state().core().l0.len(),
db.inner.state.read().state().core().compacted.len()
);
let val = db.get([b'a' + i; 32]).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&[1u8 + i; 32])));
let val = db.get([b'm' + i; 32]).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&[13u8 + i; 32])));
}
for i in 0..4 {
let val = db.get([b'f' + i; 32]).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&[6u8 + i; 32])));
let val = db.get([b's' + i; 32]).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&[19u8 + i; 32])));
}
let neg_lookup = db.get(b"abc").await;
assert!(neg_lookup.unwrap().is_none());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_should_read_from_compacted_db() {
do_test_should_read_compacted_db(test_db_options(
0,
127,
Some(CompactorOptions {
poll_interval: Duration::from_millis(100),
max_sst_size: 256,
max_concurrent_compactions: 1,
manifest_update_timeout: Duration::from_secs(300),
}),
))
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_should_read_from_compacted_db_no_filters() {
do_test_should_read_compacted_db(test_db_options(
u32::MAX,
127,
Some(CompactorOptions {
poll_interval: Duration::from_millis(100),
manifest_update_timeout: Duration::from_secs(300),
max_sst_size: 256,
max_concurrent_compactions: 1,
}),
))
.await
}
#[tokio::test]
async fn test_db_open_should_write_empty_wal() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.build()
.await
.unwrap();
assert_eq!(db.inner.state.read().state().core().next_wal_sst_id, 2);
db.put(b"1", b"1").await.unwrap();
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.build()
.await
.unwrap();
assert_eq!(db.inner.state.read().state().core().next_wal_sst_id, 4);
}
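// Opening a second writer on the same path must fence the first: the old
// handle's next durable write fails with "detected newer DB client" while the
// new handle keeps writing normally.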
#[tokio::test]
async fn test_empty_wal_should_fence_old_writer() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
async fn do_put(db: &Db, key: &[u8], val: &[u8]) -> Result<(), crate::Error> {
db.put_with_options(
key,
val,
&PutOptions::default(),
&WriteOptions {
await_durable: true,
},
)
.await
}
let db1 = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.build()
.await
.unwrap();
do_put(&db1, b"1", b"1").await.unwrap();
let db2 = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.build()
.await
.unwrap();
let err = do_put(&db1, b"1", b"1").await.unwrap_err();
assert_eq!(err.to_string(), "Closed error: detected newer DB client");
do_put(&db2, b"2", b"2").await.unwrap();
assert_eq!(db2.inner.state.read().state().core().next_wal_sst_id, 5);
}
#[tokio::test]
async fn test_invalid_clock_progression() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let clock = Arc::new(TestClock::new());
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.with_logical_clock(clock.clone())
.build()
.await
.unwrap();
clock.ticker.store(10, Ordering::SeqCst);
db.put(b"1", b"1").await.unwrap();
clock.ticker.store(5, Ordering::SeqCst);
match db.put(b"1", b"1").await {
Ok(_) => panic!("expected an error on inserting backwards time"),
Err(e) => assert_eq!(e.to_string(), "Invalid error: invalid clock tick, must be monotonic. last_tick=`10`, next_tick=`5`"),
}
}
#[tokio::test]
async fn test_invalid_clock_progression_across_db_instances() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let clock = Arc::new(TestClock::new());
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.with_logical_clock(clock.clone())
.build()
.await
.unwrap();
clock.ticker.store(10, Ordering::SeqCst);
db.put(b"1", b"1").await.unwrap();
db.flush().await.unwrap();
let db2 = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.with_logical_clock(clock.clone())
.build()
.await
.unwrap();
clock.ticker.store(5, Ordering::SeqCst);
match db2.put(b"1", b"1").await {
Ok(_) => panic!("expected an error on inserting backwards time"),
Err(e) => assert_eq!(e.to_string(), "Invalid error: invalid clock tick, must be monotonic. last_tick=`10`, next_tick=`5`"),
}
}
#[tokio::test]
#[cfg(feature = "wal_disable")]
async fn should_flush_all_memtables_when_wal_disabled() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let db_options = Settings {
wal_enabled: false,
flush_interval: Some(Duration::from_secs(10)),
..Settings::default()
};
let db = Db::builder(path, object_store.clone())
.with_settings(db_options.clone())
.build()
.await
.unwrap();
let mut rng = proptest_util::rng::new_test_rng(None);
let table = sample::table(&mut rng, 1000, 5);
test_utils::seed_database(&db, &table, false).await.unwrap();
db.flush().await.unwrap();
let reopened_db = Db::builder(path, object_store.clone())
.with_settings(db_options.clone())
.build()
.await
.unwrap();
assert_records_in_range(
&table,
&reopened_db,
&ScanOptions::default(),
BytesRange::from(..),
)
.await
}
#[tokio::test]
async fn test_recover_clock_tick_from_wal() {
let clock = Arc::new(TestClock::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.with_logical_clock(clock.clone())
.build()
.await
.unwrap();
clock.ticker.store(10, Ordering::SeqCst);
db.put(&[b'a'; 4], &[b'j'; 8])
.await
.expect("write batch failed");
clock.ticker.store(11, Ordering::SeqCst);
db.put(&[b'b'; 4], &[b'k'; 8])
.await
.expect("write batch failed");
db.close().await.unwrap();
let manifest_store = Arc::new(ManifestStore::new(&Path::from(path), object_store.clone()));
let stored_manifest =
StoredManifest::load(manifest_store, Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let db_state = stored_manifest.db_state();
let last_clock_tick = db_state.last_l0_clock_tick;
assert_eq!(last_clock_tick, i64::MIN);
let clock = Arc::new(TestClock::new());
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.with_logical_clock(clock.clone())
.build()
.await
.unwrap();
assert_eq!(db.inner.mono_clock.last_tick.load(Ordering::SeqCst), 11);
}
#[tokio::test]
async fn test_should_update_manifest_clock_tick_on_l0_flush() {
let clock = Arc::new(TestClock::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 32, None))
.with_logical_clock(clock.clone())
.build()
.await
.unwrap();
clock.ticker.store(10, Ordering::SeqCst);
db.put(&[b'a'; 4], &[b'j'; 8])
.await
.expect("write batch failed");
clock.ticker.store(11, Ordering::SeqCst);
db.put(&[b'b'; 4], &[b'k'; 8])
.await
.expect("write batch failed");
db.flush().await.unwrap();
db.close().await.unwrap();
let manifest_store = Arc::new(ManifestStore::new(&Path::from(path), object_store.clone()));
let stored_manifest =
StoredManifest::load(manifest_store, Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let db_state = stored_manifest.db_state();
let last_clock_tick = db_state.last_l0_clock_tick;
assert_eq!(last_clock_tick, 11);
}
#[tokio::test]
#[cfg(feature = "wal_disable")]
async fn test_recover_clock_tick_from_manifest() {
let clock = Arc::new(TestClock::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let mut options = test_db_options(0, 32, None);
options.wal_enabled = false;
let db = Db::builder(path, object_store.clone())
.with_settings(options)
.with_logical_clock(clock.clone())
.build()
.await
.unwrap();
clock.ticker.store(10, Ordering::SeqCst);
db.put(&[b'a'; 4], &[b'j'; 28])
.await
.expect("write batch failed");
clock.ticker.store(11, Ordering::SeqCst);
db.put(&[b'b'; 4], &[b'k'; 28])
.await
.expect("write batch failed");
db.flush().await.unwrap();
db.close().await.unwrap();
let clock = Arc::new(TestClock::new());
let mut options = test_db_options(0, 32, None);
options.wal_enabled = false;
let db = Db::builder(path, object_store.clone())
.with_settings(options)
.with_logical_clock(clock.clone())
.build()
.await
.unwrap();
assert_eq!(db.inner.mono_clock.last_tick.load(Ordering::SeqCst), 11);
}
#[tokio::test]
async fn test_put_get_reopen_delete_with_separate_wal_store() {
async fn count_ssts_in(store: &Arc<InMemory>) -> usize {
store
.list(None)
.filter(|r| {
future::ready(
r.as_ref()
.unwrap()
.location
.extension()
.unwrap()
.to_lowercase()
== "sst",
)
})
.count()
.await
}
let fp_registry = Arc::new(FailPointRegistry::new());
let main_object_store = Arc::new(InMemory::new());
let wal_object_store = Arc::new(InMemory::new());
let kv_store = Db::builder("/tmp/test_kv_store", main_object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.with_wal_object_store(wal_object_store.clone())
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
assert_eq!(count_ssts_in(&main_object_store).await, 0);
assert_eq!(count_ssts_in(&wal_object_store).await, 1);
let key = b"test_key";
let value = b"test_value";
fail_parallel::cfg(fp_registry.clone(), "write-compacted-sst-io-error", "pause").unwrap();
kv_store.put(key, value).await.unwrap();
kv_store.flush().await.unwrap();
assert_eq!(count_ssts_in(&main_object_store).await, 0);
assert_eq!(count_ssts_in(&wal_object_store).await, 2);
assert_eq!(
kv_store.get(key).await.unwrap(),
Some(Bytes::from_static(value))
);
fail_parallel::cfg(fp_registry.clone(), "write-compacted-sst-io-error", "off").unwrap();
let mut batch = WriteBatch::default();
for i in 0u32..128 {
batch.put(i.to_be_bytes(), i.to_be_bytes());
}
kv_store.write(batch).await.unwrap();
kv_store.flush().await.unwrap();
assert_eq!(count_ssts_in(&main_object_store).await, 1);
assert_eq!(count_ssts_in(&wal_object_store).await, 3);
assert_eq!(
kv_store.get(key).await.unwrap(),
Some(Bytes::from_static(value))
);
kv_store.close().await.unwrap();
assert_eq!(count_ssts_in(&wal_object_store).await, 3);
let kv_store = Db::builder("/tmp/test_kv_store", main_object_store)
.with_settings(test_db_options(0, 1024, None))
.with_wal_object_store(wal_object_store.clone())
.build()
.await
.unwrap();
assert_eq!(
kv_store.get(key).await.unwrap(),
Some(Bytes::from_static(value))
);
kv_store.delete(key).await.unwrap();
assert_eq!(None, kv_store.get(key).await.unwrap());
kv_store.close().await.unwrap();
}
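// If this writer is fenced while a memtable flush is in flight, the flush must
// fail with SlateDBError::Fenced and must not leave any compacted SSTs behind in
// the table store.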
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_memtable_flush_cleanup_when_fenced() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_flush_cleanup";
let fp_registry = Arc::new(FailPointRegistry::new());
let mut options = test_db_options(0, 32, None);
options.flush_interval = None;
options.manifest_poll_interval = TimeDelta::MAX.to_std().unwrap();
let db1 = Db::builder(path, object_store.clone())
.with_settings(options.clone())
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
fail_parallel::cfg(fp_registry.clone(), "write-compacted-sst-io-error", "pause").unwrap();
db1.put(b"k", b"v").await.unwrap();
let manifest_store = Arc::new(ManifestStore::new(&Path::from(path), object_store.clone()));
let stored_manifest =
StoredManifest::load(manifest_store.clone(), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
FenceableManifest::init_writer(
stored_manifest,
Duration::from_secs(300),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
fail_parallel::cfg(fp_registry.clone(), "write-compacted-sst-io-error", "off").unwrap();
let result = db1.inner.flush_memtables().await;
assert!(matches!(result, Err(SlateDBError::Fenced)));
assert!(db1
.inner
.table_store
.list_compacted_ssts(..)
.await
.unwrap()
.is_empty());
db1.close().await.unwrap();
}
#[tokio::test]
async fn test_wal_store_reconfiguration_fails() {
let object_store = Arc::new(InMemory::new());
let wal_object_store = Arc::new(InMemory::new());
let kv_store = Db::builder("/tmp/test_kv_store", object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.with_wal_object_store(wal_object_store.clone())
.build()
.await
.unwrap();
kv_store.close().await.unwrap();
let result = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(test_db_options(0, 1024, None))
.build()
.await;
match result {
Err(err) => {
assert!(err.to_string().contains("unsupported"));
}
_ => panic!("expected Unsupported error"),
}
}
#[test]
fn test_write_option_defaults() {
let write_options = WriteOptions::default();
assert!(write_options.await_durable);
}
#[tokio::test]
#[cfg(feature = "zstd")]
async fn test_compression_overflow_bug() {
use crate::config::CompressionCodec;
use std::str::FromStr;
let os: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let compress = CompressionCodec::from_str("zstd").unwrap();
let db_builder = Db::builder("/tmp/test_kv_store", os.clone()).with_settings(Settings {
compression_codec: Some(compress),
..Settings::default()
});
let db = db_builder.build().await.unwrap();
for i in 0..1000 {
let key = format!("k{}", i);
let value = format!("{}{}", "v".repeat(i), i);
let put_option = PutOptions::default();
let write_option = WriteOptions {
await_durable: false,
};
db.put_with_options(key.as_bytes(), value.clone(), &put_option, &write_option)
.await
.expect("failed to put");
}
db.flush().await.expect("flush failed");
db.close().await.expect("failed to close db");
let db_builder = Db::builder("/tmp/test_kv_store", os.clone()).with_settings(Settings {
compression_codec: Some(compress),
..Settings::default()
});
let db = db_builder.build().await.unwrap();
let v = db.get("k1").await.expect("get failed").unwrap();
assert_eq!(v.as_ref(), b"v1");
db.close().await.expect("failed to close db");
}
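// Polls the stored manifest until `cond` holds for the core DB state, returning
// that state; panics if the timeout elapses first.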
async fn wait_for_manifest_condition(
sm: &mut StoredManifest,
cond: impl Fn(&CoreDbState) -> bool,
timeout: Duration,
) -> CoreDbState {
let start = tokio::time::Instant::now();
while start.elapsed() < timeout {
let manifest = sm.refresh().await.unwrap();
if cond(&manifest.core) {
return manifest.core.clone();
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
panic!("manifest condition took longer than timeout")
}
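// Settings used by most tests in this module: short flush and manifest-poll
// intervals so background work converges quickly, with filter, L0 size, and
// compactor options supplied per test.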
fn test_db_options(
min_filter_keys: u32,
l0_sst_size_bytes: usize,
compactor_options: Option<CompactorOptions>,
) -> Settings {
test_db_options_with_ttl(min_filter_keys, l0_sst_size_bytes, compactor_options, None)
}
fn test_db_options_with_ttl(
min_filter_keys: u32,
l0_sst_size_bytes: usize,
compactor_options: Option<CompactorOptions>,
ttl: Option<u64>,
) -> Settings {
Settings {
flush_interval: Some(Duration::from_millis(100)),
#[cfg(feature = "wal_disable")]
wal_enabled: true,
manifest_poll_interval: Duration::from_millis(100),
manifest_update_timeout: Duration::from_secs(300),
max_unflushed_bytes: 134_217_728,
l0_max_ssts: 8,
min_filter_keys,
filter_bits_per_key: 10,
l0_sst_size_bytes,
compactor_options,
compression_codec: None,
merge_operator: None,
object_store_cache_options: ObjectStoreCacheOptions::default(),
garbage_collector_options: None,
default_ttl: ttl,
}
}
#[tokio::test]
async fn test_snapshot_basic_functionality() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let db = Db::open("test_db", object_store).await.unwrap();
db.put(b"key1", b"value1").await.unwrap();
db.put(b"key2", b"value2").await.unwrap();
let snapshot = db.snapshot().await.unwrap();
assert_eq!(
snapshot.get(b"key1").await.unwrap(),
Some(Bytes::from(b"value1".as_ref()))
);
assert_eq!(
snapshot.get(b"key2").await.unwrap(),
Some(Bytes::from(b"value2".as_ref()))
);
db.put(b"key3", b"value3").await.unwrap();
assert_eq!(snapshot.get(b"key3").await.unwrap(), None);
assert_eq!(
db.get(b"key3").await.unwrap(),
Some(Bytes::from(b"value3".as_ref()))
);
}
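// recent_snapshot_min_seq should track the oldest active snapshot: it advances
// with flushes when no snapshot is held, pins to the snapshot's seq while one is
// active, and catches back up to last_l0_seq once the snapshot is dropped.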
#[tokio::test]
async fn test_recent_snapshot_min_seq_monotonic() {
let path = "/tmp/test_recent_snapshot_min_seq_monotonic";
let object_store = Arc::new(InMemory::new());
let settings = Settings {
l0_sst_size_bytes: 4 * 1024,
max_unflushed_bytes: 2 * 1024,
min_filter_keys: 0,
flush_interval: Some(Duration::from_millis(100)),
..Default::default()
};
let db = Db::builder(path, object_store)
.with_settings(settings)
.build()
.await
.unwrap();
{
let state = db.inner.state.read();
assert_eq!(state.state().core().recent_snapshot_min_seq, 0);
}
db.put(b"key1", b"value1").await.unwrap();
db.inner.flush_memtables().await.unwrap();
{
let state = db.inner.state.read();
let recent_min_seq = state.state().core().recent_snapshot_min_seq;
assert!(
recent_min_seq > 0,
"recent_snapshot_min_seq should be > 0 after flush"
);
}
let _snapshot = db.snapshot().await.unwrap();
let snapshot_seq = db.inner.oracle.last_committed_seq();
db.put(b"key2", b"value2").await.unwrap();
db.inner.flush_memtables().await.unwrap();
let min_active_seq = db.inner.txn_manager.min_active_seq();
assert!(min_active_seq.is_some());
assert_eq!(min_active_seq.unwrap(), snapshot_seq);
{
let state = db.inner.state.read();
let recent_min_seq = state.state().core().recent_snapshot_min_seq;
assert_eq!(
recent_min_seq,
min_active_seq.unwrap(),
"recent_snapshot_min_seq should equal min_active_seq after flush"
);
}
drop(_snapshot);
db.put(b"key3", b"value3").await.unwrap();
db.inner.flush_memtables().await.unwrap();
{
let state = db.inner.state.read();
let recent_min_seq = state.state().core().recent_snapshot_min_seq;
let last_l0_seq = state.state().core().last_l0_seq;
assert_eq!(
recent_min_seq, last_l0_seq,
"recent_snapshot_min_seq should equal last_l0_seq when no active snapshots"
);
assert!(recent_min_seq > snapshot_seq);
}
}
#[tokio::test]
async fn test_memtable_flush_updates_last_remote_persisted_seq() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test";
let mut opts = test_db_options(0, 256, None);
opts.flush_interval = Some(Duration::MAX);
let db = Db::builder(path, object_store.clone())
.with_settings(opts)
.build()
.await
.unwrap();
let write_opts = WriteOptions {
await_durable: false,
};
db.put_with_options(b"foo", b"bar", &PutOptions::default(), &write_opts)
.await
.unwrap();
db.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
.unwrap();
let v = db
.get_with_options(b"foo", &ReadOptions::new().with_durability_filter(Memory))
.await
.unwrap();
assert_eq!(v, Some(Bytes::from(b"bar".as_ref())));
let v = db
.get_with_options(b"foo", &ReadOptions::new().with_durability_filter(Remote))
.await
.unwrap();
assert_eq!(v, Some(Bytes::from(b"bar".as_ref())));
}
#[tokio::test]
async fn should_merge_operand_into_empty_key() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let db = Db::builder("/tmp/test_merge_1", object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.with_merge_operator(Arc::new(StringConcatMergeOperator))
.build()
.await
.unwrap();
db.merge(b"key1", b"value1").await.unwrap();
let result = db.get(b"key1").await.unwrap();
assert_eq!(result, Some(Bytes::from("value1")));
}
#[tokio::test]
async fn should_merge_multiple_operands_into_same_key() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let db = Db::builder("/tmp/test_merge_2", object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.with_merge_operator(Arc::new(StringConcatMergeOperator))
.build()
.await
.unwrap();
db.merge(b"key1", b"a").await.unwrap();
db.merge(b"key1", b"b").await.unwrap();
db.merge(b"key1", b"c").await.unwrap();
let result = db.get(b"key1").await.unwrap();
assert_eq!(result, Some(Bytes::from("abc")));
}
#[tokio::test]
async fn should_persist_merge_operands_across_flush() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let db = Db::builder("/tmp/test_merge_3", object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.with_merge_operator(Arc::new(StringConcatMergeOperator))
.build()
.await
.unwrap();
db.merge(b"key1", b"a").await.unwrap();
db.merge(b"key1", b"b").await.unwrap();
db.flush().await.unwrap();
let result = db.get(b"key1").await.unwrap();
assert_eq!(result, Some(Bytes::from("ab")));
db.merge(b"key1", b"c").await.unwrap();
let result = db.get(b"key1").await.unwrap();
assert_eq!(result, Some(Bytes::from("abc")));
}
#[tokio::test]
async fn should_error_when_merging_without_merge_operator() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let db = Db::builder("/tmp/test_merge_4", object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.build()
.await
.unwrap();
db.merge(b"key1", b"value1").await.unwrap();
let result = db.get(b"key1").await;
assert!(
result.is_err(),
"Reading merge operand without merge operator should error"
);
match result {
Err(e) => {
let error_string = format!("{}", e);
assert!(
error_string.contains("merge operator missing")
|| error_string.contains("MergeOperatorMissing"),
"Error should be MergeOperatorMissing, got: {:?}",
e
);
}
Ok(_) => unreachable!("Should have errored"),
}
}
#[tokio::test]
async fn should_merge_operands_after_reopen() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_merge_5";
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.with_merge_operator(Arc::new(StringConcatMergeOperator))
.build()
.await
.unwrap();
db.merge(b"key1", b"a").await.unwrap();
db.merge(b"key1", b"b").await.unwrap();
db.flush().await.unwrap();
db.close().await.unwrap();
let db_reopened = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.with_merge_operator(Arc::new(StringConcatMergeOperator))
.build()
.await
.unwrap();
let result = db_reopened.get(b"key1").await.unwrap();
assert_eq!(result, Some(Bytes::from("ab")));
db_reopened.merge(b"key1", b"c").await.unwrap();
let result = db_reopened.get(b"key1").await.unwrap();
assert_eq!(result, Some(Bytes::from("abc")));
}
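// Regression test: an L0 SST that has been written but not yet recorded in the
// manifest (flush paused at "after-flush-imm-to-l0-before-manifest") must survive
// garbage collection; otherwise the manifest would reference a deleted SST.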
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_gc_race_deletes_l0_before_manifest_update() {
let fp_registry = Arc::new(FailPointRegistry::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = Path::from("/tmp/test_gc_race_deletes_l0_before_manifest_update");
let mut settings = test_db_options(0, 1024, None);
settings.flush_interval = None;
let db = Db::builder(path.clone(), object_store.clone())
.with_settings(settings)
.with_fp_registry(fp_registry.clone())
.build()
.await
.expect("failed to build DB");
let db = Arc::new(db);
fail_parallel::cfg(
fp_registry.clone(),
"after-flush-imm-to-l0-before-manifest",
"pause",
)
.expect("failed to set failpoint");
db.put_with_options(
b"key1",
b"value1",
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.expect("failed to put");
let this_db = db.clone();
let flush_handle = tokio::spawn(async move {
this_db
.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
});
let mut ssts = Vec::new();
for _ in 0..200 {
ssts = db
.inner
.table_store
.list_compacted_ssts(..)
.await
.expect("failed to list compacted ssts");
if !ssts.is_empty() {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(
ssts.len(),
1,
"expected exactly one L0 SST before GC, but found {:?}",
ssts.iter().map(|sst| sst.id).collect::<Vec<_>>()
);
let gc_options = GarbageCollectorOptions {
wal_options: Some(GarbageCollectorDirectoryOptions {
interval: None,
min_age: Duration::from_millis(0),
}),
manifest_options: Some(GarbageCollectorDirectoryOptions {
interval: None,
min_age: Duration::from_millis(0),
}),
compacted_options: Some(GarbageCollectorDirectoryOptions {
interval: None,
min_age: Duration::from_millis(0),
}),
compactions_options: Some(GarbageCollectorDirectoryOptions {
interval: None,
min_age: Duration::from_millis(0),
}),
};
let gc = GarbageCollectorBuilder::new(path.clone(), object_store.clone())
.with_options(gc_options)
.with_stat_registry(db.metrics())
.with_system_clock(db.inner.system_clock.clone())
.build();
for _ in 0..5 {
gc.run_gc_once().await;
ssts = db
.inner
.table_store
.list_compacted_ssts(..)
.await
.expect("failed to list compacted ssts after manual GC");
if ssts.is_empty() {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(
ssts.len(),
1,
"expected exactly one L0 SST after GC, but found {:?}",
ssts.iter().map(|sst| sst.id).collect::<Vec<_>>()
);
fail_parallel::cfg(
fp_registry.clone(),
"after-flush-imm-to-l0-before-manifest",
"off",
)
.expect("failed to set failpoint");
flush_handle
.await
.expect("failed to join flush handle")
.expect("flush failed");
let manifest_store = ManifestStore::new(&path, object_store.clone());
let (_, manifest) = manifest_store
.read_latest_manifest()
.await
.expect("failed to read latest manifest");
assert_eq!(
manifest.core.l0.len(),
1,
"expected exactly one L0 SST in manifest"
);
let l0_id = manifest.core.l0[0].id;
assert_eq!(
l0_id, ssts[0].id,
"expected SST {:?} but found SST {:?}",
ssts[0].id, l0_id,
);
let table_store = TableStore::new(
ObjectStores::new(object_store.clone(), None),
SsTableFormat::default(),
path.clone(),
None,
);
let compacted_ssts = table_store
.list_compacted_ssts(..)
.await
.expect("failed to list compacted ssts");
let still_exists = compacted_ssts.iter().any(|m| m.id == l0_id);
assert!(
still_exists,
"manifest references L0 SST {:?} that GC has already deleted",
l0_id
);
db.close().await.expect("failed to close DB");
}
}