pub use crate::db_status::DbStatus;
use std::ops::RangeBounds;
use std::sync::Arc;
use bytes::Bytes;
use fail_parallel::FailPointRegistry;
use object_store::path::Path;
use object_store::prefix::PrefixStore;
use object_store::{parse_url_opts, ObjectStore};
use crate::compactor::COMPACTOR_TASK_NAME;
use crate::db_transaction::DbTransaction;
use crate::dispatcher::MessageHandlerExecutor;
use crate::garbage_collector::GC_TASK_NAME;
use crate::transaction_manager::IsolationLevel;
use crate::CloseReason;
use log::{info, trace, warn};
use parking_lot::RwLock;
use std::time::Duration;
use crate::batch::WriteBatch;
use crate::batch_write::{WriteBatchMessage, WRITE_BATCH_TASK_NAME};
use crate::bytes_range::BytesRange;
use crate::cached_object_store::CachedObjectStore;
use crate::clock::MonotonicClock;
use crate::config::{
FlushOptions, FlushType, MergeOptions, PutOptions, ReadOptions, ScanOptions, Settings,
WriteOptions,
};
use crate::db_iter::DbIterator;
use crate::db_read::DbRead;
use crate::db_snapshot::DbSnapshot;
use crate::db_state::{DbState, SsTableId};
use crate::db_stats::DbStats;
use crate::error::SlateDBError;
use crate::iter::IterationOrder;
use crate::manifest::store::FenceableManifest;
use crate::manifest::{Manifest, ManifestCore};
use crate::mem_table::WritableKVTable;
use crate::memtable_flusher::{FlushResult, FlushTarget, MemtableFlusher};
use crate::merge_operator::{instrument_merge_operator, MergeOperatorType};
use crate::oracle::{DbOracle, Oracle};
use crate::paths::PathResolver;
use crate::rand::DbRand;
use crate::reader::Reader;
use crate::snapshot_manager::SnapshotManager;
use crate::sst_iter::SstIteratorOptions;
use crate::tablestore::TableStore;
use crate::transaction_manager::TransactionManager;
use crate::types::KeyValue;
use crate::utils::{format_bytes_si, SafeSender};
use crate::wal_buffer::{WalBufferManager, WAL_BUFFER_TASK_NAME};
use crate::wal_replay::{WalReplayIterator, WalReplayOptions};
use slatedb_common::clock::SystemClock;
use slatedb_common::metrics::MetricsRecorderHelper;
use slatedb_txn_obj::DirtyObject;
use crate::db_status::{ClosedResultWriter, DbStatusManager};
pub use builder::DbBuilder;
pub use builder::DbReaderBuilder;
pub(crate) mod builder;
/// Internals shared between the public [`Db`] handle and the background tasks
/// (writer, memtable flusher, WAL buffer, compactor, GC).
pub(crate) struct DbInner {
    /// In-memory database state (memtables, manifest view) behind a read/write lock.
    pub(crate) state: Arc<RwLock<DbState>>,
    /// Immutable configuration supplied at open time.
    pub(crate) settings: Settings,
    /// Access to SST data in object storage.
    pub(crate) table_store: Arc<TableStore>,
    /// Flushes frozen (immutable) memtables to L0.
    pub(crate) memtable_flusher: Arc<MemtableFlusher>,
    /// Channel into the writer task that applies write batches.
    pub(crate) write_notifier: SafeSender<WriteBatchMessage>,
    /// Metric handles for this database instance.
    pub(crate) db_stats: DbStats,
    #[allow(dead_code)]
    /// Metrics recorder; held so it lives as long as the database.
    pub(crate) recorder: MetricsRecorderHelper,
    #[allow(dead_code)]
    /// Fail-point registry used to inject failures in tests.
    pub(crate) fp_registry: Arc<FailPointRegistry>,
    /// Monotonic clock seeded from the manifest's last L0 clock tick.
    pub(crate) mono_clock: Arc<MonotonicClock>,
    /// System (wall) clock abstraction; mockable in tests.
    pub(crate) system_clock: Arc<dyn SystemClock>,
    /// Randomness source shared with the transaction/snapshot managers.
    pub(crate) rand: Arc<DbRand>,
    /// Sequence-number oracle, seeded from the manifest's last L0 sequence.
    pub(crate) oracle: Arc<DbOracle>,
    /// Merge operator instrumented to count flush-path operands, if configured.
    pub(crate) flush_merge_operator: Option<MergeOperatorType>,
    /// Shared read path used by gets and scans.
    pub(crate) reader: Reader,
    /// Buffers WAL writes and flushes them to object storage.
    pub(crate) wal_buffer: Arc<WalBufferManager>,
    /// Whether the WAL is in use (see `wal_enabled_in_options`).
    pub(crate) wal_enabled: bool,
    /// Coordinates transactions.
    pub(crate) txn_manager: Arc<TransactionManager>,
    /// Coordinates long-lived snapshots.
    pub(crate) snapshot_manager: Arc<SnapshotManager>,
    /// Tracks the database lifecycle status (open/closed/fatal).
    pub(crate) status_manager: DbStatusManager,
}
impl DbInner {
/// Wire a `DbInner` together from its pre-built components and the current
/// manifest.
///
/// Sequence numbers and clock ticks are seeded from the manifest so the new
/// instance resumes where the previous writer left off.
pub(crate) async fn new(
    settings: Settings,
    system_clock: Arc<dyn SystemClock>,
    rand: Arc<DbRand>,
    table_store: Arc<TableStore>,
    manifest: DirtyObject<Manifest>,
    memtable_flusher: Arc<MemtableFlusher>,
    write_notifier: SafeSender<WriteBatchMessage>,
    recorder: MetricsRecorderHelper,
    fp_registry: Arc<FailPointRegistry>,
    merge_operator: Option<crate::merge_operator::MergeOperatorType>,
    status_manager: DbStatusManager,
) -> Result<Self, SlateDBError> {
    // All three of the oracle's tracked sequence numbers start at the
    // manifest's last L0 sequence number.
    let last_l0_seq = manifest.value.core.last_l0_seq;
    let oracle = Arc::new(DbOracle::new(
        last_l0_seq,
        last_l0_seq,
        last_l0_seq,
        status_manager.clone(),
    ));
    // Resume the monotonic clock from the last tick recorded in the manifest
    // so timestamps never go backwards across restarts.
    let mono_clock = Arc::new(MonotonicClock::new(
        system_clock.clone(),
        manifest.value.core.last_l0_clock_tick,
    ));
    let db_state = DbState::new(manifest);
    let state = Arc::new(RwLock::new(db_state));
    let db_stats = DbStats::new(&recorder);
    let wal_enabled = DbInner::wal_enabled_in_options(&settings);
    // Wrap the merge operator so flush-path operand merges are counted.
    let flush_merge_operator = merge_operator.clone().map(|merge_operator| {
        instrument_merge_operator(
            merge_operator,
            db_stats.merge_operator_flush_operands.clone(),
        )
    });
    let reader = Reader::new(
        table_store.clone(),
        db_stats.clone(),
        mono_clock.clone(),
        oracle.clone(),
        merge_operator.clone(),
    );
    // The WAL buffer continues after the last WAL id already reflected in
    // the manifest (i.e. everything before it will be replayed, not rewritten).
    let recent_flushed_wal_id = state.read().state().core().replay_after_wal_id;
    let wal_buffer = Arc::new(WalBufferManager::new(
        state.clone(),
        status_manager.clone(),
        db_stats.clone(),
        recent_flushed_wal_id,
        oracle.clone(),
        table_store.clone(),
        mono_clock.clone(),
        settings.l0_sst_size_bytes,
        settings.flush_interval,
    ));
    let txn_manager = Arc::new(TransactionManager::new(oracle.clone(), rand.clone()));
    let snapshot_manager = Arc::new(SnapshotManager::new(oracle.clone(), rand.clone()));
    let db_inner = Self {
        state,
        settings,
        memtable_flusher,
        oracle,
        wal_enabled,
        table_store,
        wal_buffer,
        write_notifier,
        db_stats,
        mono_clock,
        system_clock,
        rand,
        flush_merge_operator,
        recorder,
        fp_registry,
        reader,
        txn_manager,
        snapshot_manager,
        status_manager,
    };
    Ok(db_inner)
}
/// Look up `key` and return only its value, if present.
///
/// Thin wrapper over [`Self::get_key_value_with_options`] that drops the key
/// from the returned entry.
pub(crate) async fn get_with_options<K: AsRef<[u8]>>(
    &self,
    key: K,
    options: &ReadOptions,
) -> Result<Option<Bytes>, SlateDBError> {
    let entry = self.get_key_value_with_options(key, options).await?;
    Ok(entry.map(|kv| kv.value))
}
/// Look up `key` and return the full key/value entry, if present.
///
/// Fails fast when the database has been closed or hit a fatal error.
pub(crate) async fn get_key_value_with_options<K: AsRef<[u8]>>(
    &self,
    key: K,
    options: &ReadOptions,
) -> Result<Option<KeyValue>, SlateDBError> {
    self.check_closed()?;
    // Take an owned view of the state; the read lock is released before the await.
    let view = {
        let guard = self.state.read();
        guard.view()
    };
    self.reader
        .get_key_value_with_options(key, options, &view, None, None)
        .await
}
/// Build an iterator over `range`, honoring `options`.
///
/// Fails fast when the database has been closed or hit a fatal error.
pub(crate) async fn scan_with_options(
    &self,
    range: BytesRange,
    options: &ScanOptions,
) -> Result<DbIterator, SlateDBError> {
    self.check_closed()?;
    // Take an owned view of the state; the read lock is released before the await.
    let view = {
        let guard = self.state.read();
        guard.view()
    };
    self.reader
        .scan_with_options(range, options, &view, None, None, None)
        .await
}
/// Fence out any older writer by successfully writing an empty WAL SST.
///
/// Starting at `next_wal_id`, repeatedly attempts to write an empty WAL table.
/// A successful write proves this instance owns the next WAL slot. A `Fenced`
/// error means another writer got there first, so we refresh the manifest,
/// merge its remote changes into local state, and retry with the next id.
/// Any other error is propagated.
async fn fence_writers(
    &self,
    manifest: &mut FenceableManifest,
    next_wal_id: u64,
) -> Result<(), SlateDBError> {
    let mut empty_wal_id = next_wal_id;
    loop {
        let empty_wal = WritableKVTable::new();
        match self
            .flush_imm_table(
                &SsTableId::Wal(empty_wal_id),
                empty_wal.table().clone(),
                false,
            )
            .await
        {
            Ok(_) => {
                return Ok(());
            }
            Err(SlateDBError::Fenced) => {
                // Another writer advanced: pick up its manifest and fold its
                // changes into our local state before retrying.
                manifest.refresh().await?;
                let remote_dirty = manifest.prepare_dirty()?;
                let dirty_manifest = {
                    let mut state = self.state.write();
                    state.merge_remote_manifest(remote_dirty);
                    state.state().manifest.clone()
                };
                self.status_manager.report_manifest(dirty_manifest.into());
                empty_wal_id += 1;
            }
            Err(e) => {
                return Err(e);
            }
        }
    }
}
/// Whether the WAL is enabled for the given settings.
///
/// Without the `wal_disable` compile-time feature the WAL is unconditionally
/// on; with the feature, `settings.wal_enabled` decides.
#[allow(unused_variables)] // `settings` is unused when `wal_disable` is off
pub(crate) fn wal_enabled_in_options(settings: &Settings) -> bool {
    #[cfg(feature = "wal_disable")]
    return settings.wal_enabled;
    #[cfg(not(feature = "wal_disable"))]
    return true;
}
/// Submit a write batch to the writer task and optionally await durability.
///
/// Stats are incremented before validation, so rejected writes (closed db,
/// empty batch) are still counted as attempts. Backpressure is applied before
/// the batch is enqueued. The returned handle carries the sequence number and
/// timestamp assigned by the writer task.
pub(crate) async fn write_with_options(
    &self,
    batch: WriteBatch,
    options: &WriteOptions,
) -> Result<WriteHandle, SlateDBError> {
    self.db_stats.write_batch_count.increment(1);
    self.db_stats.write_ops.increment(batch.ops.len() as u64);
    self.check_closed()?;
    if batch.ops.is_empty() {
        return Err(SlateDBError::EmptyBatch);
    }
    // One-shot channel over which the writer task returns the handle and a
    // watcher that resolves once the write is durable.
    let (tx, rx) = tokio::sync::oneshot::channel();
    let batch_msg = WriteBatchMessage {
        batch,
        options: options.clone(),
        done: tx,
    };
    self.maybe_apply_backpressure().await?;
    self.write_notifier.send(batch_msg)?;
    // First `?`: channel closed (writer task gone); second `?`: write error.
    let (write_handle, mut durable_watcher) = rx.await??;
    if options.await_durable {
        durable_watcher.await_value().await?;
    }
    Ok(write_handle)
}
/// Block the caller while unflushed in-memory data (WAL buffer plus immutable
/// memtables) exceeds `max_unflushed_bytes`.
///
/// Loops until the total drops below the limit. Each iteration waits on
/// whichever completes first: the oldest immutable memtable finishing its
/// upload, the oldest unflushed WAL being flushed, or a 30s timeout (which
/// only logs and re-checks).
#[inline]
pub(crate) async fn maybe_apply_backpressure(&self) -> Result<(), SlateDBError> {
    loop {
        // Estimate current unflushed size; the state read lock is only held
        // while summing the immutable memtables.
        let (wal_size_bytes, imm_memtable_size_bytes) = {
            let wal_size_bytes = self.wal_buffer.estimated_bytes()?;
            let imm_memtable_size_bytes = {
                let guard = self.state.read();
                guard
                    .state()
                    .imm_memtable
                    .iter()
                    .map(|imm| {
                        let metadata = imm.table().metadata();
                        self.table_store.estimate_encoded_size_compacted(
                            metadata.entry_num,
                            metadata.entries_size_in_bytes,
                        )
                    })
                    .sum::<usize>()
            };
            (wal_size_bytes, imm_memtable_size_bytes)
        };
        let total_mem_size_bytes = wal_size_bytes + imm_memtable_size_bytes;
        self.db_stats
            .total_mem_size_bytes
            .set(total_mem_size_bytes as i64);
        trace!(
            "checking backpressure [total_mem_size_bytes={}, wal_size_bytes={}, imm_memtable_size_bytes={}, max_unflushed_bytes={}]",
            format_bytes_si(total_mem_size_bytes as u64),
            format_bytes_si(wal_size_bytes as u64),
            format_bytes_si(imm_memtable_size_bytes as u64),
            format_bytes_si(self.settings.max_unflushed_bytes as u64),
        );
        if total_mem_size_bytes >= self.settings.max_unflushed_bytes {
            self.db_stats.backpressure_count.increment(1);
            warn!(
                "unflushed memtable size exceeds max_unflushed_bytes. applying backpressure. [total_mem_size_bytes={}, wal_size_bytes={}, imm_memtable_size_bytes={}, max_unflushed_bytes={}]",
                format_bytes_si(total_mem_size_bytes as u64),
                format_bytes_si(wal_size_bytes as u64),
                format_bytes_si(imm_memtable_size_bytes as u64),
                format_bytes_si(self.settings.max_unflushed_bytes as u64),
            );
            let maybe_oldest_unflushed_memtable = {
                let guard = self.state.read();
                guard.state().imm_memtable.back().cloned()
            };
            let watcher_for_oldest_unflushed_wal =
                self.wal_buffer.watcher_for_oldest_unflushed_wal();
            // NOTE(review): when the limit is exceeded but there is nothing to
            // wait on, this re-checks immediately — presumably the sizes change
            // quickly in that state, but confirm this cannot busy-spin.
            if maybe_oldest_unflushed_memtable.is_none()
                && watcher_for_oldest_unflushed_wal.is_none()
            {
                continue;
            }
            // Absent branches become never-resolving futures so `select!`
            // only considers the sources that actually exist.
            let await_memtable_uploaded = async {
                if let Some(oldest_unflushed_memtable) = maybe_oldest_unflushed_memtable {
                    oldest_unflushed_memtable.await_uploaded().await
                } else {
                    std::future::pending().await
                }
            };
            let await_flush_wal = async {
                if let Some(mut watcher) = watcher_for_oldest_unflushed_wal {
                    watcher.await_value().await
                } else {
                    std::future::pending().await
                }
            };
            let timeout_fut = self.system_clock.sleep(Duration::from_secs(30));
            tokio::select! {
                result = await_memtable_uploaded => result?,
                result = await_flush_wal => result?,
                _ = timeout_fut => {
                    warn!("backpressure timeout: waited 30s, no memtable/WAL flushed yet");
                }
            };
        } else {
            break;
        }
    }
    Ok(())
}
/// Flush all buffered WAL entries to object storage and return the id of the
/// most recently flushed WAL SST.
pub(crate) async fn flush_wals(&self) -> Result<u64, SlateDBError> {
    self.wal_buffer
        .flush()
        .await
        .map(|_| self.wal_buffer.recent_flushed_wal_id())
}
/// Flush the currently frozen (immutable) memtables according to `target`.
async fn flush_imm_memtables(&self, target: FlushTarget) -> Result<FlushResult, SlateDBError> {
    let flusher = self.memtable_flusher();
    flusher.flush(target).await
}
/// Freeze the active memtable and then flush all immutable memtables.
pub(crate) async fn flush_memtables(
    &self,
    target: FlushTarget,
) -> Result<FlushResult, SlateDBError> {
    self.freeze_current_memtable()?;
    self.memtable_flusher().flush(target).await
}
/// Borrow the memtable flusher owned by this database.
pub(crate) fn memtable_flusher(&self) -> &MemtableFlusher {
    self.memtable_flusher.as_ref()
}
/// Flush buffered writes: the WAL when it is enabled, otherwise the memtables.
///
/// `check_status` controls whether the close/fatal status is checked first
/// (it is skipped during shutdown, when the db is already marked closed).
pub(crate) async fn flush(&self, check_status: bool) -> Result<(), SlateDBError> {
    let flush_type = if self.wal_enabled {
        FlushType::Wal
    } else {
        FlushType::MemTable
    };
    self.flush_with_options(FlushOptions { flush_type }, check_status)
        .await
}
/// Flush according to `options.flush_type`.
///
/// A WAL flush with the WAL disabled is an error. A memtable flush first
/// flushes the WAL (when enabled) so no buffered writes are skipped.
pub(crate) async fn flush_with_options(
    &self,
    options: FlushOptions,
    check_status: bool,
) -> Result<(), SlateDBError> {
    self.db_stats.flush_requests.increment(1);
    if check_status {
        self.check_closed()?;
    }
    match options.flush_type {
        FlushType::Wal if !self.wal_enabled => Err(SlateDBError::WalDisabled),
        FlushType::Wal => {
            self.flush_wals().await?;
            Ok(())
        }
        FlushType::MemTable => {
            if self.wal_enabled {
                self.flush_wals().await?;
            }
            self.flush_memtables(FlushTarget::All).await?;
            Ok(())
        }
    }
}
/// Replay WAL SSTs written after the manifest's checkpoint into memtables.
///
/// Iterator options are tuned for sequential replay: little fetch parallelism,
/// large block batches, and no block-cache pollution.
async fn replay_wal(&self) -> Result<(), SlateDBError> {
    let sst_iter_options = SstIteratorOptions {
        max_fetch_tasks: 1,
        blocks_to_fetch: 256,
        cache_blocks: false,
        eager_spawn: true,
        order: IterationOrder::Ascending,
    };
    let replay_options = WalReplayOptions {
        sst_batch_size: 4,
        min_memtable_bytes: self.settings.l0_sst_size_bytes,
        max_memtable_bytes: usize::MAX,
        sst_iter_options,
        min_seq: None,
    };
    let db_state = self.state.read().state().core().clone();
    let mut replay_iter =
        WalReplayIterator::new(&db_state, replay_options, Arc::clone(&self.table_store))
            .await?;
    while let Some(replayed_table) = replay_iter.next().await? {
        // Replayed sequence numbers must never fall behind what is already
        // persisted remotely; that would indicate WAL/manifest corruption.
        assert!(self.oracle.last_remote_persisted_seq() <= replayed_table.last_seq);
        self.oracle.advance_durable_seq(replayed_table.last_seq);
        // Respect memory limits even during replay.
        self.maybe_apply_backpressure().await?;
        self.replay_memtable(replayed_table)?;
    }
    Ok(())
}
/// Optionally warm the on-disk object cache with SSTs referenced by the
/// current manifest, bounded by the configured cache size.
async fn preload_cache(
    &self,
    cached_obj_store: &CachedObjectStore,
    path_resolver: &PathResolver,
) -> Result<(), SlateDBError> {
    // The read guard here is a temporary dropped at the end of this statement,
    // so the state lock is not held across the await below.
    let state = self.state.read().state();
    let cache_opts = &self.settings.object_store_cache_options;
    crate::utils::preload_cache_from_manifest(
        &state.manifest.value.core,
        cached_obj_store,
        path_resolver,
        cache_opts.preload_disk_cache_on_startup,
        // No configured limit means preload without a size cap.
        cache_opts.max_cache_size_bytes.unwrap_or(usize::MAX),
    )
    .await
}
/// Current lifecycle status of the database.
pub(crate) fn status(&self) -> DbStatus {
    self.status_manager.status()
}
/// Return an error if the database is no longer open.
///
/// A recorded `Ok(())` result means a clean close and surfaces as
/// `SlateDBError::Closed`; a recorded error is propagated as-is.
pub(crate) fn check_closed(&self) -> Result<(), SlateDBError> {
    match self.status_manager.result_reader().read() {
        None => Ok(()),
        Some(Ok(())) => Err(SlateDBError::Closed),
        Some(Err(e)) => Err(e),
    }
}
/// Snapshot the core manifest currently held in memory.
pub(crate) fn manifest(&self) -> ManifestCore {
    let guard = self.state.read();
    guard.state().manifest.value.core.clone()
}
}
/// Cheaply cloneable public handle to a SlateDB database.
#[derive(Clone)]
pub struct Db {
    // Shared core state and logic; background tasks hold clones of this Arc.
    pub(crate) inner: Arc<DbInner>,
    // Executor owning the background task handles shut down in `close`.
    task_executor: Arc<MessageHandlerExecutor>,
}
impl Db {
/// Open (or create) a database at `path` on `object_store` with defaults.
pub async fn open<P: Into<Path>>(
    path: P,
    object_store: Arc<dyn ObjectStore>,
) -> Result<Self, crate::Error> {
    let builder = Self::builder(path, object_store);
    builder.build().await
}
/// Start building a database at `path` on `object_store` with custom options.
pub fn builder<P: Into<Path>>(path: P, object_store: Arc<dyn ObjectStore>) -> DbBuilder<P> {
    DbBuilder::new(path, object_store)
}
/// Close the database, flushing buffered data and shutting down background
/// tasks.
///
/// Returns `SlateDBError::Closed` if the database was already cleanly closed.
/// Task shutdown failures are logged rather than propagated, so close is
/// best-effort.
pub async fn close(&self) -> Result<(), crate::Error> {
    // Only flush on the first, clean close; a prior non-clean close reason
    // means the db already failed and flushing is skipped.
    let should_flush = match self.status().close_reason {
        Some(CloseReason::Clean) => return Err(SlateDBError::Closed.into()),
        Some(_) => false,
        None => true,
    };
    // Record the clean-close result first so new operations fail fast.
    self.inner.status_manager.write_result(Ok(()));
    if should_flush {
        // check_status=false: the status is already "closed" at this point.
        if let Err(e) = self.inner.flush(false).await {
            warn!("failed to flush db during close [error={:?}]", e);
        }
    }
    // Shut down background tasks in a fixed order: flusher, compactor,
    // garbage collector, writer, WAL writer.
    MemtableFlusher::shutdown(&self.task_executor).await;
    if let Err(e) = self.task_executor.shutdown_task(COMPACTOR_TASK_NAME).await {
        warn!("failed to shutdown compactor task [error={:?}]", e);
    }
    if let Err(e) = self.task_executor.shutdown_task(GC_TASK_NAME).await {
        warn!("failed to shutdown garbage collector task [error={:?}]", e);
    }
    if let Err(e) = self
        .task_executor
        .shutdown_task(WRITE_BATCH_TASK_NAME)
        .await
    {
        warn!("failed to shutdown writer task [error={:?}]", e);
    }
    if let Err(e) = self.task_executor.shutdown_task(WAL_BUFFER_TASK_NAME).await {
        warn!("failed to shutdown wal writer task [error={:?}]", e);
    }
    info!("db closed");
    Ok(())
}
/// Create a read snapshot pinned to the database's current state.
pub async fn snapshot(&self) -> Result<Arc<DbSnapshot>, crate::Error> {
    self.inner.check_closed()?;
    Ok(DbSnapshot::new(self.inner.clone(), None))
}
/// Get the value for `key` using default read options.
pub async fn get<K: AsRef<[u8]> + Send>(&self, key: K) -> Result<Option<Bytes>, crate::Error> {
    let options = ReadOptions::default();
    self.get_with_options(key, &options).await
}
/// Get the value for `key`, honoring `options`.
pub async fn get_with_options<K: AsRef<[u8]> + Send>(
    &self,
    key: K,
    options: &ReadOptions,
) -> Result<Option<Bytes>, crate::Error> {
    match self.inner.get_with_options(key, options).await {
        Ok(value) => Ok(value),
        Err(e) => Err(e.into()),
    }
}
/// Get the full key/value entry for `key` using default read options.
pub async fn get_key_value<K: AsRef<[u8]> + Send>(
    &self,
    key: K,
) -> Result<Option<KeyValue>, crate::Error> {
    let options = ReadOptions::default();
    self.get_key_value_with_options(key, &options).await
}
pub async fn get_key_value_with_options<K: AsRef<[u8]> + Send>(
&self,
key: K,
options: &ReadOptions,
) -> Result<Option<KeyValue>, crate::Error> {
let kv = self
.inner
.get_key_value_with_options(key, options)
.await
.map_err(crate::Error::from)?;
Ok(kv)
}
/// Scan `range` with default scan options.
pub async fn scan<K, T>(&self, range: T) -> Result<DbIterator, crate::Error>
where
    K: AsRef<[u8]> + Send,
    T: RangeBounds<K> + Send,
{
    let options = ScanOptions::default();
    self.scan_with_options(range, &options).await
}
/// Scan `range`, honoring `options`.
///
/// The caller's bounds are copied into owned byte bounds before delegating.
pub async fn scan_with_options<K, T>(
    &self,
    range: T,
    options: &ScanOptions,
) -> Result<DbIterator, crate::Error>
where
    K: AsRef<[u8]> + Send,
    T: RangeBounds<K> + Send,
{
    let start_bound = range
        .start_bound()
        .map(|k| Bytes::copy_from_slice(k.as_ref()));
    let end_bound = range
        .end_bound()
        .map(|k| Bytes::copy_from_slice(k.as_ref()));
    match self
        .inner
        .scan_with_options(BytesRange::from((start_bound, end_bound)), options)
        .await
    {
        Ok(iter) => Ok(iter),
        Err(e) => Err(e.into()),
    }
}
/// Scan all keys beginning with `prefix`, using default scan options.
pub async fn scan_prefix<P>(&self, prefix: P) -> Result<DbIterator, crate::Error>
where
    P: AsRef<[u8]> + Send,
{
    let options = ScanOptions::default();
    self.scan_prefix_with_options(prefix, &options).await
}
/// Scan all keys beginning with `prefix`, honoring `options`.
pub async fn scan_prefix_with_options<P>(
    &self,
    prefix: P,
    options: &ScanOptions,
) -> Result<DbIterator, crate::Error>
where
    P: AsRef<[u8]> + Send,
{
    let prefix_range = BytesRange::from_prefix(prefix.as_ref());
    match self.inner.scan_with_options(prefix_range, options).await {
        Ok(iter) => Ok(iter),
        Err(e) => Err(e.into()),
    }
}
/// Insert `key` → `value` with default put/write options.
pub async fn put<K, V>(&self, key: K, value: V) -> Result<WriteHandle, crate::Error>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    let mut wb = WriteBatch::new();
    wb.put(key, value);
    self.write(wb).await
}
/// Insert `key` → `value` with explicit put and write options.
pub async fn put_with_options<K, V>(
    &self,
    key: K,
    value: V,
    put_opts: &PutOptions,
    write_opts: &WriteOptions,
) -> Result<WriteHandle, crate::Error>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    let mut wb = WriteBatch::new();
    wb.put_with_options(key, value, put_opts);
    self.write_with_options(wb, write_opts).await
}
/// Delete `key` with default write options.
pub async fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<WriteHandle, crate::Error> {
    let mut wb = WriteBatch::new();
    wb.delete(key.as_ref());
    self.write(wb).await
}
/// Delete `key` with explicit write options.
pub async fn delete_with_options<K: AsRef<[u8]>>(
    &self,
    key: K,
    options: &WriteOptions,
) -> Result<WriteHandle, crate::Error> {
    let mut wb = WriteBatch::new();
    wb.delete(key);
    self.write_with_options(wb, options).await
}
/// Merge `value` into `key` with default merge/write options.
pub async fn merge<K, V>(&self, key: K, value: V) -> Result<WriteHandle, crate::Error>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    let mut wb = WriteBatch::new();
    wb.merge(key, value);
    self.write(wb).await
}
/// Merge `value` into `key` with explicit merge and write options.
pub async fn merge_with_options<K, V>(
    &self,
    key: K,
    value: V,
    merge_opts: &MergeOptions,
    write_opts: &WriteOptions,
) -> Result<WriteHandle, crate::Error>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    let mut wb = WriteBatch::new();
    wb.merge_with_options(key, value, merge_opts);
    self.write_with_options(wb, write_opts).await
}
/// Atomically apply `batch` with default write options.
pub async fn write(&self, batch: WriteBatch) -> Result<WriteHandle, crate::Error> {
    let options = WriteOptions::default();
    self.write_with_options(batch, &options).await
}
/// Atomically apply `batch`, honoring `options`.
pub async fn write_with_options(
    &self,
    batch: WriteBatch,
    options: &WriteOptions,
) -> Result<WriteHandle, crate::Error> {
    match self.inner.write_with_options(batch, options).await {
        Ok(handle) => Ok(handle),
        Err(e) => Err(e.into()),
    }
}
/// Flush buffered writes (WAL when enabled, otherwise memtables).
pub async fn flush(&self) -> Result<(), crate::Error> {
    Ok(self.inner.flush(true).await?)
}
/// Flush according to `options`.
pub async fn flush_with_options(&self, options: FlushOptions) -> Result<(), crate::Error> {
    Ok(self.inner.flush_with_options(options, true).await?)
}
/// Return a snapshot of the current core manifest.
pub fn manifest(&self) -> ManifestCore {
    self.inner.manifest()
}
/// Subscribe to status changes (e.g. close or fatal-error transitions).
pub fn subscribe(&self) -> tokio::sync::watch::Receiver<DbStatus> {
    self.inner.status_manager.subscribe()
}
/// Begin a transaction at the given isolation level.
pub async fn begin(
    &self,
    isolation_level: IsolationLevel,
) -> Result<DbTransaction, crate::Error> {
    self.inner.check_closed()?;
    Ok(DbTransaction::new(
        self.inner.clone(),
        self.inner.txn_manager.clone(),
        isolation_level,
    ))
}
/// Build an [`ObjectStore`] from a URL such as `s3://bucket/prefix`.
///
/// Store options are taken from environment variables; keys are lowercased to
/// match the option names `parse_url_opts` expects. When the URL contains a
/// path, the store is wrapped in a `PrefixStore` so all keys resolve under
/// that prefix.
pub fn resolve_object_store(url: &str) -> Result<Arc<dyn ObjectStore>, crate::Error> {
    let url = url
        .try_into()
        .map_err(|e| SlateDBError::InvalidObjectStoreURL(url.to_string(), e))?;
    let env_vars = std::env::vars().map(|(key, value)| (key.to_ascii_lowercase(), value));
    let (object_store, path) = parse_url_opts(&url, env_vars).map_err(SlateDBError::from)?;
    let object_store: Arc<dyn ObjectStore> = if path.as_ref().is_empty() {
        Arc::from(object_store)
    } else {
        let object_store = Arc::from(object_store);
        Arc::new(PrefixStore::new(object_store, path))
    };
    Ok(object_store)
}
/// Current lifecycle status of the database.
pub fn status(&self) -> DbStatus {
    self.inner.status()
}
}
/// [`DbRead`] implementation that forwards to the inherent methods on [`Db`],
/// so a `Db` can be used anywhere a generic read-only handle is expected.
#[async_trait::async_trait]
impl DbRead for Db {
    async fn get_with_options<K: AsRef<[u8]> + Send>(
        &self,
        key: K,
        options: &ReadOptions,
    ) -> Result<Option<Bytes>, crate::Error> {
        self.get_with_options(key, options).await
    }
    async fn get_key_value_with_options<K: AsRef<[u8]> + Send>(
        &self,
        key: K,
        options: &ReadOptions,
    ) -> Result<Option<KeyValue>, crate::Error> {
        self.get_key_value_with_options(key, options).await
    }
    async fn scan_with_options<K, T>(
        &self,
        range: T,
        options: &ScanOptions,
    ) -> Result<DbIterator, crate::Error>
    where
        K: AsRef<[u8]> + Send,
        T: RangeBounds<K> + Send,
    {
        self.scan_with_options(range, options).await
    }
}
/// Handle returned from a committed write, identifying where the write landed
/// in the database's sequence history.
#[derive(Debug, Clone)]
pub struct WriteHandle {
    // Sequence number assigned to the write.
    pub(crate) seq: u64,
    // Creation timestamp assigned to the write.
    pub(crate) create_ts: i64,
}

impl WriteHandle {
    /// Build a handle for a write committed at `seq` with timestamp `create_ts`.
    pub(crate) fn new(seq: u64, create_ts: i64) -> Self {
        WriteHandle { seq, create_ts }
    }

    /// The sequence number assigned to this write.
    pub fn seqnum(&self) -> u64 {
        self.seq
    }

    /// The creation timestamp assigned to this write.
    pub fn create_ts(&self) -> i64 {
        self.create_ts
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cached_object_store::stats::{PART_ACCESS_COUNT, PART_HIT_COUNT};
use crate::cached_object_store::{CachedObjectStore, FsCacheStorage};
use crate::cached_object_store_stats::CachedObjectStoreStats;
use crate::config::DurabilityLevel::{Memory, Remote};
use crate::config::{
CheckpointOptions, CompactorOptions, GarbageCollectorDirectoryOptions,
GarbageCollectorOptions, ObjectStoreCacheOptions, PutOptions, Settings, Ttl, WriteOptions,
};
use crate::db::builder::GarbageCollectorBuilder;
use crate::db_common::MAX_WAL_FLUSHES_BEFORE_L0_FLUSH;
use crate::db_state::ManifestCore;
use crate::db_stats::IMMUTABLE_MEMTABLE_FLUSHES;
use crate::format::sst::SsTableFormat;
use crate::instrumented_object_store::stats::{
REQUEST_COUNT as OBJECT_STORE_REQUEST_COUNT,
REQUEST_DURATION_SECONDS as OBJECT_STORE_REQUEST_DURATION_SECONDS,
};
use crate::iter::RowEntryIterator;
use crate::manifest::store::{ManifestStore, StoredManifest};
use crate::merge_operator::{
MERGE_OPERATOR_COMPACT_PATH, MERGE_OPERATOR_FLUSH_PATH, MERGE_OPERATOR_READ_PATH,
};
use crate::object_stores::ObjectStores;
use crate::proptest_util::arbitrary;
use crate::proptest_util::sample;
use crate::rand::DbRand;
use crate::seq_tracker::FindOption;
use crate::sst_iter::{SstIterator, SstIteratorOptions};
use crate::test_utils::{
assert_iterator, lookup_merge_operator_operands, OnDemandCompactionSchedulerSupplier,
StringConcatMergeOperator,
};
use crate::types::RowEntry;
use crate::wal_reader::WalReader;
use crate::{proptest_util, test_utils, CloseReason, CompactorBuilder, KeyValue};
use async_trait::async_trait;
use chrono::{TimeZone, Utc};
use fail_parallel::FailPointRegistry;
use futures::{future, future::join_all, StreamExt};
use object_store::memory::InMemory;
use object_store::ObjectStore;
use proptest::test_runner::{TestRng, TestRunner};
use slatedb_common::clock::DefaultSystemClock;
use slatedb_common::clock::MockSystemClock;
use slatedb_common::metrics::{
lookup_metric, lookup_metric_with_labels, DefaultMetricsRecorder, MetricValue,
MetricsRecorderHelper,
};
use std::collections::BTreeMap;
use std::collections::Bound::Included;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::runtime::Runtime;
use tracing::info;
/// Build the label set attached to object-store request metrics in these tests.
fn object_store_labels(
    component: &'static str,
    store_type: &'static str,
    op: &'static str,
    api: &'static str,
) -> [(&'static str, &'static str); 4] {
    let keys = ["component", "store_type", "op", "api"];
    let values = [component, store_type, op, api];
    std::array::from_fn(|i| (keys[i], values[i]))
}
/// Look up the sample count of the request-duration histogram matching
/// `labels`, or `None` when no such metric was recorded.
///
/// Panics if the matching metric exists but is not a histogram.
fn lookup_object_store_histogram_count(
    recorder: &DefaultMetricsRecorder,
    labels: &[(&str, &str)],
) -> Option<u64> {
    recorder
        .snapshot()
        .by_name_and_labels(OBJECT_STORE_REQUEST_DURATION_SECONDS, labels)
        .map(|metric| match &metric.value {
            MetricValue::Histogram { count, .. } => *count,
            other => panic!("expected histogram metric, got {other:?}"),
        })
}
/// Map a coarse op name ("get"/"put"/"delete") to the concrete object-store
/// APIs whose metrics are aggregated under it.
///
/// Extracted so the two lookup helpers below cannot drift apart (the list was
/// previously duplicated in both).
fn apis_for_op(op: &str) -> &'static [&'static str] {
    match op {
        "get" => &["get", "get_range", "get_ranges", "head"],
        "put" => &[
            "put",
            "multipart_init",
            "multipart_part",
            "multipart_complete",
        ],
        "delete" => &["delete"],
        _ => panic!("unexpected op {op}"),
    }
}

/// Sum the request-count metric across every API belonging to `op`.
fn lookup_object_store_op_request_count(
    recorder: &DefaultMetricsRecorder,
    component: &'static str,
    store_type: &'static str,
    op: &'static str,
) -> i64 {
    apis_for_op(op)
        .iter()
        .map(|api| {
            lookup_metric_with_labels(
                recorder,
                OBJECT_STORE_REQUEST_COUNT,
                &object_store_labels(component, store_type, op, api),
            )
            .unwrap_or(0)
        })
        .sum()
}

/// Sum the request-duration histogram sample counts across every API
/// belonging to `op`.
fn lookup_object_store_op_histogram_count(
    recorder: &DefaultMetricsRecorder,
    component: &'static str,
    store_type: &'static str,
    op: &'static str,
) -> u64 {
    apis_for_op(op)
        .iter()
        .map(|api| {
            lookup_object_store_histogram_count(
                recorder,
                &object_store_labels(component, store_type, op, api),
            )
            .unwrap_or(0)
        })
        .sum()
}
// Covers the basic write path: put → flush → get returns the value, and a
// subsequent delete makes the key unreadable.
#[tokio::test]
async fn test_put_get_delete() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let kv_store = Db::builder("/tmp/test_kv_store", object_store)
        .with_settings(test_db_options(0, 1024, None))
        .build()
        .await
        .unwrap();
    let key = b"test_key";
    let value = b"test_value";
    kv_store.put(key, value).await.unwrap();
    kv_store.flush().await.unwrap();
    assert_eq!(
        kv_store.get(key).await.unwrap(),
        Some(Bytes::from_static(value))
    );
    kv_store.delete(key).await.unwrap();
    assert_eq!(None, kv_store.get(key).await.unwrap());
    kv_store.close().await.unwrap();
}
// `Db::manifest()` must reflect the same core state the db holds internally.
#[tokio::test]
async fn test_manifest_returns_current_manifest_core() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let db = Db::builder("/tmp/test_manifest_accessor", object_store)
        .with_settings(test_db_options(0, 1024, None))
        .build()
        .await
        .unwrap();
    db.put(b"test_key", b"test_value").await.unwrap();
    let manifest = db.manifest();
    let expected = db.inner.state.read().state().core().clone();
    assert_eq!(manifest, expected);
    db.close().await.unwrap();
}
// Size estimates exposed through the manifest must be positive for L0 tables
// and, after a (test-triggered) compaction, for compacted tables covering a
// populated key range — while ranges outside the data report no tables.
#[tokio::test]
async fn test_coarse_size_estimation_via_manifest() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test_coarse_size_estimation";
    // Compaction fires once, when the flag is flipped to true below.
    let should_compact = Arc::new(AtomicBool::new(false));
    let should_compact_clone = should_compact.clone();
    let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
        move |_state| should_compact_clone.swap(false, Ordering::SeqCst),
    )));
    let db = Db::builder(path, object_store.clone())
        .with_settings(test_db_options(0, 1024, None))
        .with_compactor_builder(
            CompactorBuilder::new(path, object_store.clone())
                .with_scheduler_supplier(compaction_scheduler),
        )
        .build()
        .await
        .unwrap();
    let db = Arc::new(db);
    for i in 0..100u32 {
        let key = format!("k{:04}", i);
        db.put(key.as_bytes(), &[0u8; 64]).await.unwrap();
    }
    db.flush().await.unwrap();
    let manifest = db.manifest();
    assert!(!manifest.l0.is_empty());
    for view in &manifest.l0 {
        assert!(view.estimate_size() > 0);
    }
    should_compact.store(true, Ordering::SeqCst);
    // Poll (bounded by a 10s timeout) until the compaction lands in state.
    let db_poll = db.clone();
    tokio::time::timeout(Duration::from_secs(10), async move {
        loop {
            {
                let state = db_poll.inner.state.read();
                if !state.state().core().compacted.is_empty() {
                    return;
                }
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    })
    .await
    .unwrap();
    let manifest = db.manifest();
    assert!(!manifest.compacted.is_empty());
    for sr in &manifest.compacted {
        let covering = sr
            .tables_covering_range(Bytes::from_static(b"k0000")..Bytes::from_static(b"k0100"));
        assert!(!covering.is_empty());
        for view in &covering {
            assert!(view.estimate_size() > 0);
        }
        // A range that shares no keys with the data must cover no tables.
        let outside = sr
            .tables_covering_range(Bytes::from_static(b"a0000")..Bytes::from_static(b"a9999"));
        assert!(outside.is_empty());
    }
    db.close().await.unwrap();
}
// `scan_prefix` returns exactly the keys sharing the prefix, in order,
// excluding keys that merely sort adjacent to it ("ac").
#[tokio::test]
async fn test_scan_prefix_returns_matching_keys() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let kv_store = Db::builder("/tmp/test_scan_prefix", object_store)
        .with_settings(test_db_options(0, 1024, None))
        .build()
        .await
        .unwrap();
    kv_store.put(b"ab", b"v0").await.unwrap();
    kv_store.put(b"aba", b"v1").await.unwrap();
    kv_store.put(b"abb", b"v2").await.unwrap();
    kv_store.put(b"ac", b"v3").await.unwrap();
    let mut iter = kv_store.scan_prefix(b"ab").await.unwrap();
    assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"ab");
    assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"aba");
    assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"abb");
    assert_eq!(iter.next().await.unwrap(), None);
    kv_store.close().await.unwrap();
}
// Descending scans return all records in reverse key order, merging flushed
// data ("a", "b") with still-in-memory data ("c", "d").
#[tokio::test]
async fn test_scan_descending_returns_records_in_reverse_order() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let kv_store = Db::builder("/tmp/test_scan_descending", object_store)
        .with_settings(test_db_options(0, 1024, None))
        .build()
        .await
        .unwrap();
    kv_store.put(b"a", b"v0").await.unwrap();
    kv_store.put(b"b", b"v1").await.unwrap();
    kv_store.flush().await.unwrap();
    kv_store.put(b"c", b"v2").await.unwrap();
    kv_store.put(b"d", b"v3").await.unwrap();
    let scan_options = ScanOptions::default().with_order(IterationOrder::Descending);
    let mut iter = kv_store
        .scan_with_options::<Vec<u8>, _>(.., &scan_options)
        .await
        .unwrap();
    assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"d");
    assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"c");
    assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"b");
    assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"a");
    assert_eq!(iter.next().await.unwrap(), None);
    kv_store.close().await.unwrap();
}
// A descending scan over a half-open range ["b", "d") yields "c" then "b":
// the exclusive upper bound excludes "d" itself.
#[tokio::test]
async fn test_scan_descending_bounded_range() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let kv_store = Db::builder("/tmp/test_scan_descending_bounded", object_store)
        .with_settings(test_db_options(0, 1024, None))
        .build()
        .await
        .unwrap();
    kv_store.put(b"a", b"v0").await.unwrap();
    kv_store.put(b"b", b"v1").await.unwrap();
    kv_store.put(b"c", b"v2").await.unwrap();
    kv_store.put(b"d", b"v3").await.unwrap();
    kv_store.put(b"e", b"v4").await.unwrap();
    let scan_options = ScanOptions::default().with_order(IterationOrder::Descending);
    let mut iter = kv_store
        .scan_with_options::<Vec<u8>, _>(b"b".to_vec()..b"d".to_vec(), &scan_options)
        .await
        .unwrap();
    assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"c");
    assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"b");
    assert_eq!(iter.next().await.unwrap(), None);
    kv_store.close().await.unwrap();
}
// Tombstoned keys ("b", "d") must not surface in a descending full scan.
#[tokio::test]
async fn test_scan_descending_skips_deleted_keys() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let kv_store = Db::builder("/tmp/test_scan_descending_deletes", object_store)
        .with_settings(test_db_options(0, 1024, None))
        .build()
        .await
        .unwrap();
    kv_store.put(b"a", b"v0").await.unwrap();
    kv_store.put(b"b", b"v1").await.unwrap();
    kv_store.put(b"c", b"v2").await.unwrap();
    kv_store.put(b"d", b"v3").await.unwrap();
    kv_store.delete(b"b").await.unwrap();
    kv_store.delete(b"d").await.unwrap();
    let scan_options = ScanOptions::default().with_order(IterationOrder::Descending);
    let mut iter = kv_store
        .scan_with_options::<Vec<u8>, _>(.., &scan_options)
        .await
        .unwrap();
    assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"c");
    assert_eq!(iter.next().await.unwrap().unwrap().key.as_ref(), b"a");
    assert_eq!(iter.next().await.unwrap(), None);
    kv_store.close().await.unwrap();
}
// Prefix scan with descending order: only "prefix/" keys, reverse order,
// other prefixes excluded.
#[tokio::test]
async fn test_scan_prefix_descending() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let kv_store = Db::builder("/tmp/test_scan_prefix_descending", object_store)
        .with_settings(test_db_options(0, 1024, None))
        .build()
        .await
        .unwrap();
    kv_store.put(b"prefix/a", b"v0").await.unwrap();
    kv_store.put(b"prefix/b", b"v1").await.unwrap();
    kv_store.put(b"prefix/c", b"v2").await.unwrap();
    kv_store.put(b"other/a", b"v3").await.unwrap();
    let scan_options = ScanOptions::default().with_order(IterationOrder::Descending);
    let mut iter = kv_store
        .scan_prefix_with_options(b"prefix/", &scan_options)
        .await
        .unwrap();
    assert_eq!(
        iter.next().await.unwrap().unwrap().key.as_ref(),
        b"prefix/c"
    );
    assert_eq!(
        iter.next().await.unwrap().unwrap().key.as_ref(),
        b"prefix/b"
    );
    assert_eq!(
        iter.next().await.unwrap().unwrap().key.as_ref(),
        b"prefix/a"
    );
    assert_eq!(iter.next().await.unwrap(), None);
    kv_store.close().await.unwrap();
}
// A prefix of all-0xff bytes cannot be "incremented" to form an exclusive
// upper bound, so the prefix range must fall back to an unbounded end and
// still return only keys carrying the prefix.
#[tokio::test]
async fn test_scan_prefix_with_options_handles_unbounded_end() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let kv_store = Db::builder("/tmp/test_scan_prefix_unbounded", object_store)
        .with_settings(test_db_options(0, 1024, None))
        .build()
        .await
        .unwrap();
    kv_store.put(&[0xff, 0xff], b"v0").await.unwrap();
    kv_store.put(&[0xff, 0xff, 0x00], b"v1").await.unwrap();
    kv_store.put(&[0xff, 0xff, 0x10], b"v2").await.unwrap();
    kv_store.put(&[0xff, 0xff, 0xff], b"v4").await.unwrap();
    kv_store.put(&[0xff, 0xfe], b"v3").await.unwrap();
    let scan_options = ScanOptions {
        cache_blocks: false,
        ..ScanOptions::default()
    };
    let mut iter = kv_store
        .scan_prefix_with_options(&[0xff, 0xff], &scan_options)
        .await
        .unwrap();
    assert_eq!(
        iter.next().await.unwrap().unwrap().key.as_ref(),
        &[0xff, 0xff]
    );
    assert_eq!(
        iter.next().await.unwrap().unwrap().key.as_ref(),
        &[0xff, 0xff, 0x00]
    );
    assert_eq!(
        iter.next().await.unwrap().unwrap().key.as_ref(),
        &[0xff, 0xff, 0x10]
    );
    assert_eq!(
        iter.next().await.unwrap().unwrap().key.as_ref(),
        &[0xff, 0xff, 0xff]
    );
    assert_eq!(iter.next().await.unwrap(), None);
    kv_store.close().await.unwrap();
}
#[tokio::test]
// resolve_object_store on a file:// URL must yield a store rooted at the
// URL's path, so an object put lands under that directory on local disk.
async fn test_resolve_object_store_local_prefix_store_writes_to_path() {
    let temp_dir = tempfile::tempdir().unwrap();
    let root = temp_dir.path().join("prefix-store");
    let store = Db::resolve_object_store(&format!("file://{}", root.display())).unwrap();

    let payload = Bytes::from_static(b"local prefix payload");
    store
        .put(
            &object_store::path::Path::from("nested/file.txt"),
            payload.clone().into(),
        )
        .await
        .unwrap();

    // The object must be readable from the filesystem at the prefixed path.
    let on_disk = tokio::fs::read(root.join("nested").join("file.txt"))
        .await
        .unwrap();
    assert_eq!(on_disk, payload.to_vec());
}
#[test]
// Property test: any non-empty key written with await_durable=false must be
// immediately readable back at the Memory durability level with the exact
// value that was written. Uses a plain #[test] with an explicit Runtime
// because proptest's runner closure is synchronous.
fn test_get_after_put() {
let mut runner = new_proptest_runner(None);
let runtime = Runtime::new().unwrap();
// Pre-seed the database with 1000 arbitrary entries so reads traverse a
// realistically populated store, not an empty one.
let table = sample::table(runner.rng(), 1000, 10);
let db_options = test_db_options(0, 1024, None);
let db = runtime.block_on(build_database_from_table(&table, db_options, true));
runner
.run(
&(arbitrary::bytes(100), arbitrary::bytes(100)),
|(key, value)| {
runtime.block_on(async {
// Empty keys are rejected by the API (they panic), so the
// property only applies to non-empty keys.
if !key.is_empty() {
db.put_with_options(
&key,
&value,
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
// Read-your-own-write at Memory durability.
assert_eq!(
Some(value),
db.get_with_options(
&key,
&ReadOptions {
durability_filter: Memory,
dirty: false,
cache_blocks: true,
}
)
.await
.unwrap()
);
}
});
Ok(())
},
)
.unwrap();
}
#[tokio::test]
// With flush_interval disabled, a non-durable write stays buffered in the
// WAL until an explicit flush() drains it.
async fn test_no_flush_interval() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let mut settings = test_db_options(0, 1024, None);
    settings.flush_interval = None;
    let db = Db::builder("/tmp/test_kv_store", store)
        .with_settings(settings)
        .build()
        .await
        .unwrap();

    db.put_with_options(
        b"test_key",
        b"test_value",
        &PutOptions::default(),
        &WriteOptions {
            await_durable: false,
        },
    )
    .await
    .unwrap();

    // The entry sits in the WAL buffer until flushed explicitly.
    assert_ne!(db.inner.wal_buffer.estimated_bytes().unwrap(), 0);
    db.flush().await.unwrap();
    assert_eq!(db.inner.wal_buffer.estimated_bytes().unwrap(), 0);
}
#[tokio::test]
// Closing a healthy database must flush any WAL entries still buffered in
// memory: the buffered-entry count drops to zero and the WAL-flush metric
// goes from 0 to 1 across the close() call.
async fn test_close_triggers_flush_when_open() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
// Disable periodic flushing so the entry can only be flushed by close().
let db_options_no_flush_interval = {
let mut db_options = test_db_options(0, 1024, None);
db_options.flush_interval = None;
db_options
};
let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
let kv_store = Db::builder("/tmp/test_close_triggers_flush", object_store)
.with_settings(db_options_no_flush_interval)
.with_metrics_recorder(metrics_recorder.clone())
.build()
.await
.unwrap();
kv_store
.put_with_options(
b"test_key",
b"test_value",
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
// Before close: one buffered WAL entry, no flushes recorded.
assert_eq!(kv_store.inner.wal_buffer.buffered_wal_entries_count(), 1);
assert_eq!(
lookup_metric(&metrics_recorder, crate::db_stats::WAL_BUFFER_FLUSHES).unwrap(),
0
);
kv_store.close().await.unwrap();
// After close: buffer drained and exactly one flush recorded.
assert_eq!(kv_store.inner.wal_buffer.buffered_wal_entries_count(), 0);
assert_eq!(
lookup_metric(&metrics_recorder, crate::db_stats::WAL_BUFFER_FLUSHES).unwrap(),
1
);
}
#[tokio::test]
// A second close() on an already-closed database is rejected with
// ErrorKind::Closed(CloseReason::Clean).
async fn test_close_twice_returns_closed_clean() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let db = Db::builder("/tmp/test_close_twice_returns_closed_clean", store)
        .with_settings(test_db_options(0, 1024, None))
        .build()
        .await
        .unwrap();

    db.close().await.unwrap();

    // The second close must surface the clean-closed state as an error.
    assert_eq!(
        db.close().await.unwrap_err().kind(),
        crate::ErrorKind::Closed(CloseReason::Clean)
    );
}
#[tokio::test]
// Once the database has entered a failed state (here: fenced by another
// writer), close() must NOT attempt to flush buffered WAL entries — the
// buffer keeps its entry, the flush metric stays at 0, and the reported
// close reason is Fenced rather than Clean.
async fn test_close_failed_state_does_not_flush() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
// Disable periodic flushing so only close() could possibly flush.
let db_options_no_flush_interval = {
let mut db_options = test_db_options(0, 1024, None);
db_options.flush_interval = None;
db_options
};
let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
let db = Db::builder("/tmp/test_close_failed_state_no_flush", object_store)
.with_settings(db_options_no_flush_interval)
.with_metrics_recorder(metrics_recorder.clone())
.build()
.await
.unwrap();
db.put_with_options(
b"test_key",
b"test_value",
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
// One entry buffered, nothing flushed yet.
assert_eq!(db.inner.wal_buffer.buffered_wal_entries_count(), 1);
assert_eq!(
lookup_metric(&metrics_recorder, crate::db_stats::WAL_BUFFER_FLUSHES).unwrap(),
0
);
// Simulate a fatal error by recording a Fenced failure directly on the
// status manager.
db.inner
.status_manager
.write_result(Err(crate::error::SlateDBError::Fenced));
db.close().await.unwrap();
// The buffered entry is still there and no flush was recorded.
assert_eq!(db.inner.wal_buffer.buffered_wal_entries_count(), 1);
assert_eq!(
lookup_metric(&metrics_recorder, crate::db_stats::WAL_BUFFER_FLUSHES).unwrap(),
0
);
let status = db.status();
assert_eq!(status.close_reason, Some(CloseReason::Fenced));
}
#[tokio::test]
// Counterpart to the failed-state test above: a clean close of a healthy
// database flushes the pending WAL entry, records exactly one flush, and
// reports CloseReason::Clean.
async fn test_clean_close_flushes_pending_wal() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
// Disable periodic flushing so close() is the only flush trigger.
let db_options_no_flush_interval = {
let mut db_options = test_db_options(0, 1024, None);
db_options.flush_interval = None;
db_options
};
let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
let db = Db::builder("/tmp/test_clean_close_flushes_pending_wal", object_store)
.with_settings(db_options_no_flush_interval)
.with_metrics_recorder(metrics_recorder.clone())
.build()
.await
.unwrap();
db.put_with_options(
b"test_key",
b"test_value",
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
// One entry buffered, no flushes yet.
assert_eq!(db.inner.wal_buffer.buffered_wal_entries_count(), 1);
assert_eq!(
lookup_metric(&metrics_recorder, crate::db_stats::WAL_BUFFER_FLUSHES).unwrap(),
0
);
db.close().await.unwrap();
// Close drained the buffer via exactly one flush.
assert_eq!(db.inner.wal_buffer.buffered_wal_entries_count(), 0);
assert_eq!(
lookup_metric(&metrics_recorder, crate::db_stats::WAL_BUFFER_FLUSHES).unwrap(),
1
);
let status = db.status();
assert_eq!(status.close_reason, Some(CloseReason::Clean));
}
#[tokio::test]
#[cfg(feature = "wal_disable")]
// With the WAL disabled, a non-durable put is visible through the Memory
// durability filter but not through Remote until an explicit flush makes
// the memtable durable; after the flush both filters see the value.
async fn test_get_with_durability_level_when_wal_disabled() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let mut options = test_db_options(0, 1024 * 1024, None);
    options.wal_enabled = false;
    let db = Db::builder("/tmp/test_kv_store", object_store)
        .with_settings(options)
        .build()
        .await
        .unwrap();
    let put_options = PutOptions::default();
    let write_options = WriteOptions {
        await_durable: false,
    };
    let get_memory_options = ReadOptions::new().with_durability_filter(Memory);
    let get_remote_options = ReadOptions::new().with_durability_filter(Remote);
    db.put_with_options(b"foo", b"bar", &put_options, &write_options)
        .await
        .unwrap();
    let val_bytes = Bytes::copy_from_slice(b"bar");
    // Not yet durable: invisible through the Remote filter...
    assert_eq!(
        None,
        db.get_with_options(b"foo", &get_remote_options)
            .await
            .unwrap()
    );
    // ...but visible through the Memory filter.
    assert_eq!(
        Some(val_bytes.clone()),
        db.get_with_options(b"foo", &get_memory_options)
            .await
            .unwrap()
    );
    db.flush().await.unwrap();
    // After the flush the value is durable, so both filters return it.
    assert_eq!(
        Some(val_bytes.clone()),
        db.get_with_options(b"foo", &get_remote_options)
            .await
            .unwrap()
    );
    assert_eq!(
        Some(val_bytes.clone()),
        db.get_with_options(b"foo", &get_memory_options)
            .await
            .unwrap()
    );
    // Fix: shut down cleanly like the sibling tests do, so background tasks
    // are stopped before the test runtime is torn down.
    db.close().await.unwrap();
}
#[tokio::test]
#[cfg(feature = "wal_disable")]
// Writes the same key thousands of times (WAL disabled) until the memtable
// spans several blocks, then verifies that: the latest version wins on read,
// the flush produces exactly one L0 SST whose index has block metadata, and
// the value is still correct after the flush.
async fn test_find_with_multiple_repeated_keys() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let mut options = test_db_options(0, 1024 * 1024, None);
options.wal_enabled = false;
let db = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(options)
.build()
.await
.unwrap();
let mut last_val: String = "foo".to_string();
// Keep overwriting "key" until the memtable holds more than three blocks'
// worth of entries (so repeated versions of one key straddle block
// boundaries), or we hit the 4096-iteration cap.
for x in 0..4096 {
let val = format!("val{}", x);
db.put_with_options(
b"key",
val.as_bytes(),
&PutOptions {
ttl: Default::default(),
},
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
last_val = val;
if db
.inner
.state
.write()
.memtable()
.metadata()
.entries_size_in_bytes
> (SsTableFormat::default().block_size * 3)
{
break;
}
}
// The most recent version must win the in-memory read.
assert_eq!(
Some(Bytes::copy_from_slice(last_val.as_bytes())),
db.get_with_options(b"key", &ReadOptions::new().with_durability_filter(Memory))
.await
.unwrap()
);
db.flush().await.unwrap();
// Exactly one L0 SST was produced by the flush.
let state = db.inner.state.read().view();
assert_eq!(1, state.state.manifest.value.core.l0.len());
let view = state.state.manifest.value.core.l0.front().unwrap();
let index = db
.inner
.table_store
.read_index(&view.sst, true)
.await
.unwrap();
// The SST spans multiple blocks, so its index must carry block metadata.
assert!(!index.borrow().block_meta().is_empty());
// The latest version is still the winner after the flush.
assert_eq!(
Some(Bytes::copy_from_slice(last_val.as_bytes())),
db.get(b"key").await.unwrap()
);
db.close().await.unwrap();
}
#[tokio::test]
// With the on-disk object-store cache enabled, a get() after a flush must
// record cache-part accesses and at least one cache-part hit in the
// metrics recorder.
async fn test_get_with_object_store_cache_metrics() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let mut opts = test_db_options(0, 1024, None);
let temp_dir = tempfile::Builder::new()
.prefix("objstore_cache_test_")
.tempdir()
.unwrap();
// keep() detaches the tempdir so the cache files outlive this handle.
opts.object_store_cache_options.root_folder = Some(temp_dir.keep());
opts.object_store_cache_options.part_size_bytes = 1024;
let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
let kv_store = Db::builder(
"/tmp/test_kv_store_with_cache_metrics",
object_store.clone(),
)
.with_settings(opts)
.with_metrics_recorder(metrics_recorder.clone())
.build()
.await
.unwrap();
// Baseline access count before any explicit read.
let access_count0 = lookup_metric(&metrics_recorder, PART_ACCESS_COUNT).unwrap();
let key = b"test_key";
let value = b"test_value";
kv_store.put(key, value).await.unwrap();
kv_store.flush().await.unwrap();
let got = kv_store.get(key).await.unwrap();
let access_count1 = lookup_metric(&metrics_recorder, PART_ACCESS_COUNT).unwrap();
assert_eq!(got, Some(Bytes::from_static(value)));
// The read went through the cache: accesses grew and at least one hit
// was recorded.
assert!(access_count1 > 0);
assert!(access_count1 >= access_count0);
assert!(lookup_metric(&metrics_recorder, PART_HIT_COUNT).unwrap() >= 1);
}
#[tokio::test]
// After reopening a database with a fresh (empty) cache directory, the first
// get() must hit the remote object store (request counter grows); the second
// get() of the same key must be served from cache (counter unchanged). The
// request-count and request-duration histogram metrics must agree.
async fn test_db_records_remote_object_store_reads_but_not_cache_hits() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let mut opts = test_db_options(0, 1024, None);
let temp_dir = tempfile::Builder::new()
.prefix("objstore_metrics_test_")
.tempdir()
.unwrap();
opts.object_store_cache_options.root_folder = Some(temp_dir.keep());
opts.object_store_cache_options.part_size_bytes = 1024;
let path = "/tmp/test_db_records_remote_object_store_reads_but_not_cache_hits";
let kv_store = Db::builder(path, object_store.clone())
.with_settings(opts)
.build()
.await
.unwrap();
// Write and push the data all the way to SSTs so the reopened instance
// must fetch from the object store.
kv_store.put(b"test_key", b"test_value").await.unwrap();
kv_store.flush().await.unwrap();
kv_store
.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
.unwrap();
kv_store.close().await.unwrap();
// Reopen against the same object store but a brand-new cache folder, so
// the first read cannot be a cache hit.
let mut reopen_opts = test_db_options(0, 1024, None);
let reopen_temp_dir = tempfile::Builder::new()
.prefix("objstore_metrics_reopen_")
.tempdir()
.unwrap();
reopen_opts.object_store_cache_options.root_folder = Some(reopen_temp_dir.keep());
reopen_opts.object_store_cache_options.part_size_bytes = 1024;
let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
let reopened = Db::builder(path, object_store)
.with_settings(reopen_opts)
.with_metrics_recorder(metrics_recorder.clone())
.build()
.await
.unwrap();
let requests_before =
lookup_object_store_op_request_count(&metrics_recorder, "db", "main", "get");
let _got = reopened.get(b"test_key").await.unwrap();
let requests_after_first =
lookup_object_store_op_request_count(&metrics_recorder, "db", "main", "get");
let got = reopened.get(b"test_key").await.unwrap();
let requests_after_second =
lookup_object_store_op_request_count(&metrics_recorder, "db", "main", "get");
// First read reached the remote store; second was served from cache.
assert!(requests_after_first > requests_before);
assert_eq!(got, Some(Bytes::from_static(b"test_value")));
assert_eq!(requests_after_second, requests_after_first);
// The histogram's sample count must track the request counter.
assert_eq!(
lookup_object_store_op_histogram_count(&metrics_recorder, "db", "main", "get"),
requests_after_first as u64
);
reopened.close().await.unwrap();
}
#[tokio::test]
// When the WAL is backed by its own object store, put-request counters and
// duration histograms must be recorded under separate "wal" and "main"
// labels, and both must grow after a write plus full flush.
async fn test_db_records_main_and_wal_object_store_requests_separately() {
let main_object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let wal_object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
let path = "/tmp/test_db_records_main_and_wal_object_store_requests_separately";
let db = Db::builder(path, main_object_store)
.with_settings(test_db_options(0, 1024, None))
.with_wal_object_store(wal_object_store)
.with_metrics_recorder(metrics_recorder.clone())
.build()
.await
.unwrap();
// Capture all four baselines before the write.
let wal_before =
lookup_object_store_op_request_count(&metrics_recorder, "db", "wal", "put");
let main_before =
lookup_object_store_op_request_count(&metrics_recorder, "db", "main", "put");
let wal_hist_before =
lookup_object_store_op_histogram_count(&metrics_recorder, "db", "wal", "put");
let main_hist_before =
lookup_object_store_op_histogram_count(&metrics_recorder, "db", "main", "put");
// A durable put writes the WAL; the memtable flush writes the main store.
db.put(b"key", b"value").await.unwrap();
db.flush().await.unwrap();
db.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
.unwrap();
let wal_after = lookup_object_store_op_request_count(&metrics_recorder, "db", "wal", "put");
let main_after =
lookup_object_store_op_request_count(&metrics_recorder, "db", "main", "put");
let wal_hist_after =
lookup_object_store_op_histogram_count(&metrics_recorder, "db", "wal", "put");
let main_hist_after =
lookup_object_store_op_histogram_count(&metrics_recorder, "db", "main", "put");
// Both stores saw puts, recorded under their own labels.
assert!(wal_after > wal_before);
assert!(main_after > main_before);
assert!(wal_hist_after > wal_hist_before);
assert!(main_hist_after > main_hist_before);
db.close().await.unwrap();
}
// Shared harness for the object-store-cache tests below.
//
// Builds a Db at `db_path` on top of a filesystem-backed CachedObjectStore
// (part size 1024 bytes, put-caching toggled by `cache_puts_enabled`),
// performs a head() on the first WAL SST plus one put/flush cycle, then
// asserts for each (path, expected_parts) pair in `expected_cache_parts`
// that exactly that many parts of the object are present in the cache.
// Returns the cached store and the Db so callers can close them.
async fn test_object_store_cache_helper(
cache_puts_enabled: bool,
db_path: &str,
expected_cache_parts: Vec<(&str, usize)>,
) -> (Arc<CachedObjectStore>, Db) {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let mut opts = test_db_options(0, 1024, None);
// Effectively disable manifest polling so no background reads perturb
// the expected cache contents.
opts.manifest_poll_interval = Duration::from_millis(u64::MAX);
let temp_dir = tempfile::Builder::new()
.prefix("objstore_cache_test_")
.tempdir()
.unwrap();
let recorder = MetricsRecorderHelper::noop();
let cache_stats = Arc::new(CachedObjectStoreStats::new(&recorder));
let part_size = 1024;
info!("temp_dir: {:?}", temp_dir.path());
let cache_storage = Arc::new(FsCacheStorage::new(
temp_dir.keep(),
None,
None,
cache_stats.clone(),
Arc::new(DefaultSystemClock::new()),
Arc::new(DbRand::default()),
));
let cached_object_store = CachedObjectStore::new(
object_store.clone(),
cache_storage,
part_size,
cache_puts_enabled,
cache_stats.clone(),
)
.unwrap();
let kv_store = Db::builder(db_path, cached_object_store.clone())
.with_settings(opts)
.build()
.await
.unwrap();
// head() the first WAL SST through the cached store; presumably this
// populates its cached parts for the assertions below — TODO confirm
// against CachedObjectStore::head's caching behavior.
cached_object_store
.head(&object_store::path::Path::from(format!(
"{}/wal/00000000000000000001.sst",
db_path
)))
.await
.unwrap();
let key = b"test_key";
let value = b"test_value";
kv_store.put(key, value).await.unwrap();
kv_store.flush().await.unwrap();
// Verify the exact number of cached parts for each path of interest.
for (path, expected_parts) in expected_cache_parts {
let entry = cached_object_store
.cache_storage
.entry(&object_store::path::Path::from(path), part_size);
assert_eq!(
entry.cached_parts().await.unwrap().len(),
expected_parts,
"Path: {}",
path
);
}
(cached_object_store, kv_store)
}
#[tokio::test]
// With put-caching disabled, only the WAL SST touched by the explicit
// head() call has a cached part; manifests and the WAL SST written by the
// flush remain uncached.
async fn test_get_with_object_store_cache_stored_files() {
let expected_cache_parts =
vec![
("tmp/test_kv_store_with_cache_stored_files/manifest/00000000000000000001.manifest", 0),
("tmp/test_kv_store_with_cache_stored_files/manifest/00000000000000000002.manifest", 0),
("tmp/test_kv_store_with_cache_stored_files/wal/00000000000000000001.sst", 1),
("tmp/test_kv_store_with_cache_stored_files/wal/00000000000000000002.sst", 0),
];
let (_cached_object_store, kv_store) = test_object_store_cache_helper(
false, "/tmp/test_kv_store_with_cache_stored_files",
expected_cache_parts,
)
.await;
kv_store.close().await.unwrap();
}
#[tokio::test]
// With put-caching enabled, the WAL SST written by the flush is cached on
// write (one part), in addition to the SST touched by the head() call;
// manifests still show zero cached parts.
async fn test_get_with_object_store_cache_put_caching_enabled() {
let expected_cache_parts =
vec![
("tmp/test_kv_store_with_put_cache_enabled/manifest/00000000000000000001.manifest", 0),
("tmp/test_kv_store_with_put_cache_enabled/manifest/00000000000000000002.manifest", 0),
("tmp/test_kv_store_with_put_cache_enabled/wal/00000000000000000001.sst", 1),
("tmp/test_kv_store_with_put_cache_enabled/wal/00000000000000000002.sst", 1),
];
let (_cached_object_store, kv_store) = test_object_store_cache_helper(
true, "/tmp/test_kv_store_with_put_cache_enabled",
expected_cache_parts,
)
.await;
kv_store.close().await.unwrap();
}
/// Builds a fresh in-memory `Db` seeded with every entry from `table`.
///
/// When `await_durable` is set, the seeded data is flushed so it is durable
/// before the database is handed back to the caller.
async fn build_database_from_table(
    table: &BTreeMap<Bytes, Bytes>,
    db_options: Settings,
    await_durable: bool,
) -> Db {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let db = Db::builder("/tmp/test_kv_store", store)
        .with_settings(db_options)
        .build()
        .await
        .unwrap();

    // Seed without awaiting durability per write; flush once at the end
    // if the caller asked for durable data.
    test_utils::seed_database(&db, table, false).await.unwrap();
    if await_durable {
        db.flush().await.unwrap();
    }
    db
}
#[tokio::test]
// Regression-style test: DbIterator must remain usable when erased behind a
// Box<dyn Trait> produced by an async trait method (i.e. it is object-safe
// enough to drive through dynamic dispatch).
async fn test_should_allow_iterating_behind_box_dyn() {
// Local trait that hands out a boxed, type-erased iterator.
#[async_trait]
trait IteratorSupplier {
async fn iterator(&self) -> Box<dyn IteratorTrait>;
}
struct DbHolder {
db: Db,
}
#[async_trait]
impl IteratorSupplier for DbHolder {
async fn iterator(&self) -> Box<dyn IteratorTrait> {
// An empty range yields an iterator that immediately returns None.
let range = BytesRange::new_empty();
let iter = self
.db
.inner
.scan_with_options(range, &ScanOptions::default())
.await
.unwrap();
Box::new(iter)
}
}
// Minimal object-safe iteration trait implemented by DbIterator.
#[async_trait]
trait IteratorTrait {
async fn next(&mut self) -> Result<Option<KeyValue>, crate::Error>;
}
#[async_trait]
impl IteratorTrait for DbIterator {
async fn next(&mut self) -> Result<Option<KeyValue>, crate::Error> {
DbIterator::next(self).await
}
}
let db_options = test_db_options(0, 1024, None);
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let db = Db::builder("/tmp/test_kv_store", object_store)
.with_settings(db_options)
.build()
.await
.unwrap();
let db_holder = DbHolder { db };
// Drive the iterator purely through the boxed trait object.
let mut boxed = db_holder.iterator().await;
let next = boxed.next().await;
assert_eq!(next.unwrap(), None);
}
/// Scans `db` over `range` using `scan_options` and asserts that the scan
/// results exactly match the reference `table` restricted to that range.
async fn assert_records_in_range(
    table: &BTreeMap<Bytes, Bytes>,
    db: &Db,
    scan_options: &ScanOptions,
    range: BytesRange,
) {
    let mut db_iter = db
        .inner
        .scan_with_options(range.clone(), scan_options)
        .await
        .unwrap();
    // Delegate the element-by-element comparison to the shared test helper.
    test_utils::assert_ranged_db_scan(table, range, &mut db_iter).await;
}
#[test]
// Property test: for arbitrary non-empty ranges, a scan over a durably
// seeded database returns exactly the entries the reference BTreeMap holds
// in that range.
fn test_scan_returns_records_in_range() {
let mut runner = new_proptest_runner(None);
let table = sample::table(runner.rng(), 1000, 5);
let runtime = Runtime::new().unwrap();
let db_options = test_db_options(0, 1024, None);
// Seed durably (await_durable = true) so scans read flushed data.
let db = runtime.block_on(build_database_from_table(&table, db_options, true));
runner
.run(&arbitrary::nonempty_range(10), |range| {
runtime.block_on(assert_records_in_range(
&table,
&db,
&ScanOptions::default(),
range,
));
Ok(())
})
.unwrap();
}
// Creates a proptest TestRunner keyed on this source file; pass a fixed
// `rng_seed` to reproduce a particular failing case deterministically.
fn new_proptest_runner(rng_seed: Option<[u8; 32]>) -> TestRunner {
proptest_util::runner::new(file!(), rng_seed)
}
#[test]
// Property test: with a long flush interval and non-durable seeding, scans
// at the Memory durability level must still see the uncommitted records.
fn test_scan_returns_uncommitted_records_if_read_level_uncommitted() {
let mut runner = new_proptest_runner(None);
let table = sample::table(runner.rng(), 1000, 5);
let runtime = Runtime::new().unwrap();
let mut db_options = test_db_options(0, 1024, None);
// A long flush interval keeps the seeded data buffered (not yet durable)
// while the properties run.
db_options.flush_interval = Some(Duration::from_secs(5));
let db = runtime.block_on(build_database_from_table(&table, db_options, false));
runner
.run(&arbitrary::nonempty_range(10), |range| {
// Memory-level reads include data that has not reached the WAL/SSTs.
let scan_options = ScanOptions {
durability_filter: Memory,
..ScanOptions::default()
};
runtime.block_on(assert_records_in_range(&table, &db, &scan_options, range));
Ok(())
})
.unwrap();
}
#[test]
// Property test: seeking a DbIterator to a key outside its scan range must
// fail with an invalid-argument error, for both directions — a key at/above
// the exclusive upper bound and a key below the inclusive lower bound.
fn test_seek_outside_of_range_returns_invalid_argument() {
let mut runner = new_proptest_runner(None);
let table = sample::table(runner.rng(), 1000, 10);
let runtime = Runtime::new().unwrap();
let db_options = test_db_options(0, 1024, None);
let db = runtime.block_on(build_database_from_table(&table, db_options, true));
runner
.run(
&(arbitrary::nonempty_bytes(10), arbitrary::rng()),
|(arbitrary_key, mut rng)| {
runtime.block_on(assert_out_of_bound_seek_returns_invalid_argument(
&db,
&mut rng,
arbitrary_key,
));
Ok(())
},
)
.unwrap();
// Helper: builds iterators bounded on one side by `arbitrary_key` and
// seeks to a sampled key on the wrong side of that bound.
async fn assert_out_of_bound_seek_returns_invalid_argument(
db: &Db,
rng: &mut TestRng,
arbitrary_key: Bytes,
) {
// Iterator over (..key): seeking to any key >= key must fail.
let mut iter = db
.scan_with_options(..arbitrary_key.clone(), &ScanOptions::default())
.await
.unwrap();
let lower_bounded_range = BytesRange::from(arbitrary_key.clone()..);
let value = sample::bytes_in_range(rng, &lower_bounded_range);
let err = iter.seek(value.clone()).await.unwrap_err();
assert!(
err.to_string()
.contains("cannot seek to a key outside the iterator range"),
"{}",
err
);
// Iterator over (key..): seeking to any key < key must fail.
let mut iter = db
.scan_with_options(arbitrary_key.clone().., &ScanOptions::default())
.await
.unwrap();
let upper_bounded_range = BytesRange::from(..arbitrary_key.clone());
let value = sample::bytes_in_range(rng, &upper_bounded_range);
let err = iter.seek(value.clone()).await.unwrap_err();
assert!(
err.to_string()
.contains("cannot seek to a key outside the iterator range"),
"{}",
err
);
}
}
#[test]
// Property test: after seek(k), an iterator must behave exactly like a
// fresh scan over [k, original_end) — seeking fast-forwards, never rewinds
// or skips.
fn test_seek_fast_forwards_iterator() {
let mut runner = new_proptest_runner(None);
let table = sample::table(runner.rng(), 1000, 10);
let runtime = Runtime::new().unwrap();
let db_options = test_db_options(0, 1024, None);
let db = runtime.block_on(build_database_from_table(&table, db_options, true));
runner
.run(
&(arbitrary::nonempty_range(5), arbitrary::rng()),
|(range, mut rng)| {
runtime.block_on(assert_seek_fast_forwards_iterator(
&table, &db, &range, &mut rng,
));
Ok(())
},
)
.unwrap();
// Helper: seeks to a sampled in-range key and checks the remaining
// iteration matches the reference table over [seek_key, end).
async fn assert_seek_fast_forwards_iterator(
table: &BTreeMap<Bytes, Bytes>,
db: &Db,
scan_range: &BytesRange,
rng: &mut TestRng,
) {
let mut iter = db
.inner
.scan_with_options(scan_range.clone(), &ScanOptions::default())
.await
.unwrap();
let seek_key = sample::bytes_in_range(rng, scan_range);
iter.seek(seek_key.clone()).await.unwrap();
// The post-seek view starts (inclusive) at the seek key and keeps the
// original upper bound.
let seek_range = BytesRange::new(Included(seek_key), scan_range.end_bound().cloned());
test_utils::assert_ranged_db_scan(table, seek_range, &mut iter).await;
}
}
#[tokio::test]
// A batch applies atomically and in order: a delete later in the batch
// wins over an earlier put of the same key.
async fn test_write_batch() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let db = Db::builder("/tmp/test_kv_store", store)
        .with_settings(test_db_options(0, 1024, None))
        .build()
        .await
        .unwrap();

    let mut batch = WriteBatch::new();
    batch.put(b"key1", b"value1");
    batch.put(b"key2", b"value2");
    batch.delete(b"key1");
    db.write(batch).await.expect("write batch failed");

    // key1 was deleted within the same batch; key2 survives.
    assert_eq!(db.get(b"key1").await.unwrap(), None);
    assert_eq!(
        db.get(b"key2").await.unwrap(),
        Some(Bytes::from_static(b"value2"))
    );

    db.close().await.unwrap();
}
#[cfg(feature = "wal_disable")]
#[tokio::test]
// Batch semantics (atomicity, later delete beating earlier put) must be
// unchanged when the WAL is disabled.
async fn test_write_batch_without_wal() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let mut settings = test_db_options(0, 8, None);
    settings.wal_enabled = false;
    let db = Db::builder("/tmp/test_kv_store_without_wal", store.clone())
        .with_settings(settings)
        .build()
        .await
        .unwrap();

    let mut batch = WriteBatch::new();
    batch.put(b"key1", b"value1");
    batch.put(b"key2", b"value2");
    batch.delete(b"key1");
    db.write(batch).await.expect("write batch failed");

    // Same outcome as the WAL-enabled variant above.
    assert_eq!(db.get(b"key1").await.unwrap(), None);
    assert_eq!(db.get(b"key2").await.unwrap(), Some("value2".into()));

    db.close().await.unwrap();
}
#[test]
// Empty keys are invalid: both put and delete on a WriteBatch must panic
// rather than silently accept them.
//
// Fix: this test performs no async work (nothing is awaited), so the
// original #[tokio::test] spun up a runtime for nothing — a plain #[test]
// on a sync fn is sufficient and cheaper.
fn test_write_batch_with_empty_key() {
    let mut batch = WriteBatch::new();
    let result = std::panic::catch_unwind(move || {
        batch.put(b"", b"value");
    });
    assert!(
        result.is_err(),
        "Expected panic when using empty key in put operation"
    );

    let mut batch = WriteBatch::new();
    let result = std::panic::catch_unwind(move || {
        batch.delete(b"");
    });
    assert!(
        result.is_err(),
        "Expected panic when using empty key in delete operation"
    );
}
#[tokio::test]
// Two tasks racing whole-keyspace batch writes must leave the database in
// one writer's complete state or the other's — never an interleaving of
// both (batch atomicity under concurrency, with compaction running).
async fn test_concurrent_batch_writes_consistency() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let kv_store = Arc::new(
        Db::builder("/tmp/test_concurrent_kv_store", object_store)
            .with_settings(test_db_options(
                0,
                1024,
                Some(CompactorOptions {
                    poll_interval: Duration::from_millis(100),
                    max_sst_size: 256,
                    max_concurrent_compactions: 1,
                    manifest_update_timeout: Duration::from_secs(300),
                    ..Default::default()
                }),
            ))
            .build()
            .await
            .unwrap(),
    );
    const NUM_KEYS: usize = 100;
    const NUM_ROUNDS: usize = 20;
    for _ in 0..NUM_ROUNDS {
        // Writer 1: value == key.
        let task1 = {
            let store = kv_store.clone();
            tokio::spawn(async move {
                let mut batch = WriteBatch::new();
                for key in 1..=NUM_KEYS {
                    batch.put(key.to_be_bytes(), key.to_be_bytes());
                }
                store.write(batch).await.expect("write batch failed");
            })
        };
        // Writer 2: value == key * 2.
        let task2 = {
            let store = kv_store.clone();
            tokio::spawn(async move {
                let mut batch = WriteBatch::new();
                for key in 1..=NUM_KEYS {
                    let value = (key * 2).to_be_bytes();
                    batch.put(key.to_be_bytes(), value);
                }
                store.write(batch).await.expect("write batch failed");
            })
        };
        // Fix: propagate panics from the spawned tasks. join_all's results
        // were previously discarded, so a failing `expect` inside either
        // task would abort the task without failing the test.
        for joined in join_all(vec![task1, task2]).await {
            joined.expect("batch writer task panicked");
        }
        // Verify atomicity: every key must match one writer consistently.
        let mut all_key = true;
        let mut all_key2 = true;
        for key in 1..=NUM_KEYS {
            let value = kv_store.get(key.to_be_bytes()).await.unwrap();
            let value = value.expect("Value should exist");
            if value.as_ref() != key.to_be_bytes() {
                all_key = false;
            }
            if value.as_ref() != (key * 2).to_be_bytes() {
                all_key2 = false;
            }
        }
        assert!(
            all_key || all_key2,
            "Inconsistent state: not all values match either key or key * 2"
        );
    }
    kv_store.close().await.unwrap();
}
#[tokio::test]
#[cfg(feature = "wal_disable")]
// Opens a database with the WAL enabled, writes and closes, then reopens
// it with the WAL disabled. The previously WAL-persisted data must be
// replayed, and subsequent WAL-less writes (including a delete) must be
// durable across another reopen.
async fn test_disable_wal_after_wal_enabled() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
// Phase 1: WAL enabled — write two keys and close cleanly.
let options = test_db_options(0, 32, None);
let db = Db::builder(path, object_store.clone())
.with_settings(options)
.build()
.await
.unwrap();
db.put(&[b'a'; 4], &[b'j'; 4]).await.unwrap();
db.put(&[b'b'; 4], &[b'k'; 4]).await.unwrap();
db.close().await.unwrap();
// Phase 2: reopen with WAL disabled — delete one key (non-durable write
// options) and overwrite the other with a larger value, then close.
let mut options = test_db_options(0, 32, None);
options.wal_enabled = false;
let db = Db::builder(path, object_store.clone())
.with_settings(options.clone())
.build()
.await
.unwrap();
db.delete_with_options(
&[b'b'; 4],
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.put(&[b'a'; 4], &[b'z'; 64]).await.unwrap();
db.close().await.unwrap();
// Phase 3: reopen again — the overwrite survived and the delete held.
let db = Db::builder(path, object_store.clone())
.with_settings(options.clone())
.build()
.await
.unwrap();
let val = db.get(&[b'a'; 4]).await.unwrap();
assert_eq!(val.unwrap(), Bytes::copy_from_slice(&[b'z'; 64]));
let val = db.get(&[b'b'; 4]).await.unwrap();
assert!(val.is_none());
}
#[cfg(feature = "wal_disable")]
#[tokio::test]
// With the WAL disabled, writes accumulate in the memtable until a durable
// write forces an L0 flush. Verifies the resulting SST holds all three
// entries (including the tombstone) with monotonically increasing sequence
// numbers and create timestamps taken from the mock clock.
async fn test_wal_disabled() {
use crate::{test_utils::assert_iterator, types::RowEntry};
let clock = Arc::new(MockSystemClock::new());
let mut options = test_db_options(0, 300, None);
options.wal_enabled = false;
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = Path::from("/tmp/test_kv_store");
// Independent TableStore handle for reading the flushed SST directly.
let sst_format = SsTableFormat::default();
let table_store = Arc::new(TableStore::new(
ObjectStores::new(object_store.clone(), None),
sst_format,
path.clone(),
None,
));
let db = Db::builder(path.clone(), object_store.clone())
.with_settings(options)
.with_system_clock(clock.clone())
.build()
.await
.unwrap();
// Watch the manifest directly to observe the L0 flush.
let manifest_store = Arc::new(ManifestStore::new(&path, object_store.clone()));
let mut stored_manifest =
StoredManifest::load(manifest_store.clone(), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
// Two non-durable writes at clock time 0: a put and a delete.
let write_options = WriteOptions {
await_durable: false,
};
db.put_with_options(
&[b'a'; 32],
&[b'j'; 32],
&PutOptions::default(),
&write_options,
)
.await
.unwrap();
db.delete_with_options(&[b'b'; 31], &write_options)
.await
.unwrap();
// A durable write at clock time 10 forces the memtable to be flushed.
let write_options = WriteOptions {
await_durable: true,
};
clock.set(10);
db.put_with_options(
&[b'c'; 32],
&[b'l'; 32],
&PutOptions::default(),
&write_options,
)
.await
.unwrap();
let state = wait_for_manifest_condition(
&mut stored_manifest,
|s| !s.l0.is_empty(),
Duration::from_secs(30),
)
.await;
assert_eq!(state.l0.len(), 1);
// Read the flushed L0 SST back and check entries, sequence numbers
// (1..=3 in write order) and create timestamps (0, 0, 10).
let l0 = state.l0.front().unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
l0,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
assert_iterator(
&mut iter,
vec![
RowEntry::new_value(&[b'a'; 32], &[b'j'; 32], 1).with_create_ts(0),
RowEntry::new_tombstone(&[b'b'; 31], 2).with_create_ts(0),
RowEntry::new_value(&[b'c'; 32], &[b'l'; 32], 3).with_create_ts(10),
],
)
.await;
}
#[tokio::test]
// With a small (256-byte) L0 threshold, each pair of puts must trigger a
// memtable flush: the manifest's replay-after WAL id advances by two per
// round, three L0 SSTs accumulate (newest first), each SST contains
// exactly its round's two records, and the immutable-memtable-flush metric
// is incremented.
async fn test_put_flushes_memtable() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
let kv_store = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 256, None))
.with_metrics_recorder(metrics_recorder.clone())
.build()
.await
.unwrap();
// Watch the manifest to detect each flush as it lands.
let manifest_store = Arc::new(ManifestStore::new(&Path::from(path), object_store.clone()));
let mut stored_manifest =
StoredManifest::load(manifest_store.clone(), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
// Independent TableStore handle to read the flushed SSTs directly.
let sst_format = SsTableFormat {
min_filter_keys: 10,
..SsTableFormat::default()
};
let table_store = Arc::new(TableStore::new(
ObjectStores::new(object_store.clone(), None),
sst_format,
path,
None,
));
let mut last_wal_id = 0;
for i in 0..3 {
// Two ~66-byte records per round exceed the 256-byte threshold and
// trigger a flush.
let key = [b'a' + i; 16];
let value = [b'b' + i; 50];
kv_store.put(&key, &value).await.unwrap();
let key = [b'j' + i; 16];
let value = [b'k' + i; 50];
kv_store.put(&key, &value).await.unwrap();
let db_state = wait_for_manifest_condition(
&mut stored_manifest,
|s| s.replay_after_wal_id > last_wal_id,
Duration::from_secs(30),
)
.await;
// Each put writes one WAL SST, so the replay point advances by two
// per round.
assert_eq!(db_state.replay_after_wal_id, (i as u64) * 2 + 2);
last_wal_id = db_state.replay_after_wal_id
}
let manifest = stored_manifest.refresh().await.unwrap();
let l0 = &manifest.core.l0;
assert_eq!(l0.len(), 3);
let sst_iter_options = SstIteratorOptions::default();
for i in 0u8..3u8 {
// L0 is newest-first, so round i's SST sits at index 2 - i.
let sst1 = l0.get(2 - i as usize).unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
sst1,
table_store.clone(),
sst_iter_options,
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
// Each SST holds exactly the two records written in its round.
let kv: KeyValue = iter.next().await.unwrap().unwrap().into();
assert_eq!(kv.key.as_ref(), [b'a' + i; 16]);
assert_eq!(kv.value.as_ref(), [b'b' + i; 50]);
let kv: KeyValue = iter.next().await.unwrap().unwrap().into();
assert_eq!(kv.key.as_ref(), [b'j' + i; 16]);
assert_eq!(kv.value.as_ref(), [b'k' + i; 50]);
let kv = iter.next().await.unwrap().map(KeyValue::from);
assert!(kv.is_none());
}
assert!(lookup_metric(&metrics_recorder, IMMUTABLE_MEMTABLE_FLUSHES).is_some_and(|v| v > 0));
}
#[tokio::test]
// Even when the memtable size threshold is never reached (usize::MAX), the
// memtable must be flushed to L0 after MAX_WAL_FLUSHES_BEFORE_L0_FLUSH WAL
// flushes, to bound WAL replay work. Exercises the cap twice to confirm it
// re-arms after each L0 flush.
async fn test_put_flushes_memtable_after_max_wal_flushes() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_flush_memtable_max_wal_flushes";
// usize::MAX threshold + no flush interval: only the WAL-flush count can
// trigger an L0 flush.
let mut settings = test_db_options(0, usize::MAX, None);
settings.flush_interval = None;
let kv_store = Db::builder(path, object_store.clone())
.with_settings(settings)
.build()
.await
.unwrap();
let manifest_store = Arc::new(ManifestStore::new(&Path::from(path), object_store.clone()));
let mut stored_manifest =
StoredManifest::load(manifest_store.clone(), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let write_options: WriteOptions = WriteOptions {
await_durable: false,
};
let put_options = PutOptions::default();
// One put + explicit flush per iteration = one WAL flush each, stopping
// one short of the cap.
for i in 0..(MAX_WAL_FLUSHES_BEFORE_L0_FLUSH - 1) {
let key = format!("key{:08}", i);
kv_store
.put_with_options(key.as_bytes(), b"v", &put_options, &write_options)
.await
.unwrap();
kv_store.flush().await.unwrap();
}
let wal_id = kv_store.inner.wal_buffer.recent_flushed_wal_id();
assert_eq!(wal_id, MAX_WAL_FLUSHES_BEFORE_L0_FLUSH);
// Just below the cap: nothing has been moved to L0 yet.
{
let guard = kv_store.inner.state.read();
assert!(guard.state().imm_memtable.is_empty());
assert_eq!(guard.state().core().l0.len(), 0);
}
// The next put crosses the cap and must trigger the L0 flush.
let key = format!("key{:08}", MAX_WAL_FLUSHES_BEFORE_L0_FLUSH - 1);
kv_store
.put_with_options(key.as_bytes(), b"v", &put_options, &write_options)
.await
.unwrap();
let db_state = wait_for_manifest_condition(
&mut stored_manifest,
|s| s.replay_after_wal_id == MAX_WAL_FLUSHES_BEFORE_L0_FLUSH,
Duration::from_secs(30),
)
.await;
assert_eq!(db_state.l0.len(), 1);
// Second cycle: the counter re-armed, so the same pattern produces a
// second L0 SST after another MAX_WAL_FLUSHES_BEFORE_L0_FLUSH flushes.
for i in 0..(MAX_WAL_FLUSHES_BEFORE_L0_FLUSH - 1) {
let key = format!("key{:08}", i);
kv_store
.put_with_options(key.as_bytes(), b"v", &put_options, &write_options)
.await
.unwrap();
kv_store.flush().await.unwrap();
}
{
let guard = kv_store.inner.state.read();
assert_eq!(guard.state().core().l0.len(), 1);
}
let key = format!("key{:08}", MAX_WAL_FLUSHES_BEFORE_L0_FLUSH);
kv_store
.put_with_options(key.as_bytes(), b"v", &put_options, &write_options)
.await
.unwrap();
let db_state = wait_for_manifest_condition(
&mut stored_manifest,
|s| s.replay_after_wal_id == MAX_WAL_FLUSHES_BEFORE_L0_FLUSH * 2,
Duration::from_secs(30),
)
.await;
assert_eq!(db_state.l0.len(), 2);
kv_store.close().await.unwrap();
}
// Verifies that an explicit FlushType::MemTable flush (with the WAL enabled and
// periodic flushing effectively disabled) produces a new L0 SST, bumps the
// immutable-memtable flush metric, flushes the WAL first, and that the new SST
// contains exactly the written keys.
#[tokio::test]
async fn test_flush_memtable_with_wal_enabled() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_flush_with_options";
let mut options = test_db_options(0, 256, None);
// Effectively disable time-based flushing so only the explicit flush below
// can create an L0 SST.
options.flush_interval = Some(Duration::from_secs(u64::MAX));
let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
let kv_store = Db::builder(path, object_store.clone())
.with_settings(options)
.with_metrics_recorder(metrics_recorder.clone())
.build()
.await
.unwrap();
let manifest_store = Arc::new(ManifestStore::new(&Path::from(path), object_store.clone()));
let mut stored_manifest =
StoredManifest::load(manifest_store.clone(), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
// Independent TableStore handle used later to read the flushed SST directly.
let sst_format = SsTableFormat {
min_filter_keys: 10,
..SsTableFormat::default()
};
let table_store = Arc::new(TableStore::new(
ObjectStores::new(object_store.clone(), None),
sst_format,
path,
None,
));
// Two non-durable writes that stay in the active memtable.
let key1 = b"test_key_1";
let value1 = b"test_value_1";
kv_store
.put_with_options(
key1,
value1,
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
let key2 = b"test_key_2";
let value2 = b"test_value_2";
kv_store
.put_with_options(
key2,
value2,
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
// Baselines taken before the explicit flush so the assertions below can
// check the deltas.
let initial_manifest = stored_manifest.refresh().await.unwrap();
let initial_l0_count = initial_manifest.core.l0.len();
let initial_flush_count =
lookup_metric(&metrics_recorder, IMMUTABLE_MEMTABLE_FLUSHES).unwrap();
kv_store
.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
.unwrap();
let db_state = wait_for_manifest_condition(
&mut stored_manifest,
|s| s.l0.len() > initial_l0_count,
Duration::from_secs(30),
)
.await;
assert_eq!(db_state.l0.len(), initial_l0_count + 1);
let final_flush_count =
lookup_metric(&metrics_recorder, IMMUTABLE_MEMTABLE_FLUSHES).unwrap();
assert!(final_flush_count > initial_flush_count);
// The memtable flush must have flushed the WAL as well.
// NOTE(review): the expected id of 2 presumably counts the startup WAL flush
// plus the one triggered here — confirm against the WAL buffer numbering.
let recent_flushed_wal_id = kv_store.inner.wal_buffer.recent_flushed_wal_id();
assert_eq!(recent_flushed_wal_id, 2);
// Reads still return the values after the flush.
let retrieved_value1 = kv_store.get(key1).await.unwrap().unwrap();
assert_eq!(retrieved_value1.as_ref(), value1);
let retrieved_value2 = kv_store.get(key2).await.unwrap().unwrap();
assert_eq!(retrieved_value2.as_ref(), value2);
// Read the newest L0 SST directly from the object store and check it holds
// both keys.
let latest_sst = db_state.l0.back().unwrap();
let sst_iter_options = SstIteratorOptions::default();
let mut iter = SstIterator::new_borrowed_initialized(
..,
latest_sst,
table_store.clone(),
sst_iter_options,
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
let mut found_keys = std::collections::HashSet::new();
while let Some(kv) = iter.next().await.unwrap().map(KeyValue::from) {
found_keys.insert(kv.key.to_vec());
}
assert!(found_keys.contains(key1.as_slice()));
assert!(found_keys.contains(key2.as_slice()));
}
// A memtable flush must first drain the buffered WAL entries; the flushed row
// should be readable from the WAL object store afterwards.
#[tokio::test]
async fn test_memtable_flush_also_flushes_wal() {
    // Separate stores for main data and WAL so the WAL can be inspected on its own.
    let main_object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let wal_object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test_memtable_flush_also_flushes_wal";
    let mut settings = test_db_options(0, 1024, None);
    settings.flush_interval = None;
    let db = Db::builder(path, main_object_store)
        .with_settings(settings)
        .with_wal_object_store(wal_object_store.clone())
        .build()
        .await
        .unwrap();
    let key = b"wal_flush_key";
    let value = b"wal_flush_value";
    db.put_with_options(
        key,
        value,
        &PutOptions::default(),
        &WriteOptions {
            await_durable: false,
        },
    )
    .await
    .unwrap();
    // The non-durable write sits in the WAL buffer until something flushes it.
    assert_eq!(db.inner.wal_buffer.buffered_wal_entries_count(), 1);
    db.flush_with_options(FlushOptions {
        flush_type: FlushType::MemTable,
    })
    .await
    .unwrap();
    // The memtable flush drained the WAL buffer as a side effect.
    assert_eq!(db.inner.wal_buffer.buffered_wal_entries_count(), 0);
    // Read the second WAL file back and confirm it holds exactly our row.
    let wal_reader = WalReader::new(path, wal_object_store);
    let wal_files = wal_reader.list(..).await.unwrap();
    assert_eq!(wal_files.len(), 2);
    let mut wal_iter = wal_files[1]
        .iterator()
        .await
        .expect("expected successful WAL iterator call");
    let mut rows = Vec::new();
    while let Some(entry) = wal_iter
        .next()
        .await
        .expect("expected successful WAL rows read")
    {
        rows.push(entry);
    }
    assert_eq!(rows.len(), 1);
    let row = &rows[0];
    assert_eq!(row.key.as_ref(), key);
    assert_eq!(
        row.value.as_bytes().expect("expected bytes").as_ref(),
        value
    );
    assert_eq!(row.seq, 1);
}
// Shared body for the sequence-tracker persistence tests: writes three keys at
// mock-clock times 0s/60s/120s (sequence numbers 1..=3), flushes the memtable,
// and verifies the manifest's sequence tracker maps seq<->timestamp correctly
// both live in memory and after a full close/reopen cycle.
async fn test_sequence_tracker_persisted_across_flush_and_reload_impl(wal_enabled: bool) {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_sequence_tracker_flush";
let mut settings = test_db_options(0, 256, None);
settings.flush_interval = None;
// `wal_enabled` is only a real setting when the wal_disable feature is on;
// otherwise the parameter is deliberately ignored.
#[cfg(feature = "wal_disable")]
{
settings.wal_enabled = wal_enabled;
}
#[cfg(not(feature = "wal_disable"))]
let _ = wal_enabled;
let system_clock = Arc::new(MockSystemClock::new());
let kv_store = Db::builder(path, object_store.clone())
.with_settings(settings.clone())
.with_system_clock(system_clock.clone())
.build()
.await
.unwrap();
let manifest_store = Arc::new(ManifestStore::new(&Path::from(path), object_store.clone()));
let mut stored_manifest =
StoredManifest::load(manifest_store.clone(), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let write_options = WriteOptions {
await_durable: false,
};
let put_options = PutOptions::default();
// One write per timestamp: seqs 1, 2, 3 correspond to t = 0s, 60s, 120s.
let timestamps_ms = [0_i64, 60_000, 120_000];
for (idx, ts) in timestamps_ms.iter().enumerate() {
system_clock.set(*ts);
let key = format!("key-{idx}").into_bytes();
let value = format!("value-{idx}").into_bytes();
kv_store
.put_with_options(&key, &value, &put_options, &write_options)
.await
.unwrap();
}
kv_store
.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
.unwrap();
// Wait until the persisted manifest's tracker resolves t=120s to seq 3.
let target_ts = Utc.timestamp_opt(120, 0).single().unwrap();
let persisted_state = wait_for_manifest_condition(
&mut stored_manifest,
move |core| {
core.sequence_tracker
.find_seq(target_ts, FindOption::RoundDown)
== Some(3)
},
Duration::from_secs(5),
)
.await;
// Persisted tracker must match the live in-memory tracker exactly.
let tracker = persisted_state.sequence_tracker.clone();
let live_tracker = kv_store
.inner
.state
.read()
.state()
.core()
.sequence_tracker
.clone();
assert_eq!(tracker, live_tracker);
// Spot-check both directions of the mapping: seq -> ts and ts -> seq.
let seq1_ts = tracker.find_ts(1, FindOption::RoundDown).unwrap();
assert_eq!(seq1_ts.timestamp(), 0);
let seq2_ts = tracker.find_ts(2, FindOption::RoundDown).unwrap();
assert_eq!(seq2_ts.timestamp(), 60);
let seq3_ts = tracker.find_ts(3, FindOption::RoundDown).unwrap();
assert_eq!(seq3_ts.timestamp(), 120);
let ts_lookup = Utc.timestamp_opt(60, 0).single().unwrap();
assert_eq!(tracker.find_seq(ts_lookup, FindOption::RoundDown), Some(2));
kv_store.close().await.unwrap();
// Reopen and confirm the tracker survives a full restart.
let reopened = Db::builder(path, object_store.clone())
.with_settings(settings)
.with_system_clock(system_clock.clone())
.build()
.await
.unwrap();
let reopened_tracker = reopened
.inner
.state
.read()
.state()
.core()
.sequence_tracker
.clone();
assert_eq!(tracker, reopened_tracker);
reopened.close().await.unwrap();
}
// WAL-enabled variant of the shared sequence-tracker persistence test.
#[tokio::test]
async fn test_sequence_tracker_persisted_across_flush_and_reload_wal_enabled() {
test_sequence_tracker_persisted_across_flush_and_reload_impl(true).await;
}
// WAL-disabled variant; only compiled when the wal_disable feature allows
// turning the WAL off.
#[tokio::test]
#[cfg(feature = "wal_disable")]
async fn test_sequence_tracker_persisted_across_flush_and_reload_wal_disabled() {
test_sequence_tracker_persisted_across_flush_and_reload_impl(false).await;
}
// Race test: pauses a flush between writing the L0 SST and updating the
// manifest, lets concurrent writes land during the pause, then verifies the
// persisted sequence tracker never references sequence numbers beyond the
// manifest's last_l0_seq (i.e. beyond what is actually durable in L0).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_sequence_tracker_not_ahead_of_last_l0_seq_when_flush_races_with_writes() {
let fp_registry = Arc::new(FailPointRegistry::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let mut settings = test_db_options(0, 2048, None);
settings.flush_interval = None;
// A very long manifest poll interval keeps background polling from
// interfering with the manually orchestrated race.
settings.manifest_poll_interval = Duration::from_secs(60 * 60);
let system_clock = Arc::new(MockSystemClock::new());
let db = Db::builder(
"/tmp/test_sequence_tracker_flush_race",
object_store.clone(),
)
.with_settings(settings)
.with_system_clock(system_clock.clone())
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
let write_options = WriteOptions {
await_durable: false,
};
// Helper: write race-key-{idx} at mock time idx * 60s.
async fn put_with_timestamp(
db: &Db,
system_clock: &Arc<MockSystemClock>,
idx: usize,
write_options: &WriteOptions,
) {
system_clock.set((idx as i64) * 60_000);
let key = format!("race-key-{idx}").into_bytes();
let value = format!("race-value-{idx}").into_bytes();
db.put_with_options(&key, &value, &PutOptions::default(), write_options)
.await
.unwrap();
}
put_with_timestamp(&db, &system_clock, 0, &write_options).await;
put_with_timestamp(&db, &system_clock, 1, &write_options).await;
// Pause the flush after the L0 SST is written but before the manifest update.
fail_parallel::cfg(
fp_registry.clone(),
"after-flush-imm-to-l0-before-manifest",
"pause",
)
.unwrap();
let flush_handle = {
let inner = Arc::clone(&db.inner);
tokio::spawn(async move { inner.flush_memtables(FlushTarget::All).await })
};
// Poll (up to ~60s) until the L0 SST appears in the object store, which means
// the flush task has reached the paused failpoint.
let mut wrote_l0_sst = false;
for _ in 0..6000 {
let ssts = db.inner.table_store.list_compacted_ssts(..).await.unwrap();
if !ssts.is_empty() {
wrote_l0_sst = true;
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert!(
wrote_l0_sst,
"L0 SST was not written before manifest update pause"
);
// These writes (seqs 3 and 4) land while the manifest update is paused; they
// are NOT part of the flushed L0 SST.
put_with_timestamp(&db, &system_clock, 2, &write_options).await;
put_with_timestamp(&db, &system_clock, 3, &write_options).await;
fail_parallel::cfg(
fp_registry.clone(),
"after-flush-imm-to-l0-before-manifest",
"off",
)
.unwrap();
flush_handle.await.unwrap().unwrap();
{
let guard = db.inner.state.read();
assert!(guard.state().imm_memtable.is_empty());
}
let manifest_state = {
let guard = db.inner.state.read();
guard.state().manifest.value.core.clone()
};
let last_l0_seq = manifest_state.last_l0_seq;
assert!(
last_l0_seq >= 2,
"expected flushed memtable to advance last_l0_seq"
);
// Core invariant: the tracker must not know about any seq > last_l0_seq.
assert!(
manifest_state
.sequence_tracker
.find_ts(last_l0_seq + 1, FindOption::RoundUp)
.is_none(),
"sequence tracker should not advance beyond last_l0_seq (last_l0_seq={})",
last_l0_seq
);
db.close().await.unwrap();
}
// Verifies that a WAL-only flush (FlushType::Wal) succeeds and leaves the DB
// healthy even when writing compacted SSTs is forced to fail: the failpoint
// below makes any compacted-SST write return an error, so a WAL flush passing
// shows it never touches the compacted-SST path.
#[tokio::test]
async fn test_flush_with_options_wal() {
let fp_registry = Arc::new(FailPointRegistry::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_flush_with_options_wal";
let mut options = test_db_options(0, 1024, None);
// Effectively disable periodic flushing.
options.flush_interval = Some(Duration::from_secs(u64::MAX));
// Any attempt to write a compacted SST fails immediately.
fail_parallel::cfg(
fp_registry.clone(),
"write-compacted-sst-io-error",
"return",
)
.unwrap();
let kv_store = Db::builder(path, object_store.clone())
.with_settings(options)
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
// Two non-durable writes buffered in the WAL.
let key1 = b"wal_test_key_1";
let value1 = b"wal_test_value_1";
kv_store
.put_with_options(
key1,
value1,
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
let key2 = b"wal_test_key_2";
let value2 = b"wal_test_value_2";
kv_store
.put_with_options(
key2,
value2,
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
let initial_wal_id = kv_store.inner.wal_buffer.recent_flushed_wal_id();
let flush_result = kv_store
.flush_with_options(FlushOptions {
flush_type: FlushType::Wal,
})
.await;
assert!(flush_result.is_ok(), "WAL flush should succeed");
// Both values remain readable after the WAL flush.
let retrieved_value1 = kv_store.get(key1).await.unwrap().unwrap();
assert_eq!(retrieved_value1.as_ref(), value1);
let retrieved_value2 = kv_store.get(key2).await.unwrap().unwrap();
assert_eq!(retrieved_value2.as_ref(), value2);
let final_wal_id = kv_store.inner.wal_buffer.recent_flushed_wal_id();
assert!(
final_wal_id >= initial_wal_id,
"WAL ID should not decrease after flush"
);
// The compacted-SST failpoint must not have poisoned the DB's status.
assert!(
kv_store.inner.status().close_reason.is_none(),
"DB should not have an error state"
);
}
// With the WAL disabled, an explicit WAL flush must fail with a WalDisabled
// error, while a memtable flush keeps working normally.
#[tokio::test]
#[cfg(feature = "wal_disable")]
async fn test_flush_with_options_wal_disabled_error() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test_flush_with_options_wal_disabled";
    let mut options = test_db_options(0, 1024, None);
    options.wal_enabled = false;
    let db = Db::builder(path, object_store.clone())
        .with_settings(options)
        .build()
        .await
        .unwrap();
    let key1 = b"test_key_1";
    let value1 = b"test_value_1";
    db.put_with_options(
        key1,
        value1,
        &PutOptions::default(),
        &WriteOptions {
            await_durable: false,
        },
    )
    .await
    .unwrap();
    // A WAL flush with the WAL disabled must be rejected.
    let wal_flush = db
        .flush_with_options(FlushOptions {
            flush_type: FlushType::Wal,
        })
        .await;
    assert!(wal_flush.is_err(), "Expected WalDisabled error");
    let error = wal_flush.unwrap_err();
    assert!(
        error
            .to_string()
            .contains("attempted a WAL operation when the WAL is disabled"),
        "Expected WalDisabled error message, got: {}",
        error
    );
    // A memtable flush is unaffected by the WAL being off.
    let memtable_flush = db
        .flush_with_options(FlushOptions {
            flush_type: FlushType::MemTable,
        })
        .await;
    assert!(
        memtable_flush.is_ok(),
        "Memtable flush should work even when WAL is disabled"
    );
    let retrieved = db.get(key1).await.unwrap().unwrap();
    assert_eq!(retrieved.as_ref(), value1);
}
// Verifies write backpressure: with max_unflushed_bytes = 1 and WAL writes
// stalled by a failpoint, a second put must block and increment the
// BACKPRESSURE_COUNT metric instead of buffering unboundedly.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_apply_wal_memory_backpressure() {
let fp_registry = Arc::new(FailPointRegistry::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = Path::from("/tmp/test_kv_store");
let mut options = test_db_options(0, 1, None);
// One unflushed byte maximum: the first buffered write already exceeds it.
options.max_unflushed_bytes = 1;
let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
let db = Db::builder(path, object_store.clone())
.with_settings(options)
.with_fp_registry(fp_registry.clone())
.with_metrics_recorder(metrics_recorder.clone())
.build()
.await
.unwrap();
let metrics_recorder_clone = metrics_recorder.clone();
let write_opts = WriteOptions {
await_durable: false,
};
// Stall WAL SST writes so buffered entries cannot drain.
fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "pause").unwrap();
// Poll helper: retries the condition for up to ~30s, then gives up (the
// assertion after the call catches a timeout).
let wait_for = async move |condition: Box<dyn Fn() -> bool>| {
for _ in 0..3000 {
if condition() {
return;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
};
db.put_with_options(b"key1", b"val1", &PutOptions::default(), &write_opts)
.await
.unwrap();
let this_wal_buffer = db.inner.wal_buffer.clone();
wait_for(Box::new(move || {
this_wal_buffer.buffered_wal_entries_count() > 0
}))
.await;
assert_eq!(db.inner.wal_buffer.buffered_wal_entries_count(), 1);
// This put should hit backpressure and block; run it on a separate task.
let join_handle = tokio::spawn(async move {
db.put_with_options(b"key2", b"val2", &PutOptions::default(), &write_opts)
.await
.unwrap();
});
let this_recorder = metrics_recorder_clone.clone();
wait_for(Box::new(move || {
lookup_metric(&this_recorder, crate::db_stats::BACKPRESSURE_COUNT)
.is_some_and(|v| v > 0)
}))
.await;
assert!(
lookup_metric(&metrics_recorder_clone, crate::db_stats::BACKPRESSURE_COUNT).unwrap()
>= 1
);
// Release the failpoint and abort the (possibly still blocked) writer task;
// its outcome is irrelevant once backpressure was observed.
fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "off").unwrap();
join_handle.abort();
let _ = join_handle.await;
}
// With l0_max_ssts = 4 and a tiny memtable limit, the fifth write must leave
// an immutable memtable queued instead of flushing a fifth L0 SST.
#[tokio::test]
async fn test_apply_backpressure_to_memtable_flush() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let mut options = test_db_options(0, 1, None);
    options.l0_max_ssts = 4;
    let db = Db::builder("/tmp/test_kv_store", object_store.clone())
        .with_settings(options)
        .build()
        .await
        .unwrap();
    // Each write overflows the 1-byte memtable, so each wants its own L0 SST.
    for i in 1..=5 {
        let key = format!("key{i}");
        let val = format!("val{i}");
        db.put(key.as_bytes(), val.as_bytes()).await.unwrap();
    }
    db.flush().await.unwrap();
    // The fifth memtable is held back by the L0 cap.
    let view = db.inner.state.read().view();
    assert_eq!(view.state.imm_memtable.len(), 1);
}
// An empty value must round-trip as Some("") — it is a real value, not a
// tombstone/delete.
#[tokio::test]
async fn test_put_empty_value() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let db = Db::builder("/tmp/test_kv_store", object_store.clone())
        .with_settings(test_db_options(0, 1024, None))
        .build()
        .await
        .unwrap();
    let key = b"test_key";
    db.put(key, b"").await.unwrap();
    db.flush().await.unwrap();
    let fetched = db.get(key).await.unwrap();
    assert_eq!(fetched, Some(Bytes::from_static(b"")));
}
// An iterator over the active memtable must keep yielding its entries even
// after a flush swaps the memtable out from under it.
#[tokio::test]
async fn test_flush_while_iterating() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let kv_store = Db::builder("/tmp/test_kv_store", object_store.clone())
        .with_settings(test_db_options(0, 1024, None))
        .with_system_clock(Arc::new(MockSystemClock::new()))
        .build()
        .await
        .unwrap();
    // Populate the active memtable directly and hold onto its table handle.
    let memtable = {
        let lock = kv_store.inner.state.read();
        for (key, value, seq) in [
            (b"abc1111".as_slice(), b"value1111".as_slice(), 1),
            (b"abc2222".as_slice(), b"value2222".as_slice(), 2),
            (b"abc3333".as_slice(), b"value3333".as_slice(), 3),
        ] {
            lock.memtable().put(RowEntry::new_value(key, value, seq));
        }
        lock.memtable().table().clone()
    };
    let mut iter = memtable.iter();
    let kv: KeyValue = iter.next().await.unwrap().unwrap().into();
    assert_eq!(kv.key, b"abc1111".as_slice());
    // Flush mid-iteration; the in-flight iterator must stay valid.
    kv_store.flush().await.unwrap();
    let kv: KeyValue = iter.next().await.unwrap().unwrap().into();
    assert_eq!(kv.key, b"abc2222".as_slice());
    let kv: KeyValue = iter.next().await.unwrap().unwrap().into();
    assert_eq!(kv.key, b"abc3333".as_slice());
}
// Writes several L0-worth of data plus extra flushed WAL entries, closes the
// DB, reopens it, and verifies every key survives restore. `next_wal_id`
// mirrors the expected WAL SST numbering: each Db open and each explicit
// flush consumes one WAL id.
#[tokio::test]
async fn test_basic_restore() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
// Tracks the WAL id the manifest should report at the end.
let mut next_wal_id = 1;
let kv_store = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 600, None))
.with_system_clock(Arc::new(MockSystemClock::new()))
.build()
.await
.unwrap();
// Opening the DB consumes one WAL id.
next_wal_id += 1;
let write_opts = WriteOptions {
await_durable: false,
};
// Two put+flush pairs per iteration -> two WAL ids per iteration.
let l0_count: u64 = 3;
for i in 0..l0_count {
kv_store
.put_with_options(
&[b'a' + i as u8; 16],
&[b'b' + i as u8; 48],
&PutOptions::default(),
&write_opts,
)
.await
.unwrap();
kv_store.flush().await.unwrap();
kv_store
.put_with_options(
&[b'j' + i as u8; 16],
&[b'k' + i as u8; 48],
&PutOptions::default(),
&write_opts,
)
.await
.unwrap();
kv_store.flush().await.unwrap();
next_wal_id += 2;
}
// Additional small writes, one WAL id per flush.
let sst_count: u64 = 5;
for i in 0..sst_count {
kv_store
.put_with_options(
&i.to_be_bytes(),
&i.to_be_bytes(),
&PutOptions::default(),
&write_opts,
)
.await
.unwrap();
kv_store.flush().await.unwrap();
next_wal_id += 1;
}
kv_store.close().await.unwrap();
// Reopen with a smaller memtable and verify all data was restored.
let kv_store_restored = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.with_system_clock(Arc::new(MockSystemClock::new()))
.build()
.await
.unwrap();
// Reopening consumes one more WAL id.
next_wal_id += 1;
for i in 0..l0_count {
let val = kv_store_restored.get([b'a' + i as u8; 16]).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&[b'b' + i as u8; 48])));
let val = kv_store_restored.get([b'j' + i as u8; 16]).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&[b'k' + i as u8; 48])));
}
for i in 0..sst_count {
let val = kv_store_restored.get(i.to_be_bytes()).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&i.to_be_bytes())));
}
kv_store_restored.close().await.unwrap();
// Confirm the manifest's WAL id accounting matches the tally above.
let manifest_store = Arc::new(ManifestStore::new(&Path::from(path), object_store.clone()));
let stored_manifest =
StoredManifest::load(manifest_store, Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let db_state = stored_manifest.db_state();
assert_eq!(db_state.next_wal_sst_id, next_wal_id);
}
// Sequence numbers assigned before a close must be restored verbatim by WAL
// replay on reopen (keys 1..=3 get seqs 1..=3).
#[tokio::test]
#[allow(clippy::await_holding_lock)]
async fn test_restore_seq_number() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test_kv_store";
    let db = Db::builder(path, object_store.clone())
        .with_settings(test_db_options(0, 256, None))
        .with_system_clock(Arc::new(MockSystemClock::new()))
        .build()
        .await
        .unwrap();
    let write_opts = WriteOptions {
        await_durable: false,
    };
    for (key, value) in [
        (b"key1".as_slice(), b"val1".as_slice()),
        (b"key2".as_slice(), b"val2".as_slice()),
        (b"key3".as_slice(), b"val3".as_slice()),
    ] {
        db.put_with_options(key, value, &PutOptions::default(), &write_opts)
            .await
            .unwrap();
    }
    db.flush().await.unwrap();
    db.close().await.unwrap();
    // Reopen and inspect the replayed memtable entries, sequence numbers included.
    let db_restored = Db::builder(path, object_store.clone())
        .with_settings(test_db_options(0, 256, None))
        .with_system_clock(Arc::new(MockSystemClock::new()))
        .build()
        .await
        .unwrap();
    let state = db_restored.inner.state.read();
    let memtable = state.memtable();
    let mut iter = memtable.table().iter();
    assert_iterator(
        &mut iter,
        vec![
            RowEntry::new_value(b"key1", b"val1", 1).with_create_ts(0),
            RowEntry::new_value(b"key2", b"val2", 2).with_create_ts(0),
            RowEntry::new_value(b"key3", b"val3", 3).with_create_ts(0),
        ],
    )
    .await;
}
// Snapshot isolation across compaction: a snapshot taken after the first merge
// operand must keep returning only that operand ("0") even after a later merge
// ("1") and an on-demand compaction combine both into "01" for fresh reads.
#[tokio::test]
async fn test_read_merges_from_snapshot_across_compaction() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/testdb";
// Flag-driven scheduler: compaction runs exactly once, when the flag is set
// (swap(false) consumes the trigger).
let should_compact_l0 = Arc::new(AtomicBool::new(false));
let this_should_compact_l0 = should_compact_l0.clone();
let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
move |_state| this_should_compact_l0.swap(false, Ordering::SeqCst),
)));
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 1024 * 1024, None))
.with_merge_operator(Arc::new(StringConcatMergeOperator {}))
.with_compactor_builder(
CompactorBuilder::new(path, object_store.clone())
.with_scheduler_supplier(compaction_scheduler.clone()),
)
.build()
.await
.unwrap();
let db = Arc::new(db);
// First merge operand, then take the snapshot that must pin this state.
db.merge(b"foo", b"0").await.unwrap();
let snapshot = db.snapshot().await.unwrap();
db.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
.unwrap();
// Second operand in a separate L0 SST so compaction has something to merge.
db.merge(b"foo", b"1").await.unwrap();
db.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
.unwrap();
// Trigger compaction and wait (bounded) until a compacted run appears.
should_compact_l0.store(true, Ordering::SeqCst);
let db_poll = db.clone();
tokio::time::timeout(Duration::from_secs(10), async move {
loop {
{
let db_state = db_poll.inner.state.read();
if !db_state.state().core().compacted.is_empty() {
return;
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
// The snapshot still sees only the pre-snapshot operand...
let result = snapshot.get(b"foo").await.unwrap();
assert_eq!(result, Some(Bytes::copy_from_slice(b"0")));
// ...while a fresh read sees the merged result.
let result = db.get(b"foo").await.unwrap();
assert_eq!(result, Some(Bytes::copy_from_slice(b"01")));
}
// The first write must get sequence number 1 — seq 0 is never assigned.
#[tokio::test]
async fn test_all_kv_seq_num_are_greater_than_0() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test_kv_store_seq_num";
    let db = Db::builder(path, object_store.clone())
        .with_settings(test_db_options(0, 1024 * 1024, None))
        .build()
        .await
        .unwrap();
    db.put(b"key1", b"value1").await.unwrap();
    let fetched = db.get(b"key1").await.unwrap();
    assert_eq!(fetched, Some(Bytes::from_static(b"value1")));
    let guard = db.inner.state.read();
    let memtable = guard.memtable();
    assert_eq!(memtable.table().last_seq(), Some(1));
}
// With WAL writes stalled, a buffered put is visible to Memory-level reads but
// invisible to Remote-level (durable-only) reads.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_should_read_uncommitted_data_if_read_level_uncommitted() {
    let fp_registry = Arc::new(FailPointRegistry::new());
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test_kv_store";
    let kv_store = Db::builder(path, object_store.clone())
        .with_settings(test_db_options(0, 1024, None))
        .with_fp_registry(fp_registry.clone())
        .build()
        .await
        .unwrap();
    // Stall WAL writes so the put below never becomes durable.
    fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "pause").unwrap();
    kv_store
        .put_with_options(
            "foo".as_bytes(),
            "bar".as_bytes(),
            &PutOptions::default(),
            &WriteOptions {
                await_durable: false,
            },
        )
        .await
        .unwrap();
    // Memory-level read observes the buffered write...
    let in_memory = kv_store
        .get_with_options(
            "foo".as_bytes(),
            &ReadOptions::new().with_durability_filter(Memory),
        )
        .await
        .unwrap();
    assert_eq!(in_memory, Some("bar".into()));
    // ...while a Remote-level read sees nothing durable yet.
    let durable = kv_store
        .get_with_options(
            "foo".as_bytes(),
            &ReadOptions::new().with_durability_filter(Remote),
        )
        .await
        .unwrap();
    assert_eq!(durable, None);
    fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "off").unwrap();
    kv_store.close().await.unwrap();
}
// An overwrite that has not reached the WAL must not be visible to Remote-level
// reads; Memory-level reads see the pending value.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_should_read_only_committed_data() {
    let fp_registry = Arc::new(FailPointRegistry::new());
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test_kv_store";
    let kv_store = Db::builder(path, object_store.clone())
        .with_settings(test_db_options(0, 1024, None))
        .with_fp_registry(fp_registry.clone())
        .build()
        .await
        .unwrap();
    // Durably commit the initial value.
    kv_store
        .put("foo".as_bytes(), "bar".as_bytes())
        .await
        .unwrap();
    // Stall WAL writes, then overwrite without awaiting durability.
    fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "pause").unwrap();
    kv_store
        .put_with_options(
            "foo".as_bytes(),
            "bla".as_bytes(),
            &PutOptions::default(),
            &WriteOptions {
                await_durable: false,
            },
        )
        .await
        .unwrap();
    // Remote-level read still returns the last durable value...
    let durable = kv_store
        .get_with_options(
            "foo".as_bytes(),
            &ReadOptions::new().with_durability_filter(Remote),
        )
        .await
        .unwrap();
    assert_eq!(durable, Some("bar".into()));
    // ...and the Memory-level read returns the pending overwrite.
    let in_memory = kv_store
        .get_with_options(
            "foo".as_bytes(),
            &ReadOptions::new().with_durability_filter(Memory),
        )
        .await
        .unwrap();
    assert_eq!(in_memory, Some("bla".into()));
    fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "off").unwrap();
    kv_store.close().await.unwrap();
}
// A non-durable delete behaves like any pending write: invisible to
// Remote-level reads, visible (as absence) to Memory-level reads.
#[tokio::test]
async fn test_should_delete_without_awaiting_flush() {
    let fp_registry = Arc::new(FailPointRegistry::new());
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test_kv_store";
    let kv_store = Db::builder(path, object_store.clone())
        .with_settings(test_db_options(0, 1024, None))
        .with_fp_registry(fp_registry.clone())
        .build()
        .await
        .unwrap();
    // Durably commit a value first.
    kv_store
        .put("foo".as_bytes(), "bar".as_bytes())
        .await
        .unwrap();
    // Stall WAL writes, then delete without awaiting durability.
    fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "pause").unwrap();
    kv_store
        .delete_with_options(
            "foo".as_bytes(),
            &WriteOptions {
                await_durable: false,
            },
        )
        .await
        .unwrap();
    // The delete is not durable yet, so Remote-level reads still see the value...
    let durable = kv_store
        .get_with_options(
            "foo".as_bytes(),
            &ReadOptions::new().with_durability_filter(Remote),
        )
        .await
        .unwrap();
    assert_eq!(durable, Some("bar".into()));
    // ...while Memory-level reads already observe the tombstone.
    let in_memory = kv_store
        .get_with_options(
            "foo".as_bytes(),
            &ReadOptions::new().with_durability_filter(Memory),
        )
        .await
        .unwrap();
    assert_eq!(in_memory, None);
    fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "off").unwrap();
    kv_store.close().await.unwrap();
}
// Scans must honor the durability filter: a Remote-filtered scan yields only
// durably committed rows, while a Memory-filtered scan also yields buffered
// (not-yet-durable) writes.
#[tokio::test]
async fn test_scan_should_read_only_committed_data() {
    let fp_registry = Arc::new(FailPointRegistry::new());
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test_kv_store";
    let kv_store = Db::builder(path, object_store.clone())
        .with_settings(test_db_options(0, 1024, None))
        .with_fp_registry(fp_registry.clone())
        .build()
        .await
        .unwrap();
    // Three durably committed rows.
    for (key, value) in [
        ("key1", "committed1"),
        ("key2", "committed2"),
        ("key3", "committed3"),
    ] {
        kv_store.put(key.as_bytes(), value.as_bytes()).await.unwrap();
    }
    // Stall WAL writes, then add an overwrite and a new key that stay buffered.
    fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "pause").unwrap();
    for (key, value) in [("key2", "uncommitted2"), ("key4", "uncommitted4")] {
        kv_store
            .put_with_options(
                key.as_bytes(),
                value.as_bytes(),
                &PutOptions::default(),
                &WriteOptions {
                    await_durable: false,
                },
            )
            .await
            .unwrap();
    }
    // Remote-filtered scan: only the committed values, in key order.
    let mut iter = kv_store
        .scan_with_options(
            "key1".as_bytes().."key5".as_bytes(),
            &ScanOptions::new().with_durability_filter(Remote),
        )
        .await
        .unwrap();
    for (key, value) in [
        (b"key1".as_slice(), b"committed1".as_slice()),
        (b"key2".as_slice(), b"committed2".as_slice()),
        (b"key3".as_slice(), b"committed3".as_slice()),
    ] {
        let kv = iter.next().await.unwrap().unwrap();
        assert_eq!(kv.key.as_ref(), key);
        assert_eq!(kv.value.as_ref(), value);
    }
    assert_eq!(iter.next().await.unwrap(), None);
    // Memory-filtered scan: buffered writes shadow/extend the committed rows.
    let mut iter = kv_store
        .scan_with_options(
            "key1".as_bytes().."key5".as_bytes(),
            &ScanOptions::new().with_durability_filter(Memory),
        )
        .await
        .unwrap();
    for (key, value) in [
        (b"key1".as_slice(), b"committed1".as_slice()),
        (b"key2".as_slice(), b"uncommitted2".as_slice()),
        (b"key3".as_slice(), b"committed3".as_slice()),
        (b"key4".as_slice(), b"uncommitted4".as_slice()),
    ] {
        let kv = iter.next().await.unwrap().unwrap();
        assert_eq!(kv.key.as_ref(), key);
        assert_eq!(kv.value.as_ref(), value);
    }
    assert_eq!(iter.next().await.unwrap(), None);
    fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "off").unwrap();
    kv_store.close().await.unwrap();
}
// With compacted-SST writes paused (so nothing ever reaches L0), a second DB
// instance opened on the same path must reconstruct both writes as immutable
// memtables from the WAL, with the correct per-memtable WAL ids.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_should_recover_imm_from_wal() {
let fp_registry = Arc::new(FailPointRegistry::new());
// Prevent any L0 SST from being written for the duration of the test.
fail_parallel::cfg(fp_registry.clone(), "write-compacted-sst-io-error", "pause").unwrap();
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
// Tracks the expected next WAL SST id: each open and each durable put below
// consumes one.
let mut next_wal_id = 1;
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
next_wal_id += 1;
// Each put overflows the tiny (128-byte) memtable, producing one immutable
// memtable and one flushed WAL SST per write.
let key1 = [b'a'; 32];
let value1 = [b'b'; 96];
db.put(key1, value1).await.unwrap();
next_wal_id += 1;
let key2 = [b'c'; 32];
let value2 = [b'd'; 96];
db.put(key2, value2).await.unwrap();
next_wal_id += 1;
// Open a second instance; it must rebuild both immutable memtables by
// replaying the WAL.
let reader = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
next_wal_id += 1;
let db_state = reader.inner.state.read().view();
assert_eq!(db_state.state.imm_memtable.len(), 2);
// Newest immutable memtable (front) corresponds to the second write (WAL 3),
// the older one (index 1) to the first write (WAL 2).
assert_eq!(
db_state
.state
.imm_memtable
.front()
.unwrap()
.recent_flushed_wal_id(),
1 + 2
);
assert_eq!(
db_state
.state
.imm_memtable
.get(1)
.unwrap()
.recent_flushed_wal_id(),
2
);
assert_eq!(db_state.state.core().next_wal_sst_id, next_wal_id);
// Both values are readable from the recovered instance.
assert_eq!(
reader.get(key1).await.unwrap(),
Some(Bytes::copy_from_slice(&value1))
);
assert_eq!(
reader.get(key2).await.unwrap(),
Some(Bytes::copy_from_slice(&value2))
);
fail_parallel::cfg(fp_registry.clone(), "write-compacted-sst-io-error", "off").unwrap();
db.close().await.unwrap();
reader.close().await.unwrap();
}
// A write whose L0 flush fails (compacted-SST writes erroring) must still be
// recoverable: after reopening, the WAL replay reconstructs it as an immutable
// memtable with no L0/compacted SSTs, and the value remains readable.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_should_recover_imm_from_wal_after_flush_error() {
let fp_registry = Arc::new(FailPointRegistry::new());
// Make every compacted-SST write fail with an error (not hang).
fail_parallel::cfg(
fp_registry.clone(),
"write-compacted-sst-io-error",
"return",
)
.unwrap();
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 4096, None))
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
// The WAL write succeeds even though the L0 flush will fail.
let key1 = [b'a'; 32];
let value1 = [b'b'; 96];
let result = db.put(&key1, &value1).await;
assert!(result.is_ok(), "Failed to write key1");
assert_eq!(db.inner.wal_buffer.recent_flushed_wal_id(), 2);
db.close().await.unwrap();
// Reopen with the flush path paused (instead of erroring) so the recovered
// immutable memtable stays observable in the state.
fail_parallel::cfg(fp_registry.clone(), "write-compacted-sst-io-error", "pause").unwrap();
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
let db_state = db.inner.state.read().view();
fail_parallel::cfg(fp_registry.clone(), "write-compacted-sst-io-error", "off").unwrap();
// Exactly one recovered immutable memtable; nothing made it to L0 or beyond.
assert_eq!(db_state.state.imm_memtable.len(), 1);
assert_eq!(db_state.state.core().l0.len(), 0);
assert_eq!(db_state.state.core().compacted.len(), 0);
assert_eq!(
db_state
.state
.imm_memtable
.front()
.unwrap()
.recent_flushed_wal_id(),
1 + 1
);
assert!(db_state.state.imm_memtable.get(1).is_none());
assert_eq!(db_state.state.core().next_wal_sst_id, 4);
// The recovered value must still be readable.
assert_eq!(
db.get(key1).await.unwrap(),
Some(Bytes::copy_from_slice(&value1))
);
}
#[tokio::test]
async fn test_should_fail_write_if_wal_flush_task_panics() {
let fp_registry = Arc::new(FailPointRegistry::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let db = Arc::new(
Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap(),
);
fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "panic").unwrap();
let result = db.put(b"foo", b"bar").await.unwrap_err();
assert!(result.to_string().contains("background task panicked"));
}
// Even when the WAL writer panics mid-write, the manifest must still record
// a next_wal_sst_id beyond any WAL file actually present in the store, so a
// later open never reuses a WAL id.
#[tokio::test]
async fn test_wal_id_last_seen_should_exist_even_if_wal_write_fails() {
let fp_registry = Arc::new(FailPointRegistry::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let db = Arc::new(
Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap(),
);
// Panic the WAL writer task on its next write.
fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "panic").unwrap();
let result = db.put(b"foo", b"bar").await.unwrap_err();
// The panic surfaces to the writer as a Closed(Panic) error naming the task.
assert_eq!(result.kind(), crate::ErrorKind::Closed(CloseReason::Panic));
assert!(result
.to_string()
.contains("background task panicked. name=`wal_writer`"));
db.close().await.unwrap();
// Inspect the persisted state directly via the manifest and table store.
let manifest_store = ManifestStore::new(&Path::from(path), object_store.clone());
let table_store = Arc::new(TableStore::new(
ObjectStores::new(object_store.clone(), None),
SsTableFormat::default(),
path,
None,
));
let next_wal_sst_id = table_store.next_wal_sst_id(0).await.unwrap();
let (_, manifest) = manifest_store.read_latest_manifest().await.unwrap();
// The manifest's next WAL id must be ahead of the WALs on disk.
assert!(manifest.core.next_wal_sst_id > next_wal_sst_id);
}
// Shared driver: writes data, forces on-demand L0 compactions twice via an
// OnDemandCompactionSchedulerSupplier, then verifies every key (old,
// compacted, and freshly overwritten) reads back correctly.
//
// `options.compactor_options` is taken out of `options` and handed to a
// dedicated CompactorBuilder so the scheduler can be swapped for the
// on-demand one driven by the `should_compact_l0` flag.
async fn do_test_should_read_compacted_db(mut options: Settings) {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
// Flag armed inside the wait loops below; the scheduler consumes (swaps
// off) the flag each time it decides to compact.
let should_compact_l0 = Arc::new(AtomicBool::new(false));
let this_should_compact_l0 = should_compact_l0.clone();
let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
move |_state| this_should_compact_l0.swap(false, Ordering::SeqCst),
)));
let compactor_options = options.compactor_options.take();
let db = Db::builder(path, object_store.clone())
.with_settings(options)
.with_compactor_builder(
CompactorBuilder::new(path, object_store.clone())
.with_scheduler_supplier(compaction_scheduler.clone())
.with_options(compactor_options.unwrap()),
)
.build()
.await
.unwrap();
let ms = ManifestStore::new(&Path::from(path), object_store.clone());
let mut sm = StoredManifest::load(Arc::new(ms), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
// First wave of writes, then wait until they are compacted out of L0.
for i in 0..4 {
db.put(&[b'a' + i; 32], &[1u8 + i; 32]).await.unwrap();
db.put(&[b'm' + i; 32], &[13u8 + i; 32]).await.unwrap();
}
wait_for_manifest_condition(
&mut sm,
|s| {
// Re-arm the on-demand scheduler on every poll until L0 drains.
should_compact_l0.store(true, Ordering::SeqCst);
s.last_compacted_l0_sst_view_id.is_some() && s.l0.is_empty()
},
Duration::from_secs(10),
)
.await;
let manifest = db.manifest();
info!("1 l0: {} {}", manifest.l0.len(), manifest.compacted.len());
// Second wave of writes and a second compaction round.
for i in 0..4 {
db.put(&[b'f' + i; 32], &[6u8 + i; 32]).await.unwrap();
db.put(&[b's' + i; 32], &[19u8 + i; 32]).await.unwrap();
}
wait_for_manifest_condition(
&mut sm,
|s| {
should_compact_l0.store(true, Ordering::SeqCst);
s.last_compacted_l0_sst_view_id.is_some() && s.l0.is_empty()
},
Duration::from_secs(10),
)
.await;
let manifest = db.manifest();
info!("2 l0: {} {}", manifest.l0.len(), manifest.compacted.len());
// Overwrite two compacted keys; the new values must shadow the old ones.
db.put(&[b'a'; 32], &[128u8; 32]).await.unwrap();
db.put(&[b'm'; 32], &[129u8; 32]).await.unwrap();
let val = db.get([b'a'; 32]).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&[128u8; 32])));
let val = db.get([b'm'; 32]).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&[129u8; 32])));
// Non-overwritten keys from the first wave still return original values.
for i in 1..4 {
let manifest = db.manifest();
info!("3 l0: {} {}", manifest.l0.len(), manifest.compacted.len());
let val = db.get([b'a' + i; 32]).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&[1u8 + i; 32])));
let val = db.get([b'm' + i; 32]).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&[13u8 + i; 32])));
}
// Keys from the second wave are all present too.
for i in 0..4 {
let val = db.get([b'f' + i; 32]).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&[6u8 + i; 32])));
let val = db.get([b's' + i; 32]).await.unwrap();
assert_eq!(val, Some(Bytes::copy_from_slice(&[19u8 + i; 32])));
}
// A key never written anywhere must come back as None.
let neg_lookup = db.get(b"abc").await;
assert!(neg_lookup.unwrap().is_none());
}
/// Runs the shared compacted-DB read test with bloom filters enabled for
/// every SST (min_filter_keys = 0).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_should_read_from_compacted_db() {
    let compactor = CompactorOptions {
        poll_interval: Duration::from_millis(100),
        manifest_update_timeout: Duration::from_secs(300),
        max_sst_size: 256,
        max_concurrent_compactions: 1,
        ..Default::default()
    };
    do_test_should_read_compacted_db(test_db_options(0, 127, Some(compactor))).await;
}
/// Runs the shared compacted-DB read test with bloom filters effectively
/// disabled (min_filter_keys = u32::MAX means no SST ever gets a filter).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_should_read_from_compacted_db_no_filters() {
    let compactor = CompactorOptions {
        poll_interval: Duration::from_millis(100),
        manifest_update_timeout: Duration::from_secs(300),
        max_sst_size: 256,
        max_concurrent_compactions: 1,
        ..Default::default()
    };
    do_test_should_read_compacted_db(test_db_options(u32::MAX, 127, Some(compactor))).await
}
/// Opening a DB writes an empty WAL entry, so each open advances the next
/// WAL SST id even before any user write lands.
#[tokio::test]
async fn test_db_open_should_write_empty_wal() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test_kv_store";
    let first = Db::builder(path, store.clone())
        .with_settings(test_db_options(0, 128, None))
        .build()
        .await
        .unwrap();
    // The open itself consumed WAL id 1, so the next id is 2.
    assert_eq!(first.inner.state.read().state().core().next_wal_sst_id, 2);
    first.put(b"1", b"1").await.unwrap();
    let second = Db::builder(path, store.clone())
        .with_settings(test_db_options(0, 128, None))
        .build()
        .await
        .unwrap();
    // After one put and a second open, the next WAL id has advanced to 4.
    assert_eq!(second.inner.state.read().state().core().next_wal_sst_id, 4);
}
// Fencing test: opening a second writer on the same path must fence the
// first one, even though the second open only wrote an empty WAL. The old
// writer's next durable put fails with a "detected newer DB client" error.
#[tokio::test]
async fn test_empty_wal_should_fence_old_writer() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
// Helper: durable put (await_durable = true) so WAL write errors surface.
async fn do_put(db: &Db, key: &[u8], val: &[u8]) -> Result<WriteHandle, crate::Error> {
db.put_with_options(
key,
val,
&PutOptions::default(),
&WriteOptions {
await_durable: true,
},
)
.await
}
let db1 = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.build()
.await
.unwrap();
do_put(&db1, b"1", b"1").await.unwrap();
// Second writer takes over the same path.
let db2 = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 128, None))
.build()
.await
.unwrap();
// The fenced first writer must reject further writes.
let err = do_put(&db1, b"1", b"1").await.unwrap_err();
assert_eq!(err.to_string(), "Closed error: detected newer DB client");
// The new writer keeps working and the WAL id sequence keeps advancing.
do_put(&db2, b"2", b"2").await.unwrap();
assert_eq!(db2.inner.state.read().state().core().next_wal_sst_id, 5);
}
/// Writes must fail when the mocked system clock moves backwards: the
/// monotonic clock rejects a tick smaller than the last one it observed.
#[tokio::test]
async fn test_invalid_clock_progression() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let clock = Arc::new(MockSystemClock::new());
    let db = Db::builder("/tmp/test_kv_store", store.clone())
        .with_settings(test_db_options(0, 128, None))
        .with_system_clock(clock.clone())
        .build()
        .await
        .unwrap();
    let write_opts = WriteOptions {
        await_durable: false,
    };
    clock.set(10);
    db.put_with_options(b"1", b"1", &PutOptions::default(), &write_opts)
        .await
        .unwrap();
    // Move time backwards; the next write must be rejected.
    clock.set(5);
    match db
        .put_with_options(b"1", b"1", &PutOptions::default(), &write_opts)
        .await
    {
        Ok(_) => panic!("expected an error on inserting backwards time"),
        Err(e) => assert_eq!(
            e.to_string(),
            "Invalid error: invalid clock tick, must be monotonic. last_tick=`10`, next_tick=`5`"
        ),
    }
}
/// Clock monotonicity must also hold across DB instances: after a flush and
/// reopen, the second instance still rejects a tick smaller than the last
/// tick recorded by the first instance.
#[tokio::test]
async fn test_invalid_clock_progression_across_db_instances() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let clock = Arc::new(MockSystemClock::new());
    let path = "/tmp/test_kv_store";
    let db = Db::builder(path, store.clone())
        .with_settings(test_db_options(0, 128, None))
        .with_system_clock(clock.clone())
        .build()
        .await
        .unwrap();
    let write_opts = WriteOptions {
        await_durable: false,
    };
    clock.set(10);
    db.put_with_options(b"1", b"1", &PutOptions::default(), &write_opts)
        .await
        .unwrap();
    // Persist the write (and its tick) before opening a second instance.
    db.flush().await.unwrap();
    let db2 = Db::builder(path, store.clone())
        .with_settings(test_db_options(0, 128, None))
        .with_system_clock(clock.clone())
        .build()
        .await
        .unwrap();
    // Rewind the shared mock clock; the second instance must reject it.
    clock.set(5);
    match db2
        .put_with_options(b"1", b"1", &PutOptions::default(), &write_opts)
        .await
    {
        Ok(_) => panic!("expected an error on inserting backwards time"),
        Err(e) => assert_eq!(
            e.to_string(),
            "Invalid error: invalid clock tick, must be monotonic. last_tick=`10`, next_tick=`5`"
        ),
    }
}
// With the WAL disabled, flush/shutdown must persist every memtable so that
// a reopened DB (which has no WAL to replay) still sees all written records.
#[tokio::test]
#[cfg(feature = "wal_disable")]
async fn should_flush_all_memtables_when_wal_disabled() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let db_options = Settings {
wal_enabled: false,
// Long interval so only the explicit flush below persists data.
flush_interval: Some(Duration::from_secs(10)),
..Settings::default()
};
let db = Db::builder(path, object_store.clone())
.with_settings(db_options.clone())
.build()
.await
.unwrap();
// Seed a deterministic (seeded RNG) sample table of 1000 records.
let mut rng = proptest_util::rng::new_test_rng(None);
let table = sample::table(&mut rng, 1000, 5);
test_utils::seed_database(&db, &table, false).await.unwrap();
db.flush().await.unwrap();
let reopened_db = Db::builder(path, object_store.clone())
.with_settings(db_options.clone())
.build()
.await
.unwrap();
// Every seeded record must be readable after reopen, across the full range.
assert_records_in_range(
&table,
&reopened_db,
&ScanOptions::default(),
BytesRange::from(..),
)
.await
}
// The monotonic clock's last tick must be recovered from WAL replay on
// reopen, even when the manifest's last_l0_clock_tick is still unset
// (i64::MIN), i.e. no tick was ever persisted via an L0 flush.
#[tokio::test]
async fn test_recover_clock_tick_from_wal() {
let clock = Arc::new(MockSystemClock::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_kv_store";
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.with_system_clock(clock.clone())
.build()
.await
.unwrap();
clock.set(10);
db.put_with_options(
&[b'a'; 4],
&[b'j'; 8],
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.expect("write batch failed");
clock.set(11);
db.put_with_options(
&[b'b'; 4],
&[b'k'; 8],
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.expect("write batch failed");
db.flush().await.unwrap();
db.close().await.unwrap();
// The manifest never recorded a clock tick (last_l0_clock_tick unset)...
let manifest_store = Arc::new(ManifestStore::new(&Path::from(path), object_store.clone()));
let stored_manifest =
StoredManifest::load(manifest_store, Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let db_state = stored_manifest.db_state();
let last_clock_tick = db_state.last_l0_clock_tick;
assert_eq!(last_clock_tick, i64::MIN);
// ...so a reopen with a fresh mock clock must recover tick 11 from the WAL.
let clock = Arc::new(MockSystemClock::new());
let db = Db::builder(path, object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.with_system_clock(clock.clone())
.build()
.await
.unwrap();
assert_eq!(db.inner.mono_clock.last_tick.load(Ordering::SeqCst), 11);
}
/// Flushing to L0 must record the newest clock tick seen by the writes in
/// the manifest's `last_l0_clock_tick`.
#[tokio::test]
async fn test_should_update_manifest_clock_tick_on_l0_flush() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let clock = Arc::new(MockSystemClock::new());
    let path = "/tmp/test_kv_store";
    let db = Db::builder(path, store.clone())
        .with_settings(test_db_options(0, 32, None))
        .with_system_clock(clock.clone())
        .build()
        .await
        .unwrap();
    clock.set(10);
    db.put(&[b'a'; 4], &[b'j'; 8])
        .await
        .expect("write batch failed");
    clock.set(11);
    db.put(&[b'b'; 4], &[b'k'; 8])
        .await
        .expect("write batch failed");
    db.flush().await.unwrap();
    db.close().await.unwrap();
    // Read the manifest back directly and check the persisted tick.
    let manifest_store = Arc::new(ManifestStore::new(&Path::from(path), store));
    let stored = StoredManifest::load(manifest_store, Arc::new(DefaultSystemClock::new()))
        .await
        .unwrap();
    assert_eq!(stored.db_state().last_l0_clock_tick, 11);
}
/// With the WAL disabled, the last clock tick must be recovered from the
/// manifest on reopen rather than from WAL replay.
#[tokio::test]
#[cfg(feature = "wal_disable")]
async fn test_recover_clock_tick_from_manifest() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test_kv_store";
    // Builds the test settings with the WAL turned off.
    let wal_disabled_options = || {
        let mut options = test_db_options(0, 32, None);
        options.wal_enabled = false;
        options
    };
    let clock = Arc::new(MockSystemClock::new());
    let db = Db::builder(path, store.clone())
        .with_settings(wal_disabled_options())
        .with_system_clock(clock.clone())
        .build()
        .await
        .unwrap();
    clock.set(10);
    db.put(&[b'a'; 4], &[b'j'; 28])
        .await
        .expect("write batch failed");
    clock.set(11);
    db.put(&[b'b'; 4], &[b'k'; 28])
        .await
        .expect("write batch failed");
    db.flush().await.unwrap();
    db.close().await.unwrap();
    // Reopen with a fresh clock; the tick must come back from the manifest.
    let fresh_clock = Arc::new(MockSystemClock::new());
    let reopened = Db::builder(path, store)
        .with_settings(wal_disabled_options())
        .with_system_clock(fresh_clock.clone())
        .build()
        .await
        .unwrap();
    assert_eq!(
        reopened.inner.mono_clock.last_tick.load(Ordering::SeqCst),
        11
    );
}
// End-to-end test of a DB configured with a separate object store for the
// WAL: WAL SSTs must land only in the WAL store, compacted/L0 SSTs only in
// the main store, and data must survive reopen and support deletes.
#[tokio::test]
async fn test_put_get_reopen_delete_with_separate_wal_store() {
// Counts objects with an "sst" extension in the given in-memory store.
async fn count_ssts_in(store: &Arc<InMemory>) -> usize {
store
.list(None)
.filter(|r| {
future::ready(
r.as_ref()
.unwrap()
.location
.extension()
.unwrap()
.to_lowercase()
== "sst",
)
})
.count()
.await
}
let fp_registry = Arc::new(FailPointRegistry::new());
let main_object_store = Arc::new(InMemory::new());
let wal_object_store = Arc::new(InMemory::new());
let kv_store = Db::builder("/tmp/test_kv_store", main_object_store.clone())
.with_settings(test_db_options(0, 1024, None))
.with_wal_object_store(wal_object_store.clone())
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
// The open wrote one (empty) WAL SST to the WAL store; main store is empty.
assert_eq!(count_ssts_in(&main_object_store).await, 0);
assert_eq!(count_ssts_in(&wal_object_store).await, 1);
let key = b"test_key";
let value = b"test_value";
// Pause L0 writes: the next put+flush may only touch the WAL store.
fail_parallel::cfg(fp_registry.clone(), "write-compacted-sst-io-error", "pause").unwrap();
kv_store.put(key, value).await.unwrap();
kv_store.flush().await.unwrap();
assert_eq!(count_ssts_in(&main_object_store).await, 0);
assert_eq!(count_ssts_in(&wal_object_store).await, 2);
assert_eq!(
kv_store.get(key).await.unwrap(),
Some(Bytes::from_static(value))
);
// Re-enable L0 writes and push enough data to force an SST to main store.
fail_parallel::cfg(fp_registry.clone(), "write-compacted-sst-io-error", "off").unwrap();
let mut batch = WriteBatch::default();
for i in 0u32..128 {
batch.put(i.to_be_bytes(), i.to_be_bytes());
}
kv_store.write(batch).await.unwrap();
kv_store.flush().await.unwrap();
// Exactly one SST in the main store now; WAL store gained one more WAL.
assert_eq!(count_ssts_in(&main_object_store).await, 1);
assert_eq!(count_ssts_in(&wal_object_store).await, 3);
assert_eq!(
kv_store.get(key).await.unwrap(),
Some(Bytes::from_static(value))
);
kv_store.close().await.unwrap();
assert_eq!(count_ssts_in(&wal_object_store).await, 3);
// Reopen with the same WAL store configuration; data must still be there.
let kv_store = Db::builder("/tmp/test_kv_store", main_object_store)
.with_settings(test_db_options(0, 1024, None))
.with_wal_object_store(wal_object_store.clone())
.build()
.await
.unwrap();
assert_eq!(
kv_store.get(key).await.unwrap(),
Some(Bytes::from_static(value))
);
kv_store.delete(key).await.unwrap();
assert_eq!(None, kv_store.get(key).await.unwrap());
kv_store.close().await.unwrap();
}
/// Once a DB was created with a separate WAL object store, reopening it
/// without one must be rejected as unsupported.
#[tokio::test]
async fn test_wal_store_reconfiguration_fails() {
    let main_store = Arc::new(InMemory::new());
    let wal_store = Arc::new(InMemory::new());
    let db = Db::builder("/tmp/test_kv_store", main_store.clone())
        .with_settings(test_db_options(0, 1024, None))
        .with_wal_object_store(wal_store.clone())
        .build()
        .await
        .unwrap();
    db.close().await.unwrap();
    // Reopen without configuring the WAL store: the builder must refuse.
    let reopened = Db::builder("/tmp/test_kv_store", main_store)
        .with_settings(test_db_options(0, 1024, None))
        .build()
        .await;
    match reopened {
        Err(err) => assert!(err.to_string().contains("unsupported")),
        _ => panic!("expected Unsupported error"),
    }
}
/// `WriteOptions::default()` must opt into durable (awaited) writes.
#[test]
fn test_write_option_defaults() {
    assert!(WriteOptions::default().await_durable);
}
/// Regression test for an overflow bug in the zstd compression path: write
/// values of steadily growing size so later blocks are large enough to be
/// compressed, then reopen the DB and verify a value reads back intact.
#[tokio::test]
#[cfg(feature = "zstd")]
async fn test_compression_overflow_bug() {
    use crate::config::CompressionCodec;
    use std::str::FromStr;
    let os: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let compress = CompressionCodec::from_str("zstd").unwrap();
    let db_builder = Db::builder("/tmp/test_kv_store", os.clone()).with_settings(Settings {
        compression_codec: Some(compress),
        ..Settings::default()
    });
    let db = db_builder.build().await.unwrap();
    // Options are loop-invariant; build them once instead of per iteration.
    let put_option = PutOptions::default();
    let write_option = WriteOptions {
        await_durable: false,
    };
    for i in 0..1000 {
        let key = format!("k{}", i);
        // Value length grows with `i` so later blocks exercise compression.
        let value = format!("{}{}", "v".repeat(i), i);
        // Pass `value` by move: the original cloned it needlessly (clippy:
        // redundant_clone), allocating a second copy of up to ~1KB per put.
        db.put_with_options(key.as_bytes(), value, &put_option, &write_option)
            .await
            .expect("failed to put");
    }
    db.flush().await.expect("flush failed");
    db.close().await.expect("failed to close db");
    // Reopen with the same codec and verify a compressed value reads back.
    let db_builder = Db::builder("/tmp/test_kv_store", os.clone()).with_settings(Settings {
        compression_codec: Some(compress),
        ..Settings::default()
    });
    let db = db_builder.build().await.unwrap();
    let v = db.get("k1").await.expect("get failed").unwrap();
    assert_eq!(v.as_ref(), b"v1");
    db.close().await.expect("failed to close db");
}
/// Polls the stored manifest until `cond` holds for its core state,
/// returning a clone of that core.
///
/// The condition is evaluated at least once even for a zero `timeout`
/// (the original `while start.elapsed() < timeout` guard could skip the
/// check entirely), and is re-checked roughly every 10ms thereafter.
///
/// # Panics
/// Panics if the condition does not hold within `timeout`.
async fn wait_for_manifest_condition(
    sm: &mut StoredManifest,
    cond: impl Fn(&ManifestCore) -> bool,
    timeout: Duration,
) -> ManifestCore {
    let start = tokio::time::Instant::now();
    loop {
        let manifest = sm.refresh().await.unwrap();
        if cond(&manifest.core) {
            return manifest.core.clone();
        }
        // Test the deadline *after* evaluating the condition so a tiny
        // timeout still gets at least one refresh + check.
        if start.elapsed() >= timeout {
            panic!("manifest condition took longer than timeout")
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}
/// Convenience wrapper around [`test_db_options_with_ttl`] with no default
/// TTL configured.
fn test_db_options(
min_filter_keys: u32,
l0_sst_size_bytes: usize,
compactor_options: Option<CompactorOptions>,
) -> Settings {
test_db_options_with_ttl(min_filter_keys, l0_sst_size_bytes, compactor_options, None)
}
/// Builds test [`Settings`] tuned for fast tests: 100ms flush and manifest
/// poll cadence, caller-supplied filter/L0/compactor knobs, and an optional
/// default TTL.
fn test_db_options_with_ttl(
    min_filter_keys: u32,
    l0_sst_size_bytes: usize,
    compactor_options: Option<CompactorOptions>,
    ttl: Option<u64>,
) -> Settings {
    Settings {
        // Caller-controlled knobs.
        min_filter_keys,
        l0_sst_size_bytes,
        compactor_options,
        default_ttl: ttl,
        // Fast flush/poll cadence so tests converge quickly.
        flush_interval: Some(Duration::from_millis(100)),
        manifest_poll_interval: Duration::from_millis(100),
        manifest_update_timeout: Duration::from_secs(300),
        // Fixed defaults for the remaining knobs.
        #[cfg(feature = "wal_disable")]
        wal_enabled: true,
        max_unflushed_bytes: 134_217_728,
        l0_max_ssts: 8,
        l0_flush_parallelism: 1,
        filter_bits_per_key: 10,
        compression_codec: None,
        object_store_cache_options: ObjectStoreCacheOptions::default(),
        garbage_collector_options: None,
        block_format: None,
    }
}
/// A snapshot sees data written before it was taken, but not writes that
/// land afterwards.
#[tokio::test]
async fn test_snapshot_basic_functionality() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let db = Db::open("test_db", store).await.unwrap();
    db.put(b"key1", b"value1").await.unwrap();
    db.put(b"key2", b"value2").await.unwrap();
    let snapshot = db.snapshot().await.unwrap();
    // Pre-snapshot writes are visible through the snapshot.
    for (key, expected) in [(b"key1".as_ref(), "value1"), (b"key2".as_ref(), "value2")] {
        assert_eq!(
            snapshot.get(key).await.unwrap(),
            Some(Bytes::from(expected))
        );
    }
    // A write after the snapshot is visible to the DB but not the snapshot.
    db.put(b"key3", b"value3").await.unwrap();
    assert_eq!(snapshot.get(b"key3").await.unwrap(), None);
    assert_eq!(
        db.get(b"key3").await.unwrap(),
        Some(Bytes::from(b"value3".as_ref()))
    );
}
// Verifies that the manifest's recent_snapshot_min_seq tracks the oldest
// active snapshot: 0 before any flush, then pinned to an open snapshot's
// seq, then advancing to last_l0_seq once the snapshot is dropped.
#[tokio::test]
async fn test_recent_snapshot_min_seq_monotonic() {
use crate::oracle::Oracle;
let path = "/tmp/test_recent_snapshot_min_seq_monotonic";
let object_store = Arc::new(InMemory::new());
let settings = Settings {
l0_sst_size_bytes: 4 * 1024, max_unflushed_bytes: 2 * 1024, min_filter_keys: 0,
flush_interval: Some(Duration::from_millis(100)),
..Default::default()
};
let db = Db::builder(path, object_store)
.with_settings(settings)
.build()
.await
.unwrap();
{
let state = db.inner.state.read();
// Nothing flushed yet, so the recorded min seq is still the initial 0.
assert_eq!(state.state().core().recent_snapshot_min_seq, 0);
}
db.put(b"key1", b"value1").await.unwrap();
db.inner.flush_memtables(FlushTarget::All).await.unwrap();
{
let state = db.inner.state.read();
let recent_min_seq = state.state().core().recent_snapshot_min_seq;
assert!(
recent_min_seq > 0,
"recent_snapshot_min_seq should be > 0 after flush"
);
}
// Open a snapshot; its seq is the last committed seq at this point.
let _snapshot = db.snapshot().await.unwrap();
let snapshot_seq = db.inner.oracle.last_committed_seq();
db.put(b"key2", b"value2").await.unwrap();
db.inner.flush_memtables(FlushTarget::All).await.unwrap();
// While the snapshot is alive, it pins the minimum active seq.
let min_active_seq = db.inner.snapshot_manager.min_active_seq();
assert!(min_active_seq.is_some());
assert_eq!(min_active_seq.unwrap(), snapshot_seq);
{
let state = db.inner.state.read();
let recent_min_seq = state.state().core().recent_snapshot_min_seq;
assert_eq!(
recent_min_seq,
min_active_seq.unwrap(),
"recent_snapshot_min_seq should equal snapshot_manager.min_active_seq() after flush"
);
}
// Dropping the snapshot releases its pin on the min seq.
drop(_snapshot);
db.put(b"key3", b"value3").await.unwrap();
db.inner.flush_memtables(FlushTarget::All).await.unwrap();
{
let state = db.inner.state.read();
let recent_min_seq = state.state().core().recent_snapshot_min_seq;
let last_l0_seq = state.state().core().last_l0_seq;
assert_eq!(
recent_min_seq, last_l0_seq,
"recent_snapshot_min_seq should equal last_l0_seq when no active snapshots"
);
assert!(recent_min_seq > snapshot_seq);
}
}
#[tokio::test]
async fn test_recent_snapshot_min_seq_uses_transaction_seq() {
let path = "/tmp/test_recent_snapshot_min_seq_uses_transaction_seq";
let object_store = Arc::new(InMemory::new());
let db = Db::builder(path, object_store).build().await.unwrap();
{
let state = db.inner.state.read();
assert_eq!(state.state().core().recent_snapshot_min_seq, 0);
}
db.put(b"key1", b"value1").await.unwrap();
db.inner
.flush_memtables(crate::memtable_flusher::FlushTarget::All)
.await
.unwrap();
let txn = db.begin(IsolationLevel::Snapshot).await.unwrap();
let txn_seq = txn.seqnum();
db.put(b"key2", b"value2").await.unwrap();
db.inner
.flush_memtables(crate::memtable_flusher::FlushTarget::All)
.await
.unwrap();
let min_active_seq = db.inner.txn_manager.min_active_seq();
assert_eq!(min_active_seq, Some(txn_seq));
{
let state = db.inner.state.read();
let recent_min_seq = state.state().core().recent_snapshot_min_seq;
assert_eq!(
recent_min_seq, txn_seq,
"recent_snapshot_min_seq should equal txn_manager.min_active_seq() after flush"
);
}
drop(txn);
db.put(b"key3", b"value3").await.unwrap();
db.inner
.flush_memtables(crate::memtable_flusher::FlushTarget::All)
.await
.unwrap();
{
let state = db.inner.state.read();
let recent_min_seq = state.state().core().recent_snapshot_min_seq;
let last_l0_seq = state.state().core().last_l0_seq;
assert_eq!(
recent_min_seq, last_l0_seq,
"recent_snapshot_min_seq should equal last_l0_seq when no active transactions"
);
assert!(recent_min_seq > txn_seq);
}
}
#[tokio::test]
async fn test_recent_snapshot_min_seq_prefers_snapshot_when_snapshot_seq_is_lower() {
let path = "/tmp/test_recent_snapshot_min_seq_prefers_snapshot_when_snapshot_seq_is_lower";
let object_store = Arc::new(InMemory::new());
let db = Db::builder(path, object_store).build().await.unwrap();
db.put(b"key1", b"value1").await.unwrap();
db.inner
.flush_memtables(crate::memtable_flusher::FlushTarget::All)
.await
.unwrap();
let snapshot = db.snapshot().await.unwrap();
let snapshot_seq = snapshot.seq();
db.put(b"key2", b"value2").await.unwrap();
db.inner
.flush_memtables(crate::memtable_flusher::FlushTarget::All)
.await
.unwrap();
let txn = db.begin(IsolationLevel::Snapshot).await.unwrap();
let txn_seq = txn.seqnum();
assert_eq!(
db.inner.snapshot_manager.min_active_seq(),
Some(snapshot_seq)
);
assert_eq!(db.inner.txn_manager.min_active_seq(), Some(txn_seq));
assert!(snapshot_seq < txn_seq);
db.put(b"key3", b"value3").await.unwrap();
db.inner
.flush_memtables(crate::memtable_flusher::FlushTarget::All)
.await
.unwrap();
{
let state = db.inner.state.read();
let recent_min_seq = state.state().core().recent_snapshot_min_seq;
assert_eq!(
recent_min_seq,
snapshot_seq,
"recent_snapshot_min_seq should use the snapshot seq when it is smaller than the transaction seq"
);
}
drop(snapshot);
db.put(b"key4", b"value4").await.unwrap();
db.inner
.flush_memtables(crate::memtable_flusher::FlushTarget::All)
.await
.unwrap();
assert_eq!(db.inner.snapshot_manager.min_active_seq(), None);
assert_eq!(db.inner.txn_manager.min_active_seq(), Some(txn_seq));
{
let state = db.inner.state.read();
let recent_min_seq = state.state().core().recent_snapshot_min_seq;
assert_eq!(
recent_min_seq,
txn_seq,
"recent_snapshot_min_seq should move to the transaction seq after the snapshot is dropped"
);
}
}
#[tokio::test]
async fn test_recent_snapshot_min_seq_prefers_transaction_when_transaction_seq_is_lower() {
let path =
"/tmp/test_recent_snapshot_min_seq_prefers_transaction_when_transaction_seq_is_lower";
let object_store = Arc::new(InMemory::new());
let db = Db::builder(path, object_store).build().await.unwrap();
db.put(b"key1", b"value1").await.unwrap();
db.inner
.flush_memtables(crate::memtable_flusher::FlushTarget::All)
.await
.unwrap();
let txn = db.begin(IsolationLevel::Snapshot).await.unwrap();
let txn_seq = txn.seqnum();
db.put(b"key2", b"value2").await.unwrap();
db.inner
.flush_memtables(crate::memtable_flusher::FlushTarget::All)
.await
.unwrap();
let snapshot = db.snapshot().await.unwrap();
let snapshot_seq = snapshot.seq();
assert_eq!(db.inner.txn_manager.min_active_seq(), Some(txn_seq));
assert_eq!(
db.inner.snapshot_manager.min_active_seq(),
Some(snapshot_seq)
);
assert!(txn_seq < snapshot_seq);
db.put(b"key3", b"value3").await.unwrap();
db.inner
.flush_memtables(crate::memtable_flusher::FlushTarget::All)
.await
.unwrap();
{
let state = db.inner.state.read();
let recent_min_seq = state.state().core().recent_snapshot_min_seq;
assert_eq!(
recent_min_seq,
txn_seq,
"recent_snapshot_min_seq should use the transaction seq when it is smaller than the snapshot seq"
);
}
drop(txn);
db.put(b"key4", b"value4").await.unwrap();
db.inner
.flush_memtables(crate::memtable_flusher::FlushTarget::All)
.await
.unwrap();
assert_eq!(db.inner.txn_manager.min_active_seq(), None);
assert_eq!(
db.inner.snapshot_manager.min_active_seq(),
Some(snapshot_seq)
);
{
let state = db.inner.state.read();
let recent_min_seq = state.state().core().recent_snapshot_min_seq;
assert_eq!(
recent_min_seq,
snapshot_seq,
"recent_snapshot_min_seq should move to the snapshot seq after the transaction is dropped"
);
}
}
/// A manual memtable flush must advance the last remotely-persisted
/// sequence number, so a read with a `Remote` durability filter sees the
/// flushed write just like a `Memory`-filtered read does.
#[tokio::test]
async fn test_memtable_flush_updates_last_remote_persisted_seq() {
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test";
    let mut opts = test_db_options(0, 256, None);
    // Effectively disable background flushing so only the explicit flush
    // below persists the write.
    opts.flush_interval = Some(Duration::MAX);
    let db = Db::builder(path, object_store.clone())
        .with_settings(opts)
        .build()
        .await
        .unwrap();
    let write_opts = WriteOptions {
        await_durable: false,
    };
    // Byte-string literals coerce directly; the extra `&` the original had
    // was a needless double borrow (clippy: needless_borrow).
    db.put_with_options(b"foo", b"bar", &PutOptions::default(), &write_opts)
        .await
        .unwrap();
    db.flush_with_options(FlushOptions {
        flush_type: FlushType::MemTable,
    })
    .await
    .unwrap();
    let v = db
        .get_with_options(b"foo", &ReadOptions::new().with_durability_filter(Memory))
        .await
        .unwrap();
    assert_eq!(v, Some(Bytes::from(b"bar".as_ref())));
    let v = db
        .get_with_options(b"foo", &ReadOptions::new().with_durability_filter(Remote))
        .await
        .unwrap();
    assert_eq!(v, Some(Bytes::from(b"bar".as_ref())));
}
/// Merging into a key that has no prior value stores the operand as-is.
#[tokio::test]
async fn should_merge_operand_into_empty_key() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let db = Db::builder("/tmp/test_merge_1", store)
        .with_settings(test_db_options(0, 1024, None))
        .with_merge_operator(Arc::new(StringConcatMergeOperator))
        .build()
        .await
        .unwrap();
    db.merge(b"key1", b"value1").await.unwrap();
    assert_eq!(db.get(b"key1").await.unwrap(), Some(Bytes::from("value1")));
}
/// Successive merges on one key are combined by the merge operator in write
/// order (here: string concatenation).
#[tokio::test]
async fn should_merge_multiple_operands_into_same_key() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let db = Db::builder("/tmp/test_merge_2", store)
        .with_settings(test_db_options(0, 1024, None))
        .with_merge_operator(Arc::new(StringConcatMergeOperator))
        .build()
        .await
        .unwrap();
    for operand in [b"a", b"b", b"c"] {
        db.merge(b"key1", operand).await.unwrap();
    }
    assert_eq!(db.get(b"key1").await.unwrap(), Some(Bytes::from("abc")));
}
/// Merge operands written before a flush still combine with operands
/// written after it.
#[tokio::test]
async fn should_persist_merge_operands_across_flush() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let db = Db::builder("/tmp/test_merge_3", store)
        .with_settings(test_db_options(0, 1024, None))
        .with_merge_operator(Arc::new(StringConcatMergeOperator))
        .build()
        .await
        .unwrap();
    db.merge(b"key1", b"a").await.unwrap();
    db.merge(b"key1", b"b").await.unwrap();
    db.flush().await.unwrap();
    // The flushed operands read back combined...
    assert_eq!(db.get(b"key1").await.unwrap(), Some(Bytes::from("ab")));
    // ...and a post-flush operand is appended on top of them.
    db.merge(b"key1", b"c").await.unwrap();
    assert_eq!(db.get(b"key1").await.unwrap(), Some(Bytes::from("abc")));
}
/// Reading a key that holds merge operands must fail when the DB was opened
/// without a merge operator configured.
#[tokio::test]
async fn should_error_when_merging_without_merge_operator() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let db = Db::builder("/tmp/test_merge_4", store)
        .with_settings(test_db_options(0, 1024, None))
        .build()
        .await
        .unwrap();
    // The merge write itself succeeds; the failure surfaces on read.
    db.merge(b"key1", b"value1").await.unwrap();
    let result = db.get(b"key1").await;
    assert!(
        result.is_err(),
        "Reading merge operand without merge operator should error"
    );
    let e = result.unwrap_err();
    let error_string = e.to_string();
    assert!(
        error_string.contains("merge operator missing")
            || error_string.contains("MergeOperatorMissing"),
        "Error should be MergeOperatorMissing, got: {:?}",
        e
    );
}
/// Merge state must survive a flush + close + reopen cycle, and operands
/// merged after reopen append to the recovered value.
#[tokio::test]
async fn should_merge_operands_after_reopen() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test_merge_5";
    // Both opens use identical settings and the same merge operator.
    let build = |s: Arc<dyn ObjectStore>| {
        Db::builder(path, s)
            .with_settings(test_db_options(0, 1024, None))
            .with_merge_operator(Arc::new(StringConcatMergeOperator))
            .build()
    };
    let db = build(store.clone()).await.unwrap();
    db.merge(b"key1", b"a").await.unwrap();
    db.merge(b"key1", b"b").await.unwrap();
    db.flush().await.unwrap();
    db.close().await.unwrap();
    let reopened = build(store).await.unwrap();
    assert_eq!(reopened.get(b"key1").await.unwrap(), Some(Bytes::from("ab")));
    reopened.merge(b"key1", b"c").await.unwrap();
    assert_eq!(
        reopened.get(b"key1").await.unwrap(),
        Some(Bytes::from("abc"))
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_gc_race_deletes_l0_before_manifest_update() {
    // Regression test for the window between an L0 SST being written to
    // object storage and the manifest being updated to reference it. A GC
    // pass running inside that window must not delete the new SST; the test
    // fails if the manifest ends up referencing an SST that GC removed.
    let fp_registry = Arc::new(FailPointRegistry::new());
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = Path::from("/tmp/test_gc_race_deletes_l0_before_manifest_update");
    let mut settings = test_db_options(0, 1024, None);
    // Disable background flushing: the only flush is the explicit one below.
    settings.flush_interval = None;
    let db = Db::builder(path.clone(), object_store.clone())
        .with_settings(settings)
        .with_fp_registry(fp_registry.clone())
        .build()
        .await
        .expect("failed to build DB");
    let db = Arc::new(db);
    // Pause the flusher after it writes the L0 SST but before it updates the
    // manifest, holding the database inside the racy window.
    fail_parallel::cfg(
        fp_registry.clone(),
        "after-flush-imm-to-l0-before-manifest",
        "pause",
    )
    .expect("failed to set failpoint");
    db.put_with_options(
        b"key1",
        b"value1",
        &PutOptions::default(),
        &WriteOptions {
            await_durable: false,
        },
    )
    .await
    .expect("failed to put");
    // Trigger the memtable flush on a separate task; it blocks on the paused
    // failpoint until we turn it off below.
    let this_db = db.clone();
    let flush_handle = tokio::spawn(async move {
        this_db
            .flush_with_options(FlushOptions {
                flush_type: FlushType::MemTable,
            })
            .await
    });
    // Wait (bounded) for the SST to appear in object storage.
    let mut ssts = Vec::new();
    for _ in 0..200 {
        ssts = db
            .inner
            .table_store
            .list_compacted_ssts(..)
            .await
            .expect("failed to list compacted ssts");
        if !ssts.is_empty() {
            break;
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    // At this point GC has not run yet: the SST is on storage but not yet
    // referenced by the manifest. (Message fixed: it used to say "after GC".)
    assert_eq!(
        ssts.len(),
        1,
        "expected exactly one L0 SST after flush (before GC), but found {:?}",
        ssts.iter().map(|sst| sst.id).collect::<Vec<_>>()
    );
    // Run GC with zero minimum age so any unreferenced file is eligible for
    // deletion immediately.
    let gc_options = GarbageCollectorOptions {
        wal_options: Some(GarbageCollectorDirectoryOptions {
            interval: None,
            min_age: Duration::from_millis(0),
        }),
        manifest_options: Some(GarbageCollectorDirectoryOptions {
            interval: None,
            min_age: Duration::from_millis(0),
        }),
        compacted_options: Some(GarbageCollectorDirectoryOptions {
            interval: None,
            min_age: Duration::from_millis(0),
        }),
        compactions_options: Some(GarbageCollectorDirectoryOptions {
            interval: None,
            min_age: Duration::from_millis(0),
        }),
    };
    let gc = GarbageCollectorBuilder::new(path.clone(), object_store.clone())
        .with_options(gc_options)
        .with_system_clock(db.inner.system_clock.clone())
        .build();
    // Run GC several times while the flush is still paused. If GC wrongly
    // deletes the pending L0 SST the list becomes empty and we bail early so
    // the assertion below reports the failure.
    for _ in 0..5 {
        gc.run_gc_once().await;
        ssts = db
            .inner
            .table_store
            .list_compacted_ssts(..)
            .await
            .expect("failed to list compacted ssts after manual GC");
        if ssts.is_empty() {
            break;
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    assert_eq!(
        ssts.len(),
        1,
        "expected exactly one L0 SST after GC, but found {:?}",
        ssts.iter().map(|sst| sst.id).collect::<Vec<_>>()
    );
    // Release the flusher and let the manifest update complete.
    fail_parallel::cfg(
        fp_registry.clone(),
        "after-flush-imm-to-l0-before-manifest",
        "off",
    )
    .expect("failed to set failpoint");
    flush_handle
        .await
        .expect("failed to join flush handle")
        .expect("flush failed");
    // The manifest must reference exactly the SST that survived GC.
    let manifest_store = ManifestStore::new(&path, object_store.clone());
    let (_, manifest) = manifest_store
        .read_latest_manifest()
        .await
        .expect("failed to read latest manifest");
    assert_eq!(
        manifest.core.l0.len(),
        1,
        "expected exactly one L0 SST in manifest"
    );
    let l0_id = manifest.core.l0[0].sst.id;
    assert_eq!(
        l0_id, ssts[0].id,
        "expected SST {:?} but found SST {:?}",
        ssts[0].id, l0_id,
    );
    // Re-list through a fresh TableStore to confirm the referenced SST still
    // exists on object storage, i.e. GC did not delete it.
    let table_store = TableStore::new(
        ObjectStores::new(object_store.clone(), None),
        SsTableFormat::default(),
        path.clone(),
        None,
    );
    let compacted_ssts = table_store
        .list_compacted_ssts(..)
        .await
        .expect("failed to list compacted ssts");
    let still_exists = compacted_ssts.iter().any(|m| m.id == l0_id);
    assert!(
        still_exists,
        "manifest references L0 SST {:?} that GC has already deleted",
        l0_id
    );
    db.close().await.expect("failed to close DB");
}
#[tokio::test]
async fn test_write_handle() {
    // Every write kind (put, put-with-TTL, delete, batch) returns a handle
    // carrying the assigned seqnum and the clock time at write.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test_write_handle_db";
    let clock = Arc::new(MockSystemClock::new());
    let db = Db::builder(path, object_store)
        .with_settings(test_db_options(0, 1024, None))
        .with_system_clock(clock.clone())
        .build()
        .await
        .unwrap();
    let write_opts = WriteOptions {
        await_durable: false,
    };

    // Plain put.
    clock.set(100);
    let handle = db
        .put_with_options(b"key1", b"value1", &PutOptions::default(), &write_opts)
        .await
        .unwrap();
    assert_eq!(handle.seqnum(), 1);
    assert_eq!(handle.create_ts(), 100);

    // Put with a TTL still stamps the current clock time.
    clock.set(200);
    let ttl_opts = PutOptions {
        ttl: Ttl::ExpireAfter(1000),
    };
    let handle = db
        .put_with_options(b"key2", b"value2", &ttl_opts, &write_opts)
        .await
        .unwrap();
    assert_eq!(handle.seqnum(), 2);
    assert_eq!(handle.create_ts(), 200);

    // Delete advances the seqnum like any other write.
    clock.set(300);
    let handle = db.delete_with_options(b"key1", &write_opts).await.unwrap();
    assert_eq!(handle.seqnum(), 3);
    assert_eq!(handle.create_ts(), 300);

    // A multi-op batch gets a single seqnum/timestamp for the whole batch.
    clock.set(400);
    let mut batch = WriteBatch::new();
    batch.put(b"key3", b"value3");
    batch.delete(b"key2");
    let handle = db.write_with_options(batch, &write_opts).await.unwrap();
    assert_eq!(handle.seqnum(), 4);
    assert_eq!(handle.create_ts(), 400);
}
#[tokio::test]
async fn test_write_handle_with_batch() {
    // Each committed batch receives the next seqnum and the clock time at
    // which it was committed.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test_write_batch_handle";
    let clock = Arc::new(MockSystemClock::new());
    let db = Db::builder(path, object_store)
        .with_settings(test_db_options(0, 1024, None))
        .with_system_clock(clock.clone())
        .build()
        .await
        .unwrap();
    let write_opts = WriteOptions {
        await_durable: false,
    };
    // (expected seqnum, mocked clock time) for three successive batches.
    for (expected_seq, ts) in [(1, 100), (2, 200), (3, 300)] {
        clock.set(ts);
        let mut batch = WriteBatch::new();
        match expected_seq {
            1 => {
                batch.put(b"key1", b"value1");
                batch.delete(b"key2");
            }
            2 => {
                batch.put(b"key3", b"value3");
                batch.put(b"key4", b"value4");
            }
            _ => {
                batch.delete(b"key1");
            }
        }
        let handle = db.write_with_options(batch, &write_opts).await.unwrap();
        assert_eq!(handle.seqnum(), expected_seq);
        assert_eq!(handle.create_ts(), ts);
    }
}
#[tokio::test]
async fn test_write_with_options_empty_batch_returns_empty_batch_error() {
    // Committing a batch with no operations must be rejected with EmptyBatch.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let db = Db::builder("/tmp/test_write_with_options_empty_batch", object_store)
        .with_settings(test_db_options(0, 1024, None))
        .build()
        .await
        .unwrap();
    let write_opts = WriteOptions {
        await_durable: false,
    };
    let result = db
        .inner
        .write_with_options(WriteBatch::new(), &write_opts)
        .await;
    assert!(matches!(result, Err(SlateDBError::EmptyBatch)));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Verifies visibility when a transaction's commit is paused *after* its batch
// is applied but before post-commit bookkeeping completes: a transaction begun
// inside that window must already see txn1's write, and after txn1's commit
// finishes, committing the second transaction must succeed.
async fn test_txn_conflict_when_first_commit_paused_post_commit() {
let fp_registry = Arc::new(FailPointRegistry::new());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let db = Db::builder("/tmp/test_txn_conflict_post_commit_pause", object_store)
.with_settings(test_db_options(0, 1024, None))
.with_fp_registry(fp_registry.clone())
.build()
.await
.unwrap();
let txn1 = db
.begin(IsolationLevel::SerializableSnapshot)
.await
.unwrap();
txn1.put(b"k1", b"v1").unwrap();
// Pause txn1's commit after the write batch is applied but before the
// post-commit step completes.
fail_parallel::cfg(fp_registry.clone(), "write-batch-post-commit", "pause").unwrap();
let txn1_start_seq = txn1.seqnum();
let txn1_commit_task = tokio::spawn(async move { txn1.commit().await });
// Detect that txn1 reached the failpoint: once it is no longer counted among
// active transactions, min_active_seq() returns None.
let pause_reached = tokio::time::timeout(Duration::from_secs(5), async {
loop {
let txn1_removed_from_active = db.inner.txn_manager.min_active_seq().is_none();
if txn1_removed_from_active {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.is_ok();
// If the failpoint was never reached, unblock the commit task before
// panicking so the test does not hang on join.
if !pause_reached {
fail_parallel::cfg(fp_registry.clone(), "write-batch-post-commit", "off").unwrap();
let _ = txn1_commit_task.await;
panic!("txn1 did not pause at write-batch-post-commit");
}
// Begin-and-drop a transaction while txn1 is paused; it must not disturb
// the transaction manager's bookkeeping.
let txn_dropped = db
.begin(IsolationLevel::SerializableSnapshot)
.await
.unwrap();
drop(txn_dropped);
// txn2 starts inside the pause window and must already observe txn1's write.
let txn2 = db
.begin(IsolationLevel::SerializableSnapshot)
.await
.unwrap();
assert_eq!(
txn2.get(b"k1").await.unwrap(),
Some(Bytes::from_static(b"v1"))
);
// Release txn1 and wait for its commit to complete successfully.
fail_parallel::cfg(fp_registry.clone(), "write-batch-post-commit", "off").unwrap();
let _ = txn1_commit_task
.await
.expect("failed to join txn1 commit task")
.expect("txn1 commit should succeed");
assert_eq!(
txn2.seqnum(),
txn1_start_seq + 1, "txn2 should see the commit seqnum after txn1's commit"
);
// txn2 overwrites k1 and adds k2; this commit must succeed (no spurious
// conflict with the paused-then-completed txn1).
txn2.put(b"k1", b"v2").unwrap();
txn2.put(b"k2", b"v2").unwrap();
assert!(txn2.commit().await.is_ok());
assert_eq!(
db.get(b"k1").await.unwrap(),
Some(Bytes::from_static(b"v2"))
);
db.close().await.unwrap();
}
#[tokio::test]
async fn should_notify_seq_watcher_on_wal_flush() {
    // After an explicit WAL flush, the status watcher must observe
    // durable_seq reach at least the last written seqnum.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let db = Db::builder("/tmp/test_watch_wal", object_store)
        .build()
        .await
        .unwrap();
    let mut watcher = db.subscribe();

    // Three writes, then flush the WAL to make them durable.
    for (key, value) in [(b"key1", b"value1"), (b"key2", b"value2"), (b"key3", b"value3")] {
        db.put(key, value).await.unwrap();
    }
    db.flush_with_options(FlushOptions {
        flush_type: FlushType::Wal,
    })
    .await
    .unwrap();

    let status = tokio::time::timeout(
        Duration::from_secs(10),
        watcher.wait_for(|s| s.durable_seq >= 3),
    )
    .await
    .expect("timed out waiting for seq update")
    .expect("watch channel closed")
    .clone();
    assert!(
        status.durable_seq >= 3,
        "expected durable seq >= 3, got {}",
        status.durable_seq
    );
    db.close().await.unwrap();
}
#[tokio::test]
async fn should_subscribe_to_current_manifest_updates_after_flush() {
    // A subscriber's view of the current manifest must track the manifest
    // update produced by a memtable flush.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = Path::from("/tmp/test_watch_current_manifest");
    let db = Db::builder(path.clone(), object_store.clone())
        .with_settings(test_db_options(0, 1024, None))
        .build()
        .await
        .unwrap();
    let mut watcher = db.subscribe();
    let manifest_store = Arc::new(ManifestStore::new(&path, object_store));
    let mut stored_manifest =
        StoredManifest::load(manifest_store, Arc::new(DefaultSystemClock::new()))
            .await
            .unwrap();

    // Before any writes, the watcher reflects the manifest the DB reports.
    assert_eq!(watcher.borrow().current_manifest.manifest, db.manifest());

    // Write two keys and flush them into L0.
    db.put(b"key1", b"value1").await.unwrap();
    db.put(b"key2", b"value2").await.unwrap();
    db.flush_with_options(FlushOptions {
        flush_type: FlushType::MemTable,
    })
    .await
    .unwrap();

    // First wait until the stored manifest records the flush...
    wait_for_manifest_condition(
        &mut stored_manifest,
        |manifest| manifest.last_l0_seq >= 2,
        Duration::from_secs(10),
    )
    .await;

    // ...then wait for the watcher to observe the same state.
    let observed = tokio::time::timeout(
        Duration::from_secs(10),
        watcher
            .wait_for(|s| s.current_manifest.manifest.last_l0_seq >= 2 && s.durable_seq >= 2),
    )
    .await
    .expect("timed out waiting for manifest update")
    .expect("watch channel closed")
    .clone();
    assert!(observed.durable_seq >= 2);
    assert_eq!(observed.current_manifest.manifest.last_l0_seq, 2);
    db.close().await.unwrap();
}
#[tokio::test]
async fn should_publish_remote_manifest_updates_via_poll() {
    // A manifest change made by an external writer (a checkpoint written
    // directly through the manifest store) must be picked up by the DB's
    // manifest polling and published to subscribers.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = Path::from("/tmp/test_watch_remote_manifest");
    let db = Db::builder(path.clone(), object_store.clone())
        .with_settings(test_db_options(0, 1024, None))
        .build()
        .await
        .unwrap();
    let mut watcher = db.subscribe();
    let checkpoints_before = watcher.borrow().current_manifest.manifest.checkpoints.len();

    // Write a checkpoint from "outside" the DB instance.
    let manifest_store = Arc::new(ManifestStore::new(&path, object_store));
    let mut stored_manifest =
        StoredManifest::load(manifest_store, Arc::new(DefaultSystemClock::new()))
            .await
            .unwrap();
    stored_manifest
        .write_checkpoint(uuid::Uuid::new_v4(), &CheckpointOptions::default())
        .await
        .unwrap();
    wait_for_manifest_condition(
        &mut stored_manifest,
        |manifest| manifest.checkpoints.len() > checkpoints_before,
        Duration::from_secs(10),
    )
    .await;

    // The watcher must eventually see the extra checkpoint via polling.
    let observed = tokio::time::timeout(
        Duration::from_secs(10),
        watcher.wait_for(|s| {
            s.current_manifest.manifest.checkpoints.len() > checkpoints_before
        }),
    )
    .await
    .expect("timed out waiting for remote manifest update")
    .expect("watch channel closed")
    .clone();
    assert_eq!(
        observed.current_manifest.manifest.checkpoints.len(),
        checkpoints_before + 1
    );
    db.close().await.unwrap();
}
#[tokio::test]
async fn should_close_watcher_on_db_drop() {
    // Closing the DB publishes a Clean close reason; dropping the DB closes
    // the watch channel entirely.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let db = Db::builder("/tmp/test_watch_drop", object_store)
        .build()
        .await
        .unwrap();
    let mut watcher = db.subscribe();
    db.close().await.unwrap();
    let observed = watcher
        .wait_for(|s| s.close_reason.is_some())
        .await
        .expect("watch channel closed")
        .clone();
    assert_eq!(
        observed.close_reason,
        Some(CloseReason::Clean),
        "expected close_reason = Clean after db close",
    );

    // Once the DB itself is dropped, the sender side goes away and the next
    // change notification fails.
    drop(db);
    assert!(
        watcher.changed().await.is_err(),
        "expected watch channel closed after db drop, got Ok",
    );
}
#[tokio::test]
async fn should_report_close_reason_clean_on_db_close() {
    // A graceful close must surface CloseReason::Clean to subscribers.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let db = Db::builder("/tmp/test_close_reason_clean", object_store)
        .build()
        .await
        .unwrap();
    let mut watcher = db.subscribe();
    db.close().await.unwrap();
    let reason = watcher
        .wait_for(|s| s.close_reason.is_some())
        .await
        .expect("watch channel closed")
        .close_reason
        .clone();
    assert_eq!(reason, Some(CloseReason::Clean));
}
#[tokio::test]
async fn should_report_close_reason_panic_on_background_task_failure() {
    // When a background task panics (forced via a failpoint in the WAL write
    // path), subscribers must observe CloseReason::Panic.
    let fp_registry = Arc::new(FailPointRegistry::new());
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let db = Db::builder("/tmp/test_close_reason_panic", object_store)
        .with_settings(test_db_options(0, 128, None))
        .with_fp_registry(fp_registry.clone())
        .build()
        .await
        .unwrap();
    let mut watcher = db.subscribe();
    fail_parallel::cfg(fp_registry.clone(), "write-wal-sst-io-error", "panic").unwrap();
    // The put itself may fail once the background task dies; its result is
    // intentionally ignored.
    let _ = db.put(b"foo", b"bar").await;
    let reason = tokio::time::timeout(
        Duration::from_secs(10),
        watcher.wait_for(|s| s.close_reason.is_some()),
    )
    .await
    .expect("timed out waiting for close reason")
    .expect("watch channel closed")
    .close_reason
    .clone();
    assert_eq!(reason, Some(CloseReason::Panic));
}
#[tokio::test]
async fn should_report_close_reason_fenced_on_fenced_error() {
    // A Fenced error pushed into the status manager must surface to
    // subscribers as CloseReason::Fenced.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let db = Db::builder("/tmp/test_close_reason_fenced", object_store)
        .with_settings(test_db_options(0, 1024, None))
        .build()
        .await
        .unwrap();
    let mut watcher = db.subscribe();
    db.inner
        .status_manager
        .write_result(Err(crate::error::SlateDBError::Fenced));
    let reason = tokio::time::timeout(
        Duration::from_secs(10),
        watcher.wait_for(|s| s.close_reason.is_some()),
    )
    .await
    .expect("timed out waiting for close reason")
    .expect("watch channel closed")
    .close_reason
    .clone();
    assert_eq!(reason, Some(CloseReason::Fenced));
}
#[cfg(feature = "wal_disable")]
#[tokio::test]
async fn should_notify_seq_watcher_on_l0_flush_when_wal_disabled() {
    // With the WAL disabled, durability is only reached on L0 flush; the
    // watcher must see durable_seq advance after a memtable flush.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let mut options = test_db_options(0, 256, None);
    options.wal_enabled = false;
    let db = Db::builder("/tmp/test_watch_l0", object_store)
        .with_settings(options)
        .build()
        .await
        .unwrap();
    let mut watcher = db.subscribe();
    let write_opts = WriteOptions {
        await_durable: false,
    };
    for (key, value) in [(b"key1", b"value1"), (b"key2", b"value2")] {
        db.put_with_options(key, value, &PutOptions::default(), &write_opts)
            .await
            .unwrap();
    }
    db.flush_with_options(FlushOptions {
        flush_type: FlushType::MemTable,
    })
    .await
    .unwrap();
    let status = tokio::time::timeout(
        Duration::from_secs(10),
        watcher.wait_for(|s| s.durable_seq >= 2),
    )
    .await
    .expect("timed out waiting for seq update")
    .expect("watch channel closed")
    .clone();
    assert!(
        status.durable_seq >= 2,
        "expected durable seq >= 2, got {}",
        status.durable_seq
    );
    db.close().await.unwrap();
}
#[cfg(feature = "wal_disable")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Regression test: with tiny SST/block sizes and a live snapshot pinning old
// versions, compaction can split the many versions of a single key across
// multiple SSTs within one compacted sorted run. Point reads and scans must
// still return the newest visible version.
async fn reads_succeed_when_compacted_sr_splits_same_key_across_ssts() {
use crate::SstBlockSize;
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_merge_split_sr_repro";
// On-demand compaction trigger: flipping this flag to true makes the
// scheduler fire exactly one compaction (compare_exchange resets it).
let should_compact = Arc::new(AtomicBool::new(false));
let should_compact2 = should_compact.clone();
let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
move |_state| {
let result = should_compact2
.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
.unwrap_or(false);
if result {
info!("TRIGGER COMPACT");
}
result
},
)));
// Very small max_sst_size forces the compactor to emit multiple SSTs.
let mut settings = test_db_options(
0,
128,
Some(CompactorOptions {
poll_interval: Duration::from_millis(20),
max_sst_size: 128,
max_concurrent_compactions: 1,
manifest_update_timeout: Duration::from_secs(300),
..Default::default()
}),
);
settings.l0_max_ssts = 10_000;
settings.flush_interval = None;
settings.wal_enabled = false;
let compactor_options = settings.compactor_options.take().unwrap();
let db = Db::builder(path, object_store.clone())
.with_settings(settings)
.with_sst_block_size(SstBlockSize::Other(64))
.with_merge_operator(Arc::new(StringConcatMergeOperator))
.with_compactor_builder(
CompactorBuilder::new(path, object_store.clone())
.with_scheduler_supplier(compaction_scheduler)
.with_options(compactor_options),
)
.build()
.await
.unwrap();
db.put_with_options(
b"k",
b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa0",
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
// Hold a snapshot so compaction cannot drop the older versions of "k",
// forcing all versions to be retained (and potentially split) in the SR.
let _snapshot = db.snapshot().await.unwrap();
// Write 16 more versions of the same key; values are ~33 bytes each so a
// handful of versions overflows the 128-byte max_sst_size.
for i in 0..16u16 {
let val = format!("{}{}", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", i + 1);
db.put_with_options(
b"k",
val.as_bytes(),
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
}
db.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
.unwrap();
// Repeatedly trigger compaction until the first compacted sorted run has
// more than one SST view, i.e. the key's versions span multiple SSTs.
tokio::time::timeout(Duration::from_secs(60), async {
loop {
should_compact.store(true, Ordering::SeqCst);
{
let state = db.inner.state.read();
info!(
"l0: {:?}",
state
.state()
.core()
.l0
.iter()
.map(|t| t.estimate_size())
.collect::<Vec<_>>()
);
info!(
"compacted: {:?}",
state
.state()
.core()
.compacted
.iter()
.map(|t| t.estimate_size())
.collect::<Vec<_>>()
);
if state
.state()
.core()
.compacted
.first()
.is_some_and(|sr| sr.sst_views.len() > 1)
{
break;
}
}
tokio::time::sleep(Duration::from_millis(3000)).await;
}
})
.await
.expect("timed out waiting for compacted SR where one key spans multiple SSTs");
// Both the point read and the scan must return the newest version (the
// value written with suffix 16).
let data = db.get(b"k").await.unwrap().unwrap();
let expected = Bytes::from(b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa16".as_ref());
let data_scan = db
.scan(b"k".as_slice()..)
.await
.unwrap()
.next()
.await
.unwrap()
.unwrap();
info!("data: {:?}", data);
info!("data (scan): {:?}", data_scan.value);
assert_eq!(data, expected);
assert_eq!(data_scan.value, expected);
}
#[cfg(feature = "wal_disable")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn reads_succeed_when_compacted_sr_splits_same_merge_key_across_ssts() {
    use crate::SstBlockSize;
    use bytes::{BufMut as _, BytesMut};
    // Same scenario as the put-based split-SR test, but with merge operands:
    // a snapshot pins all operands, and tiny SST sizes force the compactor to
    // split one key's operands across multiple SSTs in a single sorted run.
    // Reads and scans must still assemble the fully merged value.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    // Fixed: this path was copy-pasted from the sibling put-based test; use a
    // distinct path so the two tests never collide on a shared store.
    let path = "/tmp/test_merge_split_sr_merge_repro";
    // On-demand compaction trigger: setting the flag fires one compaction.
    let should_compact = Arc::new(AtomicBool::new(false));
    let should_compact2 = should_compact.clone();
    let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
        move |_state| {
            let result = should_compact2
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
                .unwrap_or(false);
            if result {
                info!("TRIGGER COMPACT");
            }
            result
        },
    )));
    // Very small max_sst_size forces the compactor to emit multiple SSTs.
    let mut settings = test_db_options(
        0,
        128,
        Some(CompactorOptions {
            poll_interval: Duration::from_millis(20),
            max_sst_size: 128,
            max_concurrent_compactions: 1,
            manifest_update_timeout: Duration::from_secs(300),
            ..Default::default()
        }),
    );
    settings.l0_max_ssts = 10_000;
    settings.flush_interval = None;
    settings.wal_enabled = false;
    let compactor_options = settings.compactor_options.take().unwrap();
    let db = Db::builder(path, object_store.clone())
        .with_settings(settings)
        .with_sst_block_size(SstBlockSize::Other(64))
        .with_merge_operator(Arc::new(StringConcatMergeOperator))
        .with_compactor_builder(
            CompactorBuilder::new(path, object_store.clone())
                .with_scheduler_supplier(compaction_scheduler)
                .with_options(compactor_options),
        )
        .build()
        .await
        .unwrap();
    // Track the expected fully-merged value as we write.
    let mut expected = BytesMut::new();
    db.put_with_options(
        b"k",
        b"base",
        &PutOptions::default(),
        &WriteOptions {
            await_durable: false,
        },
    )
    .await
    .unwrap();
    expected.put(b"base".as_slice());
    // Hold a snapshot so compaction must retain every operand.
    let _snapshot = db.snapshot().await.unwrap();
    // 16 operands of 32 bytes each ('a', 'b', ..., repeated) overflow the
    // 128-byte max_sst_size many times over.
    for i in 0..16u8 {
        let operand = vec![b'a' + i; 32];
        expected.put(operand.as_slice());
        db.merge_with_options(
            b"k",
            operand,
            &MergeOptions::default(),
            &WriteOptions {
                await_durable: false,
            },
        )
        .await
        .unwrap();
    }
    db.flush_with_options(FlushOptions {
        flush_type: FlushType::MemTable,
    })
    .await
    .unwrap();
    // Repeatedly trigger compaction until the first compacted sorted run has
    // more than one SST view, i.e. the key's operands span multiple SSTs.
    tokio::time::timeout(Duration::from_secs(60), async {
        loop {
            should_compact.store(true, Ordering::SeqCst);
            {
                let state = db.inner.state.read();
                info!(
                    "l0: {:?}",
                    state
                        .state()
                        .core()
                        .l0
                        .iter()
                        .map(|t| t.estimate_size())
                        .collect::<Vec<_>>()
                );
                info!(
                    "compacted: {:?}",
                    state
                        .state()
                        .core()
                        .compacted
                        .iter()
                        .map(|t| t.estimate_size())
                        .collect::<Vec<_>>()
                );
                if state
                    .state()
                    .core()
                    .compacted
                    .first()
                    .is_some_and(|sr| sr.sst_views.len() > 1)
                {
                    break;
                }
            }
            tokio::time::sleep(Duration::from_millis(3000)).await;
        }
    })
    .await
    .expect("timed out waiting for compacted SR where one key spans multiple SSTs");
    // Both the point read and the scan must return the full concatenation of
    // the base value and all 16 operands, in write order.
    let data = db.get(b"k").await.unwrap().unwrap();
    let expected = expected.freeze();
    let data_scan = db
        .scan(b"k".as_slice()..)
        .await
        .unwrap()
        .next()
        .await
        .unwrap()
        .unwrap();
    info!("data: {:?}", data);
    info!("data (scan): {:?}", data_scan.value);
    assert_eq!(data, expected);
    assert_eq!(data_scan.value, expected);
}
#[tokio::test]
async fn test_get_key_value() {
    // get_key_value returns the full record: key, value, seqnum, create
    // timestamp, and the TTL-derived expiry timestamp.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test_get_key_value";
    let clock = Arc::new(MockSystemClock::new());
    let db = Db::builder(path, object_store)
        .with_settings(test_db_options(0, 1024, None))
        .with_system_clock(clock.clone())
        .build()
        .await
        .unwrap();
    clock.set(100);
    db.put_with_options(
        b"key1",
        b"value1",
        &PutOptions {
            ttl: Ttl::ExpireAfter(50),
        },
        &WriteOptions {
            await_durable: false,
        },
    )
    .await
    .unwrap();
    let record = db.get_key_value(b"key1").await.unwrap().unwrap();
    assert_eq!(record.key, Bytes::from_static(b"key1"));
    assert_eq!(record.value, Bytes::from_static(b"value1"));
    assert_eq!(record.seq, 1);
    // Written at t=100 with a 50-unit TTL, so it expires at t=150.
    assert_eq!(record.create_ts, 100);
    assert_eq!(record.expire_ts, Some(150));
}
#[tokio::test]
async fn test_scan_row_entry() {
    use crate::types::ValueDeletable;
    // next_entry exposes raw row metadata (seq, create_ts, expire_ts) for
    // each visible record, in key order.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test_scan_row_entry";
    let clock = Arc::new(MockSystemClock::new());
    let db = Db::builder(path, object_store)
        .with_settings(test_db_options(0, 1024, None))
        .with_system_clock(clock.clone())
        .build()
        .await
        .unwrap();
    let put_opts = PutOptions {
        ttl: Ttl::ExpireAfter(50),
    };
    let write_opts = WriteOptions {
        await_durable: false,
    };
    // (key, value, mocked write time) for three successive puts.
    let rows = [
        (b"key1".as_slice(), b"value1".as_slice(), 100),
        (b"key2".as_slice(), b"value2".as_slice(), 110),
        (b"key3".as_slice(), b"value3".as_slice(), 120),
    ];
    for &(key, value, ts) in &rows {
        clock.set(ts);
        db.put_with_options(key, value, &put_opts, &write_opts)
            .await
            .unwrap();
    }
    let mut iter = db.scan::<Bytes, _>(..).await.unwrap();
    for (idx, &(key, value, ts)) in rows.iter().enumerate() {
        let entry = iter.next_entry().await.unwrap().unwrap();
        assert_eq!(entry.key, Bytes::copy_from_slice(key));
        assert_eq!(
            entry.value,
            ValueDeletable::Value(Bytes::copy_from_slice(value))
        );
        // Seqnums are assigned in write order starting at 1.
        assert_eq!(entry.seq, idx as u64 + 1);
        assert_eq!(entry.create_ts, Some(ts));
        // ExpireAfter(50) is relative to the mocked write time.
        assert_eq!(entry.expire_ts, Some(ts + 50));
    }
    assert!(iter.next_entry().await.unwrap().is_none());
}
#[tokio::test]
async fn should_get_key_value_with_expire_at() {
    // Ttl::ExpireAt pins an absolute expiry timestamp regardless of when the
    // record was written.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/tmp/test_get_key_value_expire_at";
    let clock = Arc::new(MockSystemClock::new());
    let db = Db::builder(path, object_store)
        .with_settings(test_db_options(0, 1024, None))
        .with_system_clock(clock.clone())
        .build()
        .await
        .unwrap();
    let expire_at_opts = PutOptions {
        ttl: Ttl::ExpireAt(500),
    };
    let write_opts = WriteOptions {
        await_durable: false,
    };
    // Two writes at different times, both with the same absolute expiry.
    let writes = [
        (b"key1".as_slice(), b"value1".as_slice(), 100),
        (b"key2".as_slice(), b"value2".as_slice(), 200),
    ];
    for &(key, value, ts) in &writes {
        clock.set(ts);
        db.put_with_options(key, value, &expire_at_opts, &write_opts)
            .await
            .unwrap();
    }
    // Both records expire at the same absolute time but keep their own
    // create timestamps.
    for &(key, _, ts) in &writes {
        let kv = db.get_key_value(key).await.unwrap().unwrap();
        assert_eq!(kv.expire_ts, Some(500));
        assert_eq!(kv.create_ts, ts);
    }
}
#[tokio::test]
async fn test_should_record_scan_request_count() {
    // Driving a scan iterator increments the request counter labeled
    // op="scan".
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
    let db = Db::builder("/tmp/test_should_record_scan_request_count", object_store)
        .with_metrics_recorder(metrics_recorder.clone())
        .build()
        .await
        .unwrap();
    db.put(b"k1", b"v1").await.unwrap();
    let mut iter = db.scan::<&[u8], _>(..).await.unwrap();
    let _ = iter.next().await;
    let scan_count = lookup_metric_with_labels(
        &metrics_recorder,
        crate::db_stats::REQUEST_COUNT,
        &[("op", "scan")],
    );
    assert_eq!(scan_count, Some(1));
    db.close().await.unwrap();
}
#[tokio::test]
async fn test_should_record_flush_request_count() {
    // An explicit flush increments the request counter labeled op="flush".
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
    let db = Db::builder("/tmp/test_should_record_flush_request_count", object_store)
        .with_metrics_recorder(metrics_recorder.clone())
        .build()
        .await
        .unwrap();
    db.put(b"k1", b"v1").await.unwrap();
    db.flush().await.unwrap();
    let flush_count = lookup_metric_with_labels(
        &metrics_recorder,
        crate::db_stats::REQUEST_COUNT,
        &[("op", "flush")],
    );
    assert_eq!(flush_count, Some(1));
    db.close().await.unwrap();
}
#[tokio::test]
async fn test_should_record_write_ops_and_batch_count() {
    // Two single-op puts: both write_ops and write_batch_count reach 2.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
    let db = Db::builder(
        "/tmp/test_should_record_write_ops_and_batch_count",
        object_store,
    )
    .with_metrics_recorder(metrics_recorder.clone())
    .build()
    .await
    .unwrap();
    db.put(b"k1", b"v1").await.unwrap();
    db.put(b"k2", b"v2").await.unwrap();
    for metric in [
        crate::db_stats::WRITE_OPS,
        crate::db_stats::WRITE_BATCH_COUNT,
    ] {
        assert_eq!(lookup_metric(&metrics_recorder, metric), Some(2));
    }
    db.close().await.unwrap();
}
#[tokio::test]
async fn test_should_record_merge_operator_operands_on_flush_path_during_batch_write() {
    // Merge operands written through a batch are accounted to the flush-path
    // counter, not the read or compaction paths.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
    let path =
        "/tmp/test_should_record_merge_operator_operands_on_flush_path_during_batch_write";
    let mut options = test_db_options(0, 1024, None);
    // Keep writes buffered so no background flush or compaction runs.
    options.flush_interval = None;
    options.max_unflushed_bytes = 1024 * 1024;
    let db = Db::builder(path, object_store.clone())
        .with_settings(options)
        .with_metrics_recorder(metrics_recorder.clone())
        .with_merge_operator(Arc::new(StringConcatMergeOperator))
        .build()
        .await
        .unwrap();
    let mut batch = WriteBatch::new();
    batch.merge(b"key1", b"a");
    batch.merge(b"key1", b"b");
    let write_opts = WriteOptions {
        await_durable: false,
    };
    db.write_with_options(batch, &write_opts).await.unwrap();
    // Nothing was read back, so the read path saw zero operands.
    assert_eq!(
        lookup_merge_operator_operands(&metrics_recorder, MERGE_OPERATOR_READ_PATH),
        Some(0)
    );
    // The batch's merge operands are accounted to the flush path.
    assert_eq!(
        lookup_merge_operator_operands(&metrics_recorder, MERGE_OPERATOR_FLUSH_PATH),
        Some(3)
    );
    // No compaction ran, so the compaction-path counter is absent or zero.
    assert!(
        lookup_merge_operator_operands(&metrics_recorder, MERGE_OPERATOR_COMPACT_PATH)
            .is_none_or(|value| value == 0)
    );
    db.close().await.unwrap();
}
#[tokio::test]
async fn test_should_record_total_mem_size_bytes() {
    // With flushing disabled, buffered writes must be reflected in a
    // positive total_mem_size_bytes gauge.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
    let mut opts = test_db_options(0, 1024, None);
    opts.flush_interval = None;
    opts.max_unflushed_bytes = 1024 * 1024;
    let db = Db::builder("/tmp/test_should_record_total_mem_size_bytes", object_store)
        .with_settings(opts)
        .with_metrics_recorder(metrics_recorder.clone())
        .build()
        .await
        .unwrap();
    let write_opts = WriteOptions {
        await_durable: false,
    };
    for (key, value) in [(b"k1", b"v1"), (b"k2", b"v2")] {
        db.put_with_options(key, value, &PutOptions::default(), &write_opts)
            .await
            .unwrap();
    }
    let mem_size = lookup_metric(&metrics_recorder, crate::db_stats::TOTAL_MEM_SIZE_BYTES);
    assert!(
        mem_size.is_some_and(|v| v > 0),
        "expected total_mem_size_bytes > 0, got {:?}",
        mem_size
    );
    db.close().await.unwrap();
}
#[tokio::test]
async fn test_should_record_wal_buffer_estimated_bytes() {
    // An unflushed write sits in the WAL buffer, so the buffer's
    // estimated-bytes gauge must be positive.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
    let mut opts = test_db_options(0, 1024, None);
    opts.flush_interval = None;
    let db = Db::builder(
        "/tmp/test_should_record_wal_buffer_estimated_bytes",
        object_store,
    )
    .with_settings(opts)
    .with_metrics_recorder(metrics_recorder.clone())
    .build()
    .await
    .unwrap();
    let write_opts = WriteOptions {
        await_durable: false,
    };
    db.put_with_options(b"k1", b"v1", &PutOptions::default(), &write_opts)
        .await
        .unwrap();
    let estimated = lookup_metric(
        &metrics_recorder,
        crate::db_stats::WAL_BUFFER_ESTIMATED_BYTES,
    );
    assert!(
        estimated.is_some_and(|v| v > 0),
        "expected wal_buffer_estimated_bytes > 0, got {:?}",
        estimated
    );
    db.close().await.unwrap();
}
#[tokio::test]
async fn test_should_record_l0_sst_count() {
    // Flushing a memtable to L0 must be reflected in the l0_sst_count gauge.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
    let db = Db::builder("/tmp/test_should_record_l0_sst_count", object_store)
        .with_settings(test_db_options(0, 1024, None))
        .with_metrics_recorder(metrics_recorder.clone())
        .build()
        .await
        .unwrap();
    db.put(b"k1", b"v1").await.unwrap();
    db.flush_with_options(FlushOptions {
        flush_type: FlushType::MemTable,
    })
    .await
    .unwrap();
    // The gauge is updated after the flush completes, so poll with a bounded
    // timeout instead of a fixed sleep — faster in the common case and not
    // flaky under scheduler load.
    let mut l0_count = None;
    for _ in 0..100 {
        l0_count = lookup_metric(&metrics_recorder, crate::db_stats::L0_SST_COUNT);
        if l0_count.is_some_and(|v| v > 0) {
            break;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    assert!(
        l0_count.is_some_and(|v| v > 0),
        "expected l0_sst_count > 0, got {:?}",
        l0_count
    );
    db.close().await.unwrap();
}
}