mod batch;
pub mod bplustree;
mod cache;
mod checkpoint;
mod clock;
mod commit;
mod compaction;
mod comparator;
mod compression;
mod error;
mod iter;
mod levels;
mod lockfile;
mod lsm;
mod memtable;
mod snapshot;
mod sstable;
mod stall;
mod task;
mod transaction;
mod vfs;
mod vlog;
mod wal;
#[cfg(test)]
mod test;
use std::cmp::Ordering;
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::Arc;
pub use comparator::{BytewiseComparator, Comparator, InternalKeyComparator, TimestampComparator};
use sstable::bloom::LevelDBBloomFilter;
use crate::clock::{DefaultLogicalClock, LogicalClock};
pub use crate::error::{Error, Result};
pub use crate::lsm::{Tree, TreeBuilder};
pub use crate::transaction::{
Durability,
HistoryOptions,
Mode,
ReadOptions,
Transaction,
WriteOptions,
};
/// Conversion into the crate's owned byte buffer ([`Value`]) plus a cheap
/// borrowed view of the same bytes.
///
/// Implemented in this module for `&[u8]`, `&[u8; N]`, `Vec<u8>`, `&Vec<u8>`,
/// `&str` and `Box<[u8]>`.
pub trait IntoBytes {
    /// Borrows the bytes without copying them.
    fn as_slice(&self) -> &[u8];

    /// Consumes `self` and yields the bytes as an owned [`Value`].
    fn into_bytes(self) -> Value;
}
/// Borrowed slices are viewed directly and copied when converted.
impl IntoBytes for &[u8] {
    fn into_bytes(self) -> Value {
        Vec::from(self)
    }

    fn as_slice(&self) -> &[u8] {
        *self
    }
}
/// Borrowed fixed-size arrays (e.g. byte-string literals) are viewed as slices
/// and copied when converted.
impl<const N: usize> IntoBytes for &[u8; N] {
    fn into_bytes(self) -> Value {
        Vec::from(&self[..])
    }

    fn as_slice(&self) -> &[u8] {
        *self
    }
}
/// Owned vectors are already a [`Value`], so conversion just moves them.
impl IntoBytes for Vec<u8> {
    fn into_bytes(self) -> Value {
        self
    }

    fn as_slice(&self) -> &[u8] {
        &self[..]
    }
}
/// Borrowed vectors are viewed in place and copied when converted.
impl IntoBytes for &Vec<u8> {
    fn into_bytes(self) -> Value {
        self.to_vec()
    }

    fn as_slice(&self) -> &[u8] {
        Vec::as_slice(self)
    }
}
/// String slices expose their UTF-8 bytes and copy them when converted.
impl IntoBytes for &str {
    fn into_bytes(self) -> Value {
        Vec::from(self.as_bytes())
    }

    fn as_slice(&self) -> &[u8] {
        str::as_bytes(self)
    }
}
/// Boxed slices convert into a `Vec` without copying the bytes.
impl IntoBytes for Box<[u8]> {
    fn into_bytes(self) -> Value {
        Vec::from(self)
    }

    fn as_slice(&self) -> &[u8] {
        &self[..]
    }
}
/// Owned user-key bytes.
pub type Key = Vec<u8>;
/// Owned value bytes.
pub type Value = Vec<u8>;
/// Version number.
pub type Version = u64;

/// Item of a key/value iterator: a key with an optional value. `None` presumably
/// means the value was not loaded (e.g. a keys-only scan) — TODO confirm at call sites.
pub type IterResult = Result<(Key, Option<Value>)>;
/// Item of a keys-only scan.
pub type KeysResult = Result<Key>;
/// Item of a range scan that always yields values.
pub type RangeResult = Result<(Key, Value)>;
/// Checksum-verification level for value-log (VLog) data, stored in
/// `Options::vlog_checksum_verification`. How each level is applied lives in the
/// `vlog` module — TODO confirm there.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum VLogChecksumLevel {
    /// No verification; the default.
    #[default]
    Disabled = 0,
    /// Full verification.
    Full = 1,
}
/// Policy for handling WAL problems during recovery, stored in
/// `Options::wal_recovery_mode`. The exact semantics live in the `wal` module;
/// the descriptions below follow the variant names — TODO confirm there.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum WalRecoveryMode {
    /// Presumably tolerates corrupted WAL data and repairs it (the default).
    #[default]
    TolerateCorruptedWithRepair,
    /// Presumably refuses to recover past any corruption.
    AbsoluteConsistency,
}
/// Configuration for the storage engine.
///
/// Start from [`Options::new`] / `Default`, adjust with the `with_*` builder
/// methods, then call [`Options::validate`] to check cross-field constraints.
/// The defaults quoted below are the values set by `impl Default for Options`.
#[derive(Clone)]
pub struct Options {
    /// Block size; default `64 * 1024`. Presumably the target size in bytes of
    /// SSTable data blocks — TODO confirm in `sstable`.
    pub block_size: usize,
    /// Block restart interval; default 16. Presumably the number of entries
    /// between restart points inside a block — TODO confirm.
    pub block_restart_interval: usize,
    /// Optional filter policy; default is `Some(LevelDBBloomFilter::new(10))`.
    pub filter_policy: Option<Arc<dyn FilterPolicy>>,
    /// User-key comparator; default `BytewiseComparator`. Prefer
    /// `with_comparator`, which also rebuilds `internal_comparator`.
    pub comparator: Arc<dyn Comparator>,
    /// `InternalKeyComparator` wrapping `comparator`; kept in sync by `Default`
    /// and `with_comparator` (assigning `comparator` directly does not update it).
    pub(crate) internal_comparator: Arc<dyn Comparator>,
    /// Compression type per level; default empty. How indices map to levels and
    /// what applies past the end of the list is decided elsewhere — TODO confirm.
    pub compression_per_level: Vec<CompressionType>,
    /// Shared block cache; default capacity `1 << 20` bytes
    /// (`BlockCache::with_capacity_bytes`).
    pub(crate) block_cache: Arc<cache::BlockCache>,
    /// Root directory. The `wal`, `sstables`, `vlog`, `manifest` and
    /// `versioned_index` subdirectories are joined onto it. Default is empty.
    pub path: PathBuf,
    /// Number of LSM levels; default 6, and `validate` requires at least 1.
    pub level_count: u8,
    /// Memtable size limit; default `100 * 1024 * 1024` (presumably bytes).
    pub max_memtable_size: usize,
    /// Index partition size; default 16384 (units not visible here — TODO confirm).
    pub index_partition_size: usize,
    /// Maximum value-log file size; default `256 * 1024 * 1024` (presumably bytes).
    pub vlog_max_file_size: u64,
    /// Value-log checksum verification level; default `VLogChecksumLevel::Disabled`.
    pub vlog_checksum_verification: VLogChecksumLevel,
    /// Enables the value log; default `false`. `validate` requires it when
    /// `enable_versioning` is set.
    pub enable_vlog: bool,
    /// Value-size threshold for the value log; default 1024, and `validate`
    /// requires 0 when versioning is enabled. Presumably values at or above this
    /// size are stored in the value log — TODO confirm.
    pub vlog_value_threshold: usize,
    /// Enables versioned queries; default `false`. Requires `enable_vlog` and
    /// `vlog_value_threshold == 0` (checked by `validate`).
    pub enable_versioning: bool,
    /// Enables the versioned index; default `false`. Requires `enable_versioning`.
    pub enable_versioned_index: bool,
    /// History retention for versioned data, in nanoseconds per the `_ns`
    /// suffix; default 0. What 0 means is decided elsewhere — TODO confirm.
    pub versioned_history_retention_ns: u64,
    /// Logical clock; default `DefaultLogicalClock::new()`.
    pub(crate) clock: Arc<dyn LogicalClock>,
    /// Whether to flush on close; default `true`.
    pub flush_on_close: bool,
    /// WAL recovery policy; default `WalRecoveryMode::TolerateCorruptedWithRepair`.
    pub wal_recovery_mode: WalRecoveryMode,
    /// Level-0 file limit; default 4. `l0_stall_threshold` must be >= this value.
    pub level0_max_files: usize,
    /// Per-level size budget; default `256 * 1024 * 1024` (presumably bytes; which
    /// level it applies to is decided elsewhere — TODO confirm).
    pub max_bytes_for_level: u64,
    /// Level size multiplier; default 10.0 (presumably growth factor between
    /// levels — TODO confirm).
    pub level_multiplier: f64,
    /// Memtable stall threshold; default 2, and `validate` requires >= 2
    /// (presumably the memtable count at which writes stall — TODO confirm).
    pub memtable_stall_threshold: usize,
    /// Level-0 stall threshold; default 12, and `validate` requires
    /// >= `level0_max_files` (presumably the L0 file count at which writes stall).
    pub l0_stall_threshold: usize,
}
impl Default for Options {
fn default() -> Self {
let bf = LevelDBBloomFilter::new(10);
let clock = Arc::new(DefaultLogicalClock::new());
let comparator: Arc<dyn Comparator> = Arc::new(crate::BytewiseComparator {});
let internal_comparator: Arc<dyn Comparator> =
Arc::new(InternalKeyComparator::new(Arc::clone(&comparator)));
Self {
block_size: 64 * 1024, block_restart_interval: 16,
comparator,
internal_comparator,
compression_per_level: Vec::new(),
filter_policy: Some(Arc::new(bf)),
block_cache: Arc::new(cache::BlockCache::with_capacity_bytes(1 << 20)), path: PathBuf::from(""),
level_count: 6,
max_memtable_size: 100 * 1024 * 1024, index_partition_size: 16384, vlog_max_file_size: 256 * 1024 * 1024, vlog_checksum_verification: VLogChecksumLevel::Disabled,
enable_vlog: false,
vlog_value_threshold: 1024, enable_versioning: false,
enable_versioned_index: false,
versioned_history_retention_ns: 0, clock,
flush_on_close: true,
wal_recovery_mode: WalRecoveryMode::default(),
level0_max_files: 4,
max_bytes_for_level: 256 * 1024 * 1024, level_multiplier: 10.0,
memtable_stall_threshold: 2,
l0_stall_threshold: 12,
}
}
}
impl Options {
pub fn new() -> Self {
Self::default()
}
pub const fn with_block_size(mut self, value: usize) -> Self {
self.block_size = value;
self
}
pub const fn with_block_restart_interval(mut self, value: usize) -> Self {
self.block_restart_interval = value;
self
}
pub fn with_filter_policy(mut self, value: Option<Arc<dyn FilterPolicy>>) -> Self {
self.filter_policy = value;
self
}
pub fn with_comparator(mut self, value: Arc<dyn Comparator>) -> Self {
self.internal_comparator = Arc::new(InternalKeyComparator::new(Arc::clone(&value)));
self.comparator = value;
self
}
pub fn without_compression(mut self) -> Self {
self.compression_per_level = Vec::new();
self
}
pub fn with_compression_per_level(mut self, levels: Vec<CompressionType>) -> Self {
self.compression_per_level = levels;
self
}
pub fn with_l0_no_compression(mut self) -> Self {
self.compression_per_level =
vec![CompressionType::None, CompressionType::SnappyCompression];
self
}
pub fn with_path(mut self, value: PathBuf) -> Self {
self.path = value;
self
}
pub const fn with_level_count(mut self, value: u8) -> Self {
self.level_count = value;
self
}
pub const fn with_max_memtable_size(mut self, value: usize) -> Self {
self.max_memtable_size = value;
self
}
pub fn with_block_cache_capacity(mut self, capacity_bytes: u64) -> Self {
self.block_cache = Arc::new(cache::BlockCache::with_capacity_bytes(capacity_bytes));
self
}
pub const fn with_index_partition_size(mut self, size: usize) -> Self {
self.index_partition_size = size;
self
}
pub const fn with_vlog_max_file_size(mut self, value: u64) -> Self {
self.vlog_max_file_size = value;
self
}
pub const fn with_vlog_checksum_verification(mut self, value: VLogChecksumLevel) -> Self {
self.vlog_checksum_verification = value;
self
}
pub const fn with_enable_vlog(mut self, value: bool) -> Self {
self.enable_vlog = value;
self
}
pub const fn with_vlog_value_threshold(mut self, value: usize) -> Self {
self.vlog_value_threshold = value;
self
}
pub fn with_versioning(mut self, value: bool, retention_ns: u64) -> Self {
self.enable_versioning = value;
self.versioned_history_retention_ns = retention_ns;
if value {
self.enable_vlog = true;
self.vlog_value_threshold = 0;
}
self
}
pub const fn with_versioned_index(mut self, value: bool) -> Self {
self.enable_versioned_index = value;
self
}
pub const fn with_flush_on_close(mut self, value: bool) -> Self {
self.flush_on_close = value;
self
}
pub const fn with_wal_recovery_mode(mut self, mode: WalRecoveryMode) -> Self {
self.wal_recovery_mode = mode;
self
}
pub const fn with_memtable_stall_threshold(mut self, value: usize) -> Self {
self.memtable_stall_threshold = value;
self
}
pub const fn with_l0_stall_threshold(mut self, value: usize) -> Self {
self.l0_stall_threshold = value;
self
}
pub(crate) fn manifest_file_path(&self, id: u64) -> PathBuf {
self.manifest_dir().join(format!("{id:020}.manifest"))
}
pub(crate) fn sstable_file_path(&self, id: u64) -> PathBuf {
self.sstable_dir().join(format!("{id:020}.sst"))
}
pub(crate) fn vlog_file_path(&self, id: u64) -> PathBuf {
self.vlog_dir().join(format!("{id:020}.vlog"))
}
pub(crate) fn wal_dir(&self) -> PathBuf {
self.path.join("wal")
}
pub(crate) fn sstable_dir(&self) -> PathBuf {
self.path.join("sstables")
}
pub(crate) fn vlog_dir(&self) -> PathBuf {
self.path.join("vlog")
}
pub(crate) fn manifest_dir(&self) -> PathBuf {
self.path.join("manifest")
}
pub(crate) fn versioned_index_dir(&self) -> PathBuf {
self.path.join("versioned_index")
}
pub(crate) fn is_vlog_filename(&self, filename: &str) -> bool {
filename.len() == 25
&& std::path::Path::new(filename)
.extension()
.is_some_and(|ext| ext.eq_ignore_ascii_case("vlog"))
}
pub(crate) fn extract_vlog_file_id(&self, filename: &str) -> Option<u32> {
if self.is_vlog_filename(filename) {
if let Some(id_part) = filename.strip_suffix(".vlog") {
if id_part.len() == 20 && id_part.chars().all(|c| c.is_ascii_digit()) {
return id_part.parse::<u32>().ok();
}
}
}
None
}
pub fn validate(&self) -> Result<()> {
if self.enable_versioning {
if !self.enable_vlog {
return Err(Error::InvalidArgument(
"Versioned queries require VLog to be enabled. Set enable_vlog to true."
.to_string(),
));
}
if self.vlog_value_threshold > 0 {
return Err(Error::InvalidArgument(
"Versioned queries require all values to be stored in VLog. Set vlog_value_threshold to 0.".to_string(),
));
}
}
if self.enable_versioned_index && !self.enable_versioning {
return Err(Error::InvalidArgument(
"Versioned index requires versioning to be enabled. Call with_versioning(true, retention_ns) first.".to_string(),
));
}
if self.level_count == 0 {
return Err(Error::InvalidArgument("Level count must be at least 1".to_string()));
}
if self.memtable_stall_threshold < 2 {
return Err(Error::InvalidArgument(
"memtable_stall_threshold must be >= 2".to_string(),
));
}
if self.l0_stall_threshold < self.level0_max_files {
return Err(Error::InvalidArgument(format!(
"l0_stall_threshold ({}) must be >= level0_max_files ({})",
self.l0_stall_threshold, self.level0_max_files
)));
}
Ok(())
}
}
/// Compression codec. The discriminant is the tag byte accepted by
/// `CompressionType::try_from(u8)`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CompressionType {
    /// No compression (tag 0).
    None = 0,
    /// Snappy compression (tag 1).
    SnappyCompression = 1,
}

impl CompressionType {
    /// Lower-case codec name: `"none"` or `"snappy"`.
    pub const fn as_str(&self) -> &'static str {
        match self {
            CompressionType::SnappyCompression => "snappy",
            CompressionType::None => "none",
        }
    }
}
impl TryFrom<u8> for CompressionType {
type Error = Error;
fn try_from(byte: u8) -> Result<Self> {
match byte {
0 => Ok(Self::None),
1 => Ok(Self::SnappyCompression),
_ => Err(Error::Compression(format!("Unknown compression type: {}", byte))),
}
}
}
/// Pluggable key filter (the default is a LevelDB-style bloom filter).
///
/// Implementations must be `Send + Sync` because they are shared through
/// `Arc<dyn FilterPolicy>` in `Options`.
pub trait FilterPolicy: Send + Sync {
    /// Identifier of the policy.
    fn name(&self) -> &str;

    /// Returns whether `key` may be present according to `filter`; presumably
    /// `false` is a definite "absent" answer, as for bloom filters — TODO confirm.
    fn may_contain(&self, filter: &[u8], key: &[u8]) -> bool;

    /// Builds a filter blob covering `keys`.
    fn create_filter(&self, keys: &[Vec<u8>]) -> Vec<u8>;
}
use std::ops::Bound;
/// One end of an internal-key range.
pub(crate) type InternalKeyRangeBound = Bound<InternalKey>;
/// `(lower, upper)` pair of internal-key bounds.
pub(crate) type InternalKeyRange = (InternalKeyRangeBound, InternalKeyRangeBound);

/// Translates a user-key range into an internal-key range.
///
/// For a user key `k` two sentinel internal keys are used:
/// * `head(k)` = (`INTERNAL_KEY_SEQ_NUM_MAX`, `InternalKeyKind::Max`, `INTERNAL_KEY_TIMESTAMP_MAX`)
/// * `tail(k)` = (seq `0`, `InternalKeyKind::Set`, timestamp `0`)
///
/// How each bound is mapped:
/// - `Included` lower and `Excluded` upper bounds use `head(k)`.
/// - `Excluded` lower and `Included` upper bounds use `tail(k)`.
/// - `Unbounded` is passed through.
///
/// Under `InternalKey`'s `Ord` (sequence descending, kind ascending, timestamp
/// descending), `head(k)` sorts before entries for `k` with a smaller sequence
/// number. `tail(k)` sorts after entries with a larger one.
// NOTE(review): under that `Ord`, an entry for `k` with seq 0 and a kind ordered
// after `Set` (e.g. `Merge`, `Replace`) sorts after `tail(k)`. It would therefore
// be kept by an `Excluded(k)` lower bound and dropped by an `Included(k)` upper
// bound. Two things are not visible here and need confirming:
// - whether seq-0 entries of such kinds occur;
// - whether the engine's `InternalKeyComparator` orders kinds the same way.
pub(crate) fn user_range_to_internal_range(
    lower: Bound<&[u8]>,
    upper: Bound<&[u8]>,
) -> InternalKeyRange {
    let head = |key: &[u8]| {
        InternalKey::new(
            key.into_bytes(),
            INTERNAL_KEY_SEQ_NUM_MAX,
            InternalKeyKind::Max,
            INTERNAL_KEY_TIMESTAMP_MAX,
        )
    };
    let tail = |key: &[u8]| InternalKey::new(key.into_bytes(), 0, InternalKeyKind::Set, 0);

    let start = match lower {
        Bound::Included(key) => Bound::Included(head(key)),
        Bound::Excluded(key) => Bound::Excluded(tail(key)),
        Bound::Unbounded => Bound::Unbounded,
    };
    let end = match upper {
        Bound::Included(key) => Bound::Included(tail(key)),
        Bound::Excluded(key) => Bound::Excluded(head(key)),
        Bound::Unbounded => Bound::Unbounded,
    };
    (start, end)
}
/// Largest sequence number that fits above the 8-bit kind in a trailer
/// (`seq << 8 | kind`), i.e. 2^56 - 1.
pub(crate) const INTERNAL_KEY_SEQ_NUM_MAX: u64 = u64::MAX >> 8;
/// Largest representable timestamp.
pub(crate) const INTERNAL_KEY_TIMESTAMP_MAX: u64 = u64::MAX;
/// Reads a big-endian `u64` from the 8 bytes of `buffer` starting at `offset`.
///
/// # Panics
///
/// Panics if `buffer` holds fewer than 8 bytes starting at `offset`.
#[inline(always)]
fn read_u64_be(buffer: &[u8], offset: usize) -> u64 {
    // The previous raw-pointer read had no bounds check. A short buffer, or an
    // offset produced by `len() - 16` wrapping in a release build, was undefined
    // behaviour from a safe function. Slicing first turns that into a panic;
    // `[offset..][..8]` also cannot overflow the way `offset + 8` could.
    let mut bytes = [0u8; 8];
    bytes.copy_from_slice(&buffer[offset..][..8]);
    u64::from_be_bytes(bytes)
}
/// Decodes the entry kind from the low byte of a trailer; any byte without a
/// defined kind decodes to `InternalKeyKind::Invalid`.
fn trailer_to_kind(trailer: u64) -> InternalKeyKind {
    // Only the low 8 bits hold the kind; the sequence number sits above them.
    match trailer & 0xFF {
        0 => InternalKeyKind::Delete,
        1 => InternalKeyKind::SoftDelete,
        2 => InternalKeyKind::Set,
        3 => InternalKeyKind::Merge,
        4 => InternalKeyKind::LogData,
        5 => InternalKeyKind::RangeDelete,
        6 => InternalKeyKind::Replace,
        7 => InternalKeyKind::Separator,
        24 => InternalKeyKind::Max,
        _ => InternalKeyKind::Invalid,
    }
}
/// Extracts the sequence number from a trailer (`seq << 8 | kind`) by
/// discarding the kind byte.
#[inline(always)]
fn trailer_to_seq_num(trailer: u64) -> u64 {
    trailer / 256
}
/// True for every deletion kind: `Delete`, `SoftDelete` and `RangeDelete`.
#[inline(always)]
fn is_delete_kind(kind: InternalKeyKind) -> bool {
    match kind {
        InternalKeyKind::Delete | InternalKeyKind::SoftDelete | InternalKeyKind::RangeDelete => {
            true
        }
        _ => false,
    }
}
/// True for hard deletion kinds (`Delete`, `RangeDelete`); `SoftDelete` is
/// deliberately excluded.
#[inline(always)]
fn is_hard_delete_marker(kind: InternalKeyKind) -> bool {
    kind == InternalKeyKind::Delete || kind == InternalKeyKind::RangeDelete
}
/// True only for `InternalKeyKind::Replace`.
#[inline(always)]
fn is_replace_kind(kind: InternalKeyKind) -> bool {
    kind == InternalKeyKind::Replace
}
/// Kind of an internal-key entry, stored in the low byte of the trailer.
///
/// Discriminants are fixed by `#[repr(u8)]` and must stay in step with
/// `trailer_to_kind`. The derived `Ord` follows discriminant order.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub enum InternalKeyKind {
    /// Point deletion; counts as a tombstone and as a hard delete marker.
    Delete = 0,
    /// Soft deletion; counts as a tombstone but not as a hard delete marker.
    SoftDelete = 1,
    /// Regular write (presumably a put).
    Set = 2,
    /// Merge operand (presumably).
    Merge = 3,
    /// Log-data record (presumably not a key/value entry — TODO confirm).
    LogData = 4,
    /// Range deletion; counts as a tombstone and as a hard delete marker.
    RangeDelete = 5,
    /// Replace write; detected by `is_replace_kind`.
    Replace = 6,
    /// Separator entry (purpose not visible here — TODO confirm).
    Separator = 7,
    /// Sentinel kind used when building seek/range bound keys.
    Max = 24,
    /// Decoded from any trailer byte without a defined kind.
    Invalid = 191,
}
impl From<u8> for InternalKeyKind {
    /// Interprets `value` as a trailer kind byte; undefined values become
    /// `InternalKeyKind::Invalid`.
    fn from(value: u8) -> Self {
        let as_trailer = u64::from(value);
        trailer_to_kind(as_trailer)
    }
}
/// Owned internal key: a user key plus a packed trailer and a timestamp.
///
/// `InternalKey::encode` serializes it as `user_key`, then `trailer` (8 bytes,
/// big-endian), then `timestamp` (8 bytes, big-endian).
// NOTE(review): the derived `PartialEq`/`Eq` compare the raw `trailer`, while the
// `Ord` impl compares the decoded kind. All undefined kind bytes decode to
// `Invalid`, so such keys can be `Ord`-equal yet `!=`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub(crate) struct InternalKey {
    /// User key bytes.
    pub(crate) user_key: Key,
    /// Timestamp (units not visible here — TODO confirm against the clock).
    pub(crate) timestamp: u64,
    /// `(seq_num << 8) | kind`, as packed by `InternalKey::new`.
    pub(crate) trailer: u64,
}
impl InternalKey {
    /// Length of the fixed suffix that `encode` appends to the user key: an
    /// 8-byte big-endian trailer followed by an 8-byte big-endian timestamp.
    const ENCODED_SUFFIX_LEN: usize = 16;

    /// Builds a key whose trailer packs `seq_num` above the 8-bit `kind`.
    ///
    /// `seq_num` is expected to fit in 56 bits (`INTERNAL_KEY_SEQ_NUM_MAX`);
    /// higher bits are shifted out of the trailer.
    pub(crate) fn new(user_key: Key, seq_num: u64, kind: InternalKeyKind, timestamp: u64) -> Self {
        Self {
            user_key,
            timestamp,
            trailer: (seq_num << 8) | kind as u64,
        }
    }

    /// Length in bytes of this key's encoded form.
    pub(crate) fn size(&self) -> usize {
        self.user_key.len() + Self::ENCODED_SUFFIX_LEN
    }

    /// Offset where the 16-byte suffix starts in an encoded key.
    ///
    /// # Panics
    ///
    /// Panics if `encoded` is shorter than the suffix. Checking here, instead of
    /// a bare `len() - 16`, keeps release builds from wrapping the subtraction
    /// and handing a bogus offset to `read_u64_be`.
    #[inline]
    fn suffix_offset(encoded: &[u8]) -> usize {
        match encoded.len().checked_sub(Self::ENCODED_SUFFIX_LEN) {
            Some(offset) => offset,
            None => panic!(
                "encoded internal key is {} bytes; expected at least {}",
                encoded.len(),
                Self::ENCODED_SUFFIX_LEN
            ),
        }
    }

    /// Decodes an owned key from its `encode`d form, copying the user key.
    ///
    /// # Panics
    ///
    /// Panics if `encoded_key` is shorter than 16 bytes.
    pub(crate) fn decode(encoded_key: &[u8]) -> Self {
        let n = Self::suffix_offset(encoded_key);
        Self {
            user_key: encoded_key[..n].to_vec(),
            timestamp: read_u64_be(encoded_key, n + 8),
            trailer: read_u64_be(encoded_key, n),
        }
    }

    /// Borrows the user-key prefix of an encoded key.
    ///
    /// # Panics
    ///
    /// Panics if `encoded` is shorter than 16 bytes.
    #[inline]
    pub(crate) fn user_key_from_encoded(encoded: &[u8]) -> &[u8] {
        &encoded[..Self::suffix_offset(encoded)]
    }

    /// Reads the trailer of an encoded key.
    ///
    /// # Panics
    ///
    /// Panics if `encoded` is shorter than 16 bytes.
    #[inline]
    pub(crate) fn trailer_from_encoded(encoded: &[u8]) -> u64 {
        read_u64_be(encoded, Self::suffix_offset(encoded))
    }

    /// Reads the sequence number of an encoded key.
    #[inline]
    pub(crate) fn seq_num_from_encoded(encoded: &[u8]) -> u64 {
        trailer_to_seq_num(Self::trailer_from_encoded(encoded))
    }

    /// Serializes as `user_key ‖ trailer (BE) ‖ timestamp (BE)`.
    pub(crate) fn encode(&self) -> Vec<u8> {
        // Reserve the exact size so appending the suffix never reallocates.
        let mut buf = Vec::with_capacity(self.size());
        buf.extend_from_slice(&self.user_key);
        buf.extend_from_slice(&self.trailer.to_be_bytes());
        buf.extend_from_slice(&self.timestamp.to_be_bytes());
        buf
    }

    /// Sequence number stored in the trailer.
    #[inline]
    pub(crate) fn seq_num(&self) -> u64 {
        trailer_to_seq_num(self.trailer)
    }

    /// Entry kind stored in the trailer's low byte.
    pub(crate) fn kind(&self) -> InternalKeyKind {
        trailer_to_kind(self.trailer)
    }

    /// True for `Delete`, `SoftDelete` and `RangeDelete` entries.
    #[inline]
    pub(crate) fn is_tombstone(&self) -> bool {
        is_delete_kind(self.kind())
    }

    /// True for `Delete` and `RangeDelete` entries.
    pub(crate) fn is_hard_delete_marker(&self) -> bool {
        is_hard_delete_marker(self.kind())
    }

    /// True for `Replace` entries.
    pub(crate) fn is_replace(&self) -> bool {
        is_replace_kind(self.kind())
    }

    /// Orders by user key ascending, then timestamp descending (newest first),
    /// ignoring the trailer.
    pub(crate) fn cmp_by_timestamp(&self, other: &Self) -> Ordering {
        match self.user_key.cmp(&other.user_key) {
            Ordering::Equal => other.timestamp.cmp(&self.timestamp),
            ordering => ordering,
        }
    }
}
impl Ord for InternalKey {
    /// Compares fields in this order:
    /// 1. user key, ascending;
    /// 2. sequence number, descending (newer entries first);
    /// 3. decoded kind, ascending;
    /// 4. timestamp, descending.
    fn cmp(&self, other: &Self) -> Ordering {
        self.user_key
            .cmp(&other.user_key)
            .then_with(|| other.seq_num().cmp(&self.seq_num()))
            .then_with(|| self.kind().cmp(&other.kind()))
            .then_with(|| other.timestamp.cmp(&self.timestamp))
    }
}
impl PartialOrd for InternalKey {
    /// Delegates to the total order in `Ord::cmp`, so it never returns `None`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Borrowed view of an encoded internal key: `user_key`, then an 8-byte
/// big-endian trailer, then an 8-byte big-endian timestamp.
///
/// Fields are decoded on demand from the borrowed bytes; only `to_owned` allocates.
#[derive(Clone, Copy)]
pub struct InternalKeyRef<'a> {
    // Encoded key bytes. `from_encoded` checks the 16-byte minimum only in debug
    // builds (`debug_assert!`).
    encoded: &'a [u8],
}
impl<'a> InternalKeyRef<'a> {
#[inline]
pub fn from_encoded(encoded: &'a [u8]) -> Self {
debug_assert!(encoded.len() >= 16);
Self {
encoded,
}
}
#[inline]
pub fn user_key(&self) -> &'a [u8] {
InternalKey::user_key_from_encoded(self.encoded)
}
#[inline]
pub fn encoded(&self) -> &'a [u8] {
self.encoded
}
#[inline]
pub fn trailer(&self) -> u64 {
InternalKey::trailer_from_encoded(self.encoded)
}
#[inline]
pub fn seq_num(&self) -> u64 {
InternalKey::seq_num_from_encoded(self.encoded)
}
#[inline]
pub fn timestamp(&self) -> u64 {
let n = self.encoded.len() - 8;
read_u64_be(self.encoded, n)
}
#[inline]
pub fn kind(&self) -> InternalKeyKind {
trailer_to_kind(self.trailer())
}
#[inline]
pub fn is_tombstone(&self) -> bool {
is_delete_kind(self.kind())
}
#[inline]
pub fn is_hard_delete_marker(&self) -> bool {
is_hard_delete_marker(self.kind())
}
#[inline]
pub fn is_replace(&self) -> bool {
is_replace_kind(self.kind())
}
pub(crate) fn to_owned(self) -> InternalKey {
InternalKey::decode(self.encoded)
}
}
impl PartialEq for InternalKeyRef<'_> {
    /// Two views are equal when their encoded bytes are identical.
    fn eq(&self, other: &Self) -> bool {
        self.encoded.eq(other.encoded)
    }
}

impl Eq for InternalKeyRef<'_> {}
impl std::fmt::Debug for InternalKeyRef<'_> {
    /// Shows the decoded fields rather than the raw encoded bytes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("InternalKeyRef");
        out.field("user_key", &self.user_key());
        out.field("seq_num", &self.seq_num());
        out.field("kind", &self.kind());
        out.field("timestamp", &self.timestamp());
        out.finish()
    }
}
/// Cursor over internal-key entries.
///
/// The positioning methods return `Result<bool>`. Presumably `Ok(true)` means
/// the cursor is on an entry afterwards — TODO confirm against implementors.
pub trait LSMIterator {
    /// Positions the cursor relative to `target` (presumably an encoded internal
    /// key — TODO confirm).
    fn seek(&mut self, target: &[u8]) -> Result<bool>;

    /// Positions at the first entry.
    fn seek_first(&mut self) -> Result<bool>;

    /// Positions at the last entry.
    fn seek_last(&mut self) -> Result<bool>;

    /// Moves forward one entry.
    fn next(&mut self) -> Result<bool>;

    /// Moves back one entry.
    fn prev(&mut self) -> Result<bool>;

    /// Whether the cursor currently points at an entry.
    fn valid(&self) -> bool;

    /// Current key, borrowed from the iterator.
    fn key(&self) -> InternalKeyRef<'_>;

    /// Current value bytes as stored, borrowed from the iterator. The name
    /// suggests this may still be an encoded form — TODO confirm.
    fn value_encoded(&self) -> Result<&[u8]>;

    /// Current value copied into an owned buffer; by default a copy of
    /// `value_encoded`, with its error propagated unchanged.
    fn value(&self) -> Result<Value> {
        self.value_encoded().map(<[u8]>::to_vec)
    }
}