// Nightly feature gates requested by this crate.
#![feature(sync_unsafe_cell)]
// `integer_atomics` is only requested when compiling for aarch64.
#![cfg_attr(target_arch = "aarch64", feature(integer_atomics))]
// Crate-wide lint configuration: dead/unused code is tolerated, while the
// lints listed under `deny` are hard errors.
#![allow(dead_code)]
#![allow(unused)]
#![deny(unused_mut)]
#![deny(clippy::missing_safety_doc)]
#![deny(clippy::undocumented_unsafe_blocks)]
// NOTE(review): this lint flags every use of the `?` operator, yet `?` is used
// in this file — confirm whether the lint is intentionally denied.
#![deny(clippy::question_mark_used)]
#![deny(clippy::needless_borrow)]
#![allow(bindings_with_variant_name)]
// Build-time notices for platforms the crate is not tested on.
// NOTE(review): `compile_warn!` is not a standard-library macro; confirm it is
// defined and in scope at this point.
#[cfg(not(unix))]
compile_warn!("cesiumdb is not tested on windows");
#[cfg(not(target_pointer_width = "64"))]
compile_warn!("cesiumdb is not tested on 32-bit systems");
#[allow(unused)]
use std::sync::Arc;
use std::{
sync::atomic::{
AtomicU64,
Ordering,
},
thread,
time::Duration,
};
use bytes::Bytes;
use parking_lot::{
Mutex,
RwLock,
};
use crate::{
Batch::{
Delete,
DeleteNs,
Put,
PutNs,
},
errs::{
CesiumError,
CesiumError::MemtableError,
},
hlc::{
HLC,
HybridLogicalClock,
},
keypair::{
DEFAULT_NS,
KeyBytes,
ValueBytes,
},
state::{
DbStorageBuilder,
DbStorageState,
},
utils::Serializer,
};
// Crate module tree.
//
// Modules declared twice behind `#[cfg(feature = "benchmarks")]` /
// `#[cfg(not(feature = "benchmarks"))]` are public only when the `benchmarks`
// feature is enabled and crate-private otherwise.
pub mod autoconfig;
#[cfg(feature = "benchmarks")]
pub mod block;
#[cfg(not(feature = "benchmarks"))]
pub(crate) mod block;
mod block_alloc;
mod bloom;
#[cfg(feature = "benchmarks")]
pub mod compact;
#[cfg(not(feature = "benchmarks"))]
pub(crate) mod compact;
pub mod compaction;
pub mod errs;
mod hash;
pub mod hlc;
mod index;
pub(crate) mod io;
pub mod keypair;
pub mod levels;
pub(crate) mod manifest;
pub(crate) mod manifest_reader;
pub(crate) mod manifest_writer;
#[cfg(feature = "benchmarks")]
pub mod map;
#[cfg(not(feature = "benchmarks"))]
pub(crate) mod map;
pub mod memtable;
pub mod merge;
pub mod peek;
pub(crate) mod raw_entry;
pub mod segment;
mod segment_builder;
mod segment_iterator;
pub(crate) mod segment_reader;
#[cfg(feature = "benchmarks")]
pub mod segment_writer;
#[cfg(not(feature = "benchmarks"))]
pub(crate) mod segment_writer;
pub mod simd;
pub(crate) mod state;
mod stats;
pub mod utils;
pub mod version;
/// Scan over a single segment that owns its reader and its serialized bounds.
///
/// The underlying segment scan is not started until the first call to
/// [`Iterator::next`]; at that point the reader is consumed and replaced by
/// the scan iterator it produced.
struct OwnedSegmentIterator {
    /// Reader to scan; taken (left as `None`) once the scan has been started.
    reader: Option<segment_reader::SegmentReader>,
    /// Serialized lower bound of the scan.
    lower: std::ops::Bound<Bytes>,
    /// Serialized upper bound of the scan.
    upper: std::ops::Bound<Bytes>,
    /// Scan iterator, created on first use.
    inner: Option<segment_iterator::SegmentScanIterator>,
}

impl OwnedSegmentIterator {
    /// Stores `reader` together with the serialized form of both bounds.
    /// No scanning happens here.
    fn new(
        reader: segment_reader::SegmentReader,
        lower: std::ops::Bound<KeyBytes>,
        upper: std::ops::Bound<KeyBytes>,
    ) -> Self {
        Self {
            reader: Some(reader),
            lower: lower.map(|k| k.serialize()),
            upper: upper.map(|k| k.serialize()),
            inner: None,
        }
    }
}

impl Iterator for OwnedSegmentIterator {
    type Item = (KeyBytes, ValueBytes);

    /// Yields the next successfully decoded entry of the segment scan.
    ///
    /// Entries for which the scan reports an error are skipped.
    fn next(&mut self) -> Option<Self::Item> {
        if self.inner.is_none() {
            // First call: turn the stored reader into a scan iterator.
            let reader = self.reader.take()?;
            let lower = self.lower.as_ref().map(|b| &b[..]);
            let upper = self.upper.as_ref().map(|b| &b[..]);
            self.inner = Some(reader.scan(lower, upper));
        }

        let scan = self.inner.as_mut()?;
        loop {
            if let Ok(pair) = scan.next()? {
                return Some(pair);
            }
        }
    }
}
/// Iterator returned by the database scan APIs.
///
/// Wraps a merge over all entry sources and reduces it to one visible
/// `(key, value)` pair per user key: only the first entry the merge yields for
/// a given `(namespace, key)` is considered, and it is dropped if it is a
/// tombstone.
pub struct DbScanIterator {
    /// Merged stream of entries from every source taking part in the scan.
    inner: merge::MergeIterator<Box<dyn Iterator<Item = (KeyBytes, ValueBytes)> + Send>>,
    /// Namespace and user key of the most recently examined entry.
    last_key: Option<(u64, Bytes)>,
}

impl Iterator for DbScanIterator {
    type Item = (Bytes, Bytes);

    /// Returns the next visible `(key, value)` pair, or `None` once the
    /// merged stream is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.inner.next() {
                | Some((key, value)) => {
                    let current_ns = key.ns();
                    let current_key = key.as_bytes();

                    // Skip further entries for the key we just handled.
                    if let Some((last_ns, last_key_bytes)) = &self.last_key {
                        if *last_ns == current_ns && last_key_bytes == &current_key {
                            continue;
                        }
                    }

                    // Remember the key even when it turns out to be deleted so
                    // that later entries for it are skipped as well.
                    self.last_key = Some((current_ns, current_key.clone()));
                    if value.is_tombstone() {
                        continue;
                    }
                    return Some((current_key, value.as_bytes()));
                },
                | None => return None,
            }
        }
    }
}
/// Snapshot of the read counters kept by the database.
#[derive(Debug, Clone, Copy, Default)]
pub struct ReadAmpStats {
/// Number of point lookups issued (incremented once per internal `get`).
pub total_gets: u64,
/// Number of L0 segments scanned after passing their `may_contain` check.
pub l0_segments_checked: u64,
/// Number of segments in the deeper levels scanned after passing their
/// `may_contain` check.
pub ln_segments_checked: u64,
}
/// Public handle to a database instance.
///
/// Pairs the shared engine state with the clock used to timestamp writes
/// issued through the single-key convenience methods.
pub struct Db {
    inner: Arc<DbInner>,
    clock: Arc<dyn HLC>,
}

impl Db {
    /// Builds a database from `opts`; shorthand for [`DbOptions::build`].
    pub fn open(opts: DbOptions) -> Arc<Self> {
        opts.build()
    }

    /// Returns the current reading of the database clock.
    pub fn time(&self) -> u128 {
        self.clock.time()
    }

    /// Writes `value` under `key` in namespace `ns`, stamped with the current
    /// clock time.
    pub fn put_ns(&self, ns: u64, key: &[u8], value: &[u8]) -> Result<(), CesiumError> {
        let ts = self.clock.time();
        self.inner.batch(&[PutNs(ns, key, value, ts)])
    }

    /// Looks up `key` in namespace `ns`.
    ///
    /// Returns `Ok(None)` when the lookup finds nothing or finds a tombstone.
    pub fn get_ns(&self, ns: u64, key: &[u8]) -> Result<Option<Bytes>, CesiumError> {
        let lookup = KeyBytes::new(ns, Bytes::copy_from_slice(key), 0);
        self.inner
            .get(lookup)
            .map(|found| found.map(|v| v.as_bytes()))
    }

    /// Records a deletion of `key` in namespace `ns`, stamped with the current
    /// clock time.
    pub fn delete_ns(&self, ns: u64, key: &[u8]) -> Result<(), CesiumError> {
        let ts = self.clock.time();
        self.inner.batch::<&[u8], &[u8]>(&[DeleteNs(ns, key, ts)])
    }

    /// [`Db::put_ns`] on the default namespace.
    pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), CesiumError> {
        self.put_ns(DEFAULT_NS, key, value)
    }

    /// [`Db::get_ns`] on the default namespace.
    pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>, CesiumError> {
        self.get_ns(DEFAULT_NS, key)
    }

    /// [`Db::delete_ns`] on the default namespace.
    pub fn delete(&self, key: &[u8]) -> Result<(), CesiumError> {
        self.delete_ns(DEFAULT_NS, key)
    }

    /// Scans namespace `ns` between the given user-key bounds.
    pub fn scan_ns(
        &self,
        ns: u64,
        lower: std::ops::Bound<&[u8]>,
        upper: std::ops::Bound<&[u8]>,
    ) -> DbScanIterator {
        self.inner.scan(ns, lower, upper)
    }

    /// [`Db::scan_ns`] on the default namespace.
    pub fn scan(
        &self,
        lower: std::ops::Bound<&[u8]>,
        upper: std::ops::Bound<&[u8]>,
    ) -> DbScanIterator {
        self.scan_ns(DEFAULT_NS, lower, upper)
    }

    /// Applies a list of operations using the timestamps carried by each one.
    ///
    /// Operations without an explicit namespace are rewritten to target the
    /// default namespace before being handed to the engine.
    pub fn batch<K: AsRef<[u8]>, V: AsRef<[u8]>>(
        &self,
        ops: &[Batch<K, V>],
    ) -> Result<(), CesiumError> {
        let namespaced: Vec<_> = ops
            .iter()
            .map(|op| match op {
                | Put(k, v, ts) => PutNs(DEFAULT_NS, k, v, *ts),
                | PutNs(ns, k, v, ts) => PutNs(*ns, k, v, *ts),
                | Delete(k, ts) => DeleteNs(DEFAULT_NS, k, *ts),
                | DeleteNs(ns, k, ts) => DeleteNs(*ns, k, *ts),
            })
            .collect();
        self.inner.batch(&namespaced)
    }

    /// Asks the engine to sync its state.
    pub fn sync(&self) -> Result<(), CesiumError> {
        self.inner.sync()
    }

    /// Shuts down the storage state.
    pub fn close(&self) -> Result<(), CesiumError> {
        self.inner.state.lock().shutdown()
    }

    /// Invokes compaction on the storage state; always returns `Ok(())`.
    pub fn compact(&self) -> Result<(), CesiumError> {
        let state = self.inner.state.lock();
        state.compact();
        Ok(())
    }

    /// Returns the compaction statistics, or
    /// `CompactionError::NotInitialized` when the storage state has none.
    pub fn compaction_stats(&self) -> Result<crate::compaction::CompactionStats, CesiumError> {
        let state = self.inner.state.lock();
        state.compaction_stats().ok_or_else(|| {
            CesiumError::CompactionError(crate::errs::CompactionError::NotInitialized)
        })
    }

    /// Returns the version statistics reported by the storage state.
    pub fn version_stats(&self) -> crate::version::VersionStats {
        self.inner.version_stats()
    }

    /// Returns a snapshot of the read counters.
    pub fn read_amp_stats(&self) -> ReadAmpStats {
        self.inner.read_amp_stats()
    }

    /// Returns the number of frozen memtables reported by the storage state.
    pub fn frozen_memtable_count(&self) -> usize {
        self.inner.frozen_memtable_count()
    }
}
/// Configuration used to construct a [`Db`].
#[repr(C)]
pub struct DbOptions {
    /// Storage-engine settings.
    engine_opts: DbStorageBuilder,
    /// Clock handed to the resulting database.
    clock: Arc<dyn HLC>,
}

impl DbOptions {
    /// Creates options with the default engine settings and a fresh
    /// [`HybridLogicalClock`].
    pub fn new() -> Self {
        Self {
            engine_opts: DbStorageBuilder::default(),
            clock: Arc::new(HybridLogicalClock::new()),
        }
    }

    /// Replaces the stored engine settings with the result of applying
    /// `change` to a copy of them.
    fn update_engine(
        &mut self,
        change: impl FnOnce(DbStorageBuilder) -> DbStorageBuilder,
    ) -> &mut Self {
        self.engine_opts = change(self.engine_opts.clone());
        self
    }

    /// Replaces all engine settings at once.
    pub fn engine(&mut self, engine: DbStorageBuilder) -> &mut Self {
        self.engine_opts = engine;
        self
    }

    /// Replaces the clock.
    pub fn clock(&mut self, clock: Arc<dyn HLC>) -> &mut Self {
        self.clock = clock;
        self
    }

    /// Sets the engine's base path.
    pub fn data_dir(&mut self, path: std::path::PathBuf) -> &mut Self {
        self.update_engine(|engine| engine.base_path(path))
    }

    /// Sets the engine's memtable size.
    pub fn memtable_size(&mut self, size: u64) -> &mut Self {
        self.update_engine(|engine| engine.memtable_size(size))
    }

    /// Sets the engine's target segment size.
    pub fn target_segment_size(&mut self, size: u64) -> &mut Self {
        self.update_engine(|engine| engine.target_segment_size(size))
    }

    /// Sets `target_file_size_multiplier` on the current scheduler
    /// configuration, leaving its other fields untouched.
    pub fn target_file_size_multiplier(&mut self, multiplier: u64) -> &mut Self {
        let mut scheduler = self.engine_opts.scheduler_config.clone();
        scheduler.target_file_size_multiplier = multiplier;
        self.update_engine(|engine| engine.scheduler_config(scheduler))
    }

    /// Sets the engine's memtable-count limit.
    pub fn max_memtables(&mut self, count: u64) -> &mut Self {
        self.update_engine(|engine| engine.num_memtable_limit(count))
    }

    /// Replaces the scheduler configuration.
    pub fn scheduler_config(&mut self, config: crate::compaction::SchedulerConfig) -> &mut Self {
        self.update_engine(|engine| engine.scheduler_config(config))
    }

    /// Builds the storage state and wraps it in a [`Db`].
    ///
    /// # Panics
    ///
    /// Panics if the read thread pool cannot be created.
    pub fn build(&self) -> Arc<Db> {
        let engine = &self.engine_opts;
        let mut storage = DbStorageBuilder::new()
            .block_size(engine.block_size)
            .target_segment_size(engine.target_segment_size)
            .num_memtable_limit(engine.num_memtable_limit)
            .memtable_size(engine.memtable_size)
            .scheduler_config(engine.scheduler_config.clone());
        if let Some(path) = &engine.base_path {
            storage = storage.base_path(path.clone());
        }
        let state = storage.build();

        // Half of the available parallelism, but never fewer than two
        // threads; four when the parallelism cannot be determined.
        let reader_threads = std::thread::available_parallelism()
            .map(|n| (n.get() / 2).max(2))
            .unwrap_or(4);
        let read_pool = rayon::ThreadPoolBuilder::new()
            .num_threads(reader_threads)
            .thread_name(|i| format!("lsm-reader-{}", i))
            .build()
            .expect("failed to create read thread pool");

        // Grab the handles that are cached outside the state mutex.
        let (curr_memtable, version_manager) = {
            let locked = state.lock();
            (locked.current_memtable(), Arc::clone(&locked.version_manager))
        };

        Arc::new(Db {
            inner: Arc::new(DbInner {
                state,
                curr_memtable: RwLock::new(curr_memtable),
                version_manager,
                read_pool,
                total_gets: AtomicU64::new(0),
                l0_reads: AtomicU64::new(0),
                ln_reads: AtomicU64::new(0),
            }),
            clock: Arc::clone(&self.clock),
        })
    }
}

impl Default for DbOptions {
    /// Same as [`DbOptions::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// A single write operation that can be submitted as part of a batch.
///
/// The trailing `u128` of every variant is the timestamp stored with the key.
#[repr(C)]
pub enum Batch<K: AsRef<[u8]>, V: AsRef<[u8]>> {
/// Write `(key, value, timestamp)` to the default namespace.
Put(K, V, u128),
/// Write a tombstone for `(key, timestamp)` in the default namespace.
Delete(K, u128),
/// Write `(namespace, key, value, timestamp)`.
PutNs(u64, K, V, u128),
/// Write a tombstone for `(namespace, key, timestamp)`.
DeleteNs(u64, K, u128),
}
/// Engine state shared behind [`Db`].
#[repr(C)]
struct DbInner {
// Storage state; every access goes through this mutex.
state: Mutex<DbStorageState>,
// Cached handle to the memtable that currently receives writes; refreshed
// from `state` whenever the memtable is rotated or the state is synced.
curr_memtable: RwLock<Arc<crate::memtable::Memtable>>,
// Clone of the `version_manager` handle held by `state`, taken at build time.
version_manager: Arc<crate::version::VersionManager>,
// Thread pool created at build time with `lsm-reader-N` thread names.
read_pool: rayon::ThreadPool,
// Counters reported through `ReadAmpStats`.
total_gets: AtomicU64,
l0_reads: AtomicU64,
ln_reads: AtomicU64,
}
impl DbInner {
fn get(&self, key: KeyBytes) -> Result<Option<ValueBytes>, CesiumError> {
self.total_gets.fetch_add(1, Ordering::Relaxed);
{
let mtable = self.curr_memtable.read().clone();
if let Some(val) = mtable.get(&key) {
if val.is_tombstone() {
return Ok(None);
}
return Ok(Some(val));
}
}
{
let guard = self.state.lock();
if let Some(val) = guard.get_from_frozen(&key) {
if val.is_tombstone() {
return Ok(None);
}
return Ok(Some(val));
}
}
{
use rayon::prelude::*;
use crate::utils::Serializer;
let guard = self.state.lock();
let version = guard.version_manager.current();
let key_bytes = key.serialize();
let key_prefix_lower = {
use bytes::{
BufMut,
BytesMut,
};
let mut bytes = BytesMut::with_capacity(8 + key.as_bytes().len() + 16);
bytes.put_u64_le(key.ns());
bytes.put_slice(key.as_bytes().as_ref());
bytes.put_u128_le(0); bytes.freeze()
};
let key_prefix_upper = {
use bytes::{
BufMut,
BytesMut,
};
let mut bytes = BytesMut::with_capacity(8 + key.as_bytes().len() + 16);
bytes.put_u64_le(key.ns());
bytes.put_slice(key.as_bytes().as_ref());
bytes.put_u128_le(u128::MAX); bytes.freeze()
};
let key_for_bloom = {
use bytes::{
BufMut,
BytesMut,
};
let mut bytes = BytesMut::with_capacity(8 + key.as_bytes().len());
bytes.put_u64_le(key.ns());
bytes.put_slice(key.as_bytes().as_ref());
bytes.freeze()
};
for segment in version.l0.iter().rev() {
let reader = match segment.reader() {
| Ok(r) => r,
| Err(_) => continue,
};
if !reader.may_contain(&key_for_bloom) {
continue;
}
self.l0_reads.fetch_add(1, Ordering::Relaxed);
use std::ops::Bound;
let mut scan_iter = reader.scan(
Bound::Included(key_prefix_lower.as_ref()),
Bound::Included(key_prefix_upper.as_ref()),
);
if let Some(Ok((_, val))) = scan_iter.next() {
if val.is_tombstone() {
return Ok(None);
}
return Ok(Some(val));
}
}
for level in &version.levels {
for segment in &level.segments {
if let Ok(reader) = segment.reader() {
if !reader.may_contain(&key_for_bloom) {
continue;
}
self.ln_reads.fetch_add(1, Ordering::Relaxed);
use std::ops::Bound;
let mut scan_iter = reader.scan(
Bound::Included(key_prefix_lower.as_ref()),
Bound::Included(key_prefix_upper.as_ref()),
);
if let Some(Ok((_, val))) = scan_iter.next() {
if val.is_tombstone() {
return Ok(None);
}
return Ok(Some(val));
}
}
}
}
}
Ok(None)
}
fn scan(
&self,
ns: u64,
lower: std::ops::Bound<&[u8]>,
upper: std::ops::Bound<&[u8]>,
) -> DbScanIterator {
use std::ops::Bound;
let lower_key = match lower {
| Bound::Included(k) => {
Bound::Included(KeyBytes::new(ns, Bytes::copy_from_slice(k), u128::MAX))
},
| Bound::Excluded(k) => {
Bound::Excluded(KeyBytes::new(ns, Bytes::copy_from_slice(k), u128::MAX))
},
| Bound::Unbounded => {
Bound::Included(KeyBytes::new(ns, Bytes::new(), u128::MAX))
},
};
let upper_key = match upper {
| Bound::Included(k) => {
Bound::Included(KeyBytes::new(ns, Bytes::copy_from_slice(k), 0))
},
| Bound::Excluded(k) => {
Bound::Excluded(KeyBytes::new(ns, Bytes::copy_from_slice(k), u128::MAX))
},
| Bound::Unbounded => {
Bound::Excluded(KeyBytes::new(ns + 1, Bytes::new(), u128::MAX))
},
};
let mut iters: Vec<Box<dyn Iterator<Item = (KeyBytes, ValueBytes)> + Send>> = Vec::new();
{
let mtable = self.curr_memtable.read().clone();
let memtable_iter = mtable.scan(lower_key.clone(), upper_key.clone());
iters
.push(Box::new(memtable_iter)
as Box<dyn Iterator<Item = (KeyBytes, ValueBytes)> + Send>);
}
{
let guard = self.state.lock();
let frozen = guard.frozen_memtables_for_scan();
for memtable in frozen.iter().rev() {
let iter = memtable.scan(lower_key.clone(), upper_key.clone());
iters.push(Box::new(iter) as Box<dyn Iterator<Item = (KeyBytes, ValueBytes)> + Send>);
}
let version = guard.version_manager.current();
for segment in &version.l0 {
if let Ok(reader) = segment.reader() {
let owned_iter =
OwnedSegmentIterator::new(reader, lower_key.clone(), upper_key.clone());
iters.push(Box::new(owned_iter)
as Box<dyn Iterator<Item = (KeyBytes, ValueBytes)> + Send>);
}
}
for level in &version.levels {
for segment in &level.segments {
if let Ok(reader) = segment.reader() {
let owned_iter =
OwnedSegmentIterator::new(reader, lower_key.clone(), upper_key.clone());
iters.push(Box::new(owned_iter)
as Box<dyn Iterator<Item = (KeyBytes, ValueBytes)> + Send>);
}
}
}
}
let merge_iter = merge::MergeIterator::new(iters);
DbScanIterator {
inner: merge_iter,
last_key: None,
}
}
fn batch<K: AsRef<[u8]>, V: AsRef<[u8]>>(
&self,
ops: &[Batch<K, V>],
) -> Result<(), CesiumError> {
let mut _batch = Vec::with_capacity(ops.len());
for b in ops.iter() {
match b {
| PutNs(ns, k, v, ts) => {
_batch.push((
KeyBytes::new(*ns, Bytes::copy_from_slice(k.as_ref()), *ts),
ValueBytes::new(*ns, Bytes::copy_from_slice(v.as_ref())),
));
},
| DeleteNs(ns, k, ts) => {
_batch.push((
KeyBytes::new(*ns, Bytes::copy_from_slice(k.as_ref()), *ts),
ValueBytes::new_tombstone(*ns),
));
},
| Put(k, v, ts) => {
_batch.push((
KeyBytes::new(DEFAULT_NS, Bytes::copy_from_slice(k.as_ref()), *ts),
ValueBytes::new(DEFAULT_NS, Bytes::copy_from_slice(v.as_ref())),
));
},
| Delete(k, ts) => {
_batch.push((
KeyBytes::new(DEFAULT_NS, Bytes::copy_from_slice(k.as_ref()), *ts),
ValueBytes::new_tombstone(DEFAULT_NS),
));
},
}
}
let mtable = self.curr_memtable.read().clone();
match mtable.put_batch(_batch.as_ref()) {
| Ok(written) if written == _batch.len() => {
Ok(())
},
| Ok(written) => {
let mut offset = written;
let mut last_attempted = mtable.clone();
while offset < _batch.len() {
self.wait_for_frozen_capacity();
let new_mtable = {
let mut guard = self.state.lock();
let current = guard.current_memtable();
if Arc::ptr_eq(&last_attempted, ¤t) {
guard.new_memtable();
}
let new = guard.current_memtable();
*self.curr_memtable.write() = new.clone();
new
};
match new_mtable.put_batch(&_batch[offset..]) {
| Ok(w) => {
offset += w;
if offset >= _batch.len() {
return Ok(());
}
last_attempted = new_mtable;
},
| Err(e) => {
use crate::errs::MemtableError as MtError;
if matches!(e, MtError::MemtableIsFrozen | MtError::DataExceedsMaximum)
{
last_attempted = new_mtable;
continue; }
return Err(MemtableError(e));
},
}
}
Ok(())
},
| Err(e) => {
use crate::errs::MemtableError as MtError;
match e {
| MtError::DataExceedsMaximum => {
let mut offset = 0usize;
let mut last_attempted = mtable.clone();
while offset < _batch.len() {
self.wait_for_frozen_capacity();
let new_mtable = {
let mut guard = self.state.lock();
let current = guard.current_memtable();
if Arc::ptr_eq(&last_attempted, ¤t) {
guard.new_memtable();
}
let new = guard.current_memtable();
*self.curr_memtable.write() = new.clone();
new
};
match new_mtable.put_batch(&_batch[offset..]) {
| Ok(w) => {
offset += w;
if offset >= _batch.len() {
return Ok(());
}
last_attempted = new_mtable;
},
| Err(e) => {
if matches!(
e,
MtError::MemtableIsFrozen | MtError::DataExceedsMaximum
) {
last_attempted = new_mtable;
continue;
}
return Err(MemtableError(e));
},
}
}
Ok(())
},
| MtError::MemtableIsFrozen => {
let new_mtable = {
let guard = self.state.lock();
let new = guard.current_memtable();
*self.curr_memtable.write() = new.clone();
new
};
match new_mtable.put_batch(_batch.as_ref()) {
| Ok(_) => Ok(()),
| Err(e) => Err(MemtableError(e)),
}
},
| _ => Err(MemtableError(e)),
}
},
}
}
/// Syncs the storage state, then refreshes the cached memtable handle from
/// it. Both steps happen under the state mutex.
///
/// # Errors
///
/// Propagates the error returned by the storage state's `sync`.
fn sync(&self) -> Result<(), CesiumError> {
    let mut state = self.state.lock();
    state.sync()?;
    *self.curr_memtable.write() = state.current_memtable();
    Ok(())
}
/// Blocks until the number of frozen memtables is below the configured
/// limit, polling once per millisecond.
///
/// A limit of `0` disables the wait. The state mutex is only held while a
/// value is read, never while sleeping.
fn wait_for_frozen_capacity(&self) {
    let limit = self.state.lock().memtable_limit();
    if limit == 0 {
        return;
    }

    let frozen_now = || self.state.lock().frozen_count();
    while frozen_now() >= limit as usize {
        thread::sleep(Duration::from_millis(1));
    }
}
// Returns the version statistics reported by the storage state; the state
// mutex is taken for the call.
fn version_stats(&self) -> crate::version::VersionStats {
self.state.lock().version_stats()
}
/// Takes a snapshot of the three read counters using relaxed loads.
fn read_amp_stats(&self) -> ReadAmpStats {
    let total_gets = self.total_gets.load(Ordering::Relaxed);
    let l0_segments_checked = self.l0_reads.load(Ordering::Relaxed);
    let ln_segments_checked = self.ln_reads.load(Ordering::Relaxed);
    ReadAmpStats {
        total_gets,
        l0_segments_checked,
        ln_segments_checked,
    }
}
/// Returns the number of frozen memtables reported by the storage state.
fn frozen_memtable_count(&self) -> usize {
    let state = self.state.lock();
    state.frozen_count()
}
}
// Tests for the public `Db` API. Every test opens its own database with
// `DbOptions::default()`.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::{
Batch::Put,
Db,
DbOptions,
};
// Number of keys written per pass in `test_db_put`.
const MAX_KEYS: u64 = 10_000;
// Opens a database with default options.
fn db_builder() -> Arc<Db> {
Db::open(DbOptions::default())
}
// Writes MAX_KEYS keys twice and checks that the current memtable's reported
// size exceeds the raw key/value byte count.
#[test]
fn test_db_put() {
let db = db_builder();
let mut keypair_size = 0;
for i in 0..MAX_KEYS {
let key = format!("key-{}", i).into_bytes();
let val = format!("value-{}", i).into_bytes();
keypair_size += key.len() + val.len();
assert!(db.put(key.as_ref(), val.as_ref()).is_ok());
}
{
let guard = db.inner.state.lock();
assert!(
guard.current_memtable().size() > keypair_size as u64,
"the memtable must be bigger than the keypair size to ensure the keys are actually stored"
);
}
// NOTE(review): `keypair_size` keeps accumulating in this second pass, so the
// assertion below compares against four times the single-pass byte count —
// confirm that is intended.
for i in 0..MAX_KEYS {
let key = format!("key-{}", i).into_bytes();
let val = format!("value-{}", i).into_bytes();
keypair_size += key.len() + val.len();
assert!(db.put(key.as_ref(), val.as_ref()).is_ok());
}
{
let guard = db.inner.state.lock();
assert!(
guard.current_memtable().size() > (keypair_size * 2) as u64,
"the memtable must be at least twice as big as before with the new versions"
);
}
}
// Submits batches of `Put` operations and checks the memtable size after each.
#[test]
fn db_put_batch() {
let db = db_builder();
let mut keypair_size = 0;
for batch_size in [1, 10, 100].iter() {
// NOTE(review): capacity is `batch_size` but `batch_size * 100` operations
// are pushed.
let mut batch = Vec::with_capacity(*batch_size);
for i in 0..(*batch_size * 100) {
let key = format!("key-{}", i).into_bytes();
let val = format!("value-{}", i).into_bytes();
keypair_size += key.len() + val.len();
let op = Put(key, val.clone(), db.time());
batch.push(op)
}
assert!(db.batch(&batch).is_ok());
{
let guard = db.inner.state.lock();
assert!(
guard.current_memtable().size() > keypair_size as u64,
"the memtable must be bigger than the keypair size to ensure the keys are actually stored"
);
}
}
}
// `get` returns None for missing keys and the stored value for present ones.
#[test]
fn test_db_get() {
let db = db_builder();
let result = db.get(b"nonexistent");
assert!(result.is_ok());
assert!(
result.unwrap().is_none(),
"get on empty db should return None"
);
let key = b"test-key";
let val = b"test-value";
assert!(db.put(key, val).is_ok());
let result = db.get(key);
assert!(result.is_ok());
let retrieved = result.unwrap();
assert!(
retrieved.is_some(),
"get should return Some for existing key"
);
assert_eq!(&retrieved.unwrap()[..], val, "retrieved value should match");
let result = db.get(b"different-key");
assert!(result.is_ok());
assert!(
result.unwrap().is_none(),
"get on non-existent key should return None"
);
}
// After several writes to the same key, `get` returns the last value written.
#[test]
fn test_db_get_latest_version() {
let db = db_builder();
let key = b"versioned-key";
let val1 = b"value-1";
let val2 = b"value-2";
let val3 = b"value-3";
assert!(db.put(key, val1).is_ok());
assert!(db.put(key, val2).is_ok());
assert!(db.put(key, val3).is_ok());
let result = db.get(key);
assert!(result.is_ok());
let retrieved = result.unwrap();
assert!(retrieved.is_some());
assert_eq!(
&retrieved.unwrap()[..],
val3,
"get should return the latest value"
);
}
// A deleted key is no longer returned by `get`.
#[test]
fn test_db_delete() {
let db = db_builder();
let key = b"key-to-delete";
let val = b"value";
assert!(db.put(key, val).is_ok());
let result = db.get(key);
assert!(result.is_ok());
assert!(result.unwrap().is_some());
assert!(db.delete(key).is_ok());
let result = db.get(key);
assert!(result.is_ok());
assert!(result.unwrap().is_none(), "deleted key should return None");
}
// The same key written to two namespaces keeps two independent values.
#[test]
fn test_db_put_ns() {
let db = db_builder();
let ns1: u64 = 1;
let ns2: u64 = 2;
let key = b"same-key";
let val1 = b"value-in-ns1";
let val2 = b"value-in-ns2";
assert!(db.put_ns(ns1, key, val1).is_ok());
assert!(db.put_ns(ns2, key, val2).is_ok());
let result1 = db.get_ns(ns1, key);
assert!(result1.is_ok());
let value1 = result1.unwrap();
assert!(value1.is_some());
assert_eq!(&value1.unwrap()[..], val1);
let result2 = db.get_ns(ns2, key);
assert!(result2.is_ok());
let value2 = result2.unwrap();
assert!(value2.is_some());
assert_eq!(&value2.unwrap()[..], val2);
}
// A key written to a namespace is visible there but not in the default one.
#[test]
fn test_db_get_ns() {
let db = db_builder();
let ns: u64 = 42;
let key = b"namespaced-key";
let val = b"namespaced-value";
let result = db.get_ns(ns, key);
assert!(result.is_ok());
assert!(result.unwrap().is_none());
assert!(db.put_ns(ns, key, val).is_ok());
let result = db.get_ns(ns, key);
assert!(result.is_ok());
let retrieved = result.unwrap();
assert!(retrieved.is_some());
assert_eq!(&retrieved.unwrap()[..], val);
let result = db.get(key);
assert!(result.is_ok());
assert!(
result.unwrap().is_none(),
"key should not exist in default namespace"
);
}
// A key deleted in a namespace is no longer returned by `get_ns`.
#[test]
fn test_db_delete_ns() {
let db = db_builder();
let ns: u64 = 10;
let key = b"key-to-delete";
let val = b"value";
assert!(db.put_ns(ns, key, val).is_ok());
let result = db.get_ns(ns, key);
assert!(result.is_ok());
assert!(result.unwrap().is_some());
assert!(db.delete_ns(ns, key).is_ok());
let result = db.get_ns(ns, key);
assert!(result.is_ok());
assert!(
result.unwrap().is_none(),
"deleted key in namespace should return None"
);
}
// A database opened with default options accepts a write and serves it back.
#[test]
fn test_db_options_default() {
let opts = DbOptions::default();
let db = Db::open(opts);
assert!(db.put(b"test", b"value").is_ok());
let result = db.get(b"test");
assert!(result.is_ok());
assert!(result.unwrap().is_some());
}
// Two consecutive clock readings never go backwards.
#[test]
fn test_db_time() {
let db = db_builder();
let time1 = db.time();
let time2 = db.time();
assert!(
time2 >= time1,
"clock should return monotonically increasing values"
);
}
// A batch mixing default-namespace and namespaced puts is fully readable.
#[test]
fn test_db_batch_mixed_operations() {
// NOTE(review): `Delete` and `DeleteNs` are imported but the batch below only
// contains puts.
use crate::Batch::{
Delete,
DeleteNs,
PutNs,
};
let db = db_builder();
let ns: u64 = 5;
let batch = vec![
Put(b"key1".to_vec(), b"val1".to_vec(), db.time()),
PutNs(ns, b"key2".to_vec(), b"val2".to_vec(), db.time()),
Put(b"key3".to_vec(), b"val3".to_vec(), db.time()),
];
assert!(db.batch(&batch).is_ok());
assert!(db.get(b"key1").unwrap().is_some());
assert!(db.get_ns(ns, b"key2").unwrap().is_some());
assert!(db.get(b"key3").unwrap().is_some());
}
// An empty key can be written and read back.
#[test]
fn test_db_empty_key() {
let db = db_builder();
let key = b"";
let val = b"empty-key-value";
assert!(db.put(key, val).is_ok());
let result = db.get(key);
assert!(result.is_ok());
assert!(result.unwrap().is_some());
}
// An empty value is stored and returned as a zero-length value, not as None.
#[test]
fn test_db_empty_value() {
let db = db_builder();
let key = b"key-with-empty-value";
let val = b"";
assert!(db.put(key, val).is_ok());
let result = db.get(key);
assert!(result.is_ok());
let retrieved = result.unwrap();
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap().len(), 0);
}
// A 1000-byte key with a 10000-byte value round-trips with the right length.
#[test]
fn test_db_large_key_value() {
let db = db_builder();
let key = vec![b'k'; 1000];
let val = vec![b'v'; 10000];
assert!(db.put(&key, &val).is_ok());
let result = db.get(&key);
assert!(result.is_ok());
let retrieved = result.unwrap();
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap().len(), val.len());
}
// Scanning an empty database yields nothing.
#[test]
fn test_scan_empty_db() {
use std::ops::Bound;
let db = db_builder();
let results: Vec<_> = db.scan(Bound::Unbounded, Bound::Unbounded).collect();
assert_eq!(results.len(), 0);
}
// An unbounded scan returns all keys in key order with their values.
#[test]
fn test_scan_single_memtable() {
use std::ops::Bound;
let db = db_builder();
db.put(b"a", b"1").unwrap();
db.put(b"b", b"2").unwrap();
db.put(b"c", b"3").unwrap();
let results: Vec<_> = db.scan(Bound::Unbounded, Bound::Unbounded).collect();
assert_eq!(results.len(), 3);
assert_eq!(&results[0].0[..], b"a");
assert_eq!(&results[0].1[..], b"1");
assert_eq!(&results[1].0[..], b"b");
assert_eq!(&results[1].1[..], b"2");
assert_eq!(&results[2].0[..], b"c");
assert_eq!(&results[2].1[..], b"3");
}
// Deleted keys do not show up in scan results.
#[test]
fn test_scan_with_tombstones() {
use std::ops::Bound;
let db = db_builder();
db.put(b"a", b"1").unwrap();
db.put(b"b", b"2").unwrap();
db.put(b"c", b"3").unwrap();
db.delete(b"b").unwrap();
let results: Vec<_> = db.scan(Bound::Unbounded, Bound::Unbounded).collect();
assert_eq!(results.len(), 2, "Should filter out tombstone for 'b'");
assert_eq!(&results[0].0[..], b"a");
assert_eq!(&results[1].0[..], b"c");
}
// An unbounded scan returns every key that was written.
#[test]
fn test_scan_unbounded() {
use std::ops::Bound;
let db = db_builder();
for i in 0..10 {
let key = format!("key-{:02}", i);
let val = format!("val-{:02}", i);
db.put(key.as_bytes(), val.as_bytes()).unwrap();
}
let results: Vec<_> = db.scan(Bound::Unbounded, Bound::Unbounded).collect();
assert_eq!(results.len(), 10);
}
// An inclusive lower / exclusive upper bound returns exactly the keys between.
#[test]
fn test_scan_bounded() {
use std::ops::Bound;
let db = db_builder();
db.put(b"a", b"1").unwrap();
db.put(b"b", b"2").unwrap();
db.put(b"c", b"3").unwrap();
db.put(b"d", b"4").unwrap();
db.put(b"e", b"5").unwrap();
let results: Vec<_> = db
.scan(Bound::Included(b"b"), Bound::Excluded(b"d"))
.collect();
assert_eq!(results.len(), 2);
assert_eq!(&results[0].0[..], b"b");
assert_eq!(&results[0].1[..], b"2");
assert_eq!(&results[1].0[..], b"c");
assert_eq!(&results[1].1[..], b"3");
}
// A namespaced scan only returns keys written to that namespace.
#[test]
fn test_scan_namespace_isolation() {
use std::ops::Bound;
let db = db_builder();
db.put_ns(1, b"key", b"ns1-value").unwrap();
db.put_ns(2, b"key", b"ns2-value").unwrap();
db.put_ns(1, b"key2", b"ns1-value2").unwrap();
let results_ns1: Vec<_> = db.scan_ns(1, Bound::Unbounded, Bound::Unbounded).collect();
let results_ns2: Vec<_> = db.scan_ns(2, Bound::Unbounded, Bound::Unbounded).collect();
assert_eq!(results_ns1.len(), 2);
assert_eq!(results_ns2.len(), 1);
assert_eq!(&results_ns2[0].1[..], b"ns2-value");
}
// Keys written before and after a `sync` are all returned by one scan.
#[test]
fn test_scan_across_frozen_memtables() {
use std::ops::Bound;
let db = db_builder();
db.put(b"a", b"1").unwrap();
db.put(b"b", b"2").unwrap();
db.sync().unwrap();
db.put(b"c", b"3").unwrap();
db.put(b"d", b"4").unwrap();
let results: Vec<_> = db.scan(Bound::Unbounded, Bound::Unbounded).collect();
assert_eq!(
results.len(),
4,
"Should scan across both frozen and current memtable"
);
assert_eq!(&results[0].0[..], b"a");
assert_eq!(&results[1].0[..], b"b");
assert_eq!(&results[2].0[..], b"c");
assert_eq!(&results[3].0[..], b"d");
}
// With several versions of one key, a scan returns only the newest value.
// The sleeps separate the writes in time.
#[test]
fn test_scan_mvcc_ordering() {
use std::ops::Bound;
let db = db_builder();
std::thread::sleep(std::time::Duration::from_millis(1));
db.put(b"key1", b"FIRST").unwrap();
std::thread::sleep(std::time::Duration::from_millis(1));
db.put(b"key1", b"SECOND").unwrap();
std::thread::sleep(std::time::Duration::from_millis(1));
db.put(b"key1", b"THIRD_NEWEST").unwrap();
let results: Vec<_> = db.scan(Bound::Unbounded, Bound::Unbounded).collect();
assert_eq!(results.len(), 1, "Should have 1 unique key");
assert_eq!(
&results[0].1[..],
b"THIRD_NEWEST",
"Should return the newest version"
);
}
// An overwritten key appears once in a scan, carrying its updated value.
#[test]
fn test_scan_with_mvcc() {
use std::ops::Bound;
let db = db_builder();
db.put(b"key1", b"v1").unwrap();
db.put(b"key2", b"v2").unwrap();
db.put(b"key1", b"v1-updated").unwrap();
let results: Vec<_> = db.scan(Bound::Unbounded, Bound::Unbounded).collect();
assert_eq!(results.len(), 2);
assert_eq!(&results[0].0[..], b"key1");
assert_eq!(&results[0].1[..], b"v1-updated");
assert_eq!(&results[1].0[..], b"key2");
assert_eq!(&results[1].1[..], b"v2");
}
}