use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use dashmap::DashMap;
use smallvec::SmallVec;
use crossbeam_skiplist::SkipMap;
use crate::deferred_index::{DeferredSortedIndex, DeferredIndexConfig};
use crate::group_commit::EventDrivenGroupCommit;
use crate::txn_wal::{TxnWal, TxnWalBuffer, TxnWalEntry};
use sochdb_core::{Result, SochDBError};
/// Lazily-allocated Bloom filter summarising a transaction's read or write
/// set, used as a cheap pre-filter before exact SSI conflict checks.
///
/// The bit array is only allocated on the first [`insert`](Self::insert); an
/// unallocated filter behaves as empty.
#[derive(Clone, Debug)]
pub struct SsiBloomFilter {
    /// Bit array; `None` until the first insert.
    bits: Option<Vec<u64>>,
    /// Number of items the bit array is sized for.
    expected_capacity: usize,
    /// Number of probe positions set/checked per key.
    num_hashes: u32,
}

impl SsiBloomFilter {
    /// Bits allocated per expected item.
    const BITS_PER_ITEM: f64 = 9.6;
    /// Probe count used by every constructor.
    const DEFAULT_NUM_HASHES: u32 = 7;
    /// Lower bound on `expected_capacity` applied by [`new`](Self::new).
    const MIN_CAPACITY: usize = 64;

    /// Creates an empty filter sized for `expected_items` (clamped to at least
    /// `MIN_CAPACITY`). No memory is allocated until the first insert.
    #[inline]
    pub fn new(expected_items: usize) -> Self {
        Self {
            bits: None,
            expected_capacity: expected_items.max(Self::MIN_CAPACITY),
            num_hashes: Self::DEFAULT_NUM_HASHES,
        }
    }

    /// Creates an empty filter whose capacity is derived from a word budget
    /// (`max(words, 1) * 64 / 10` items). `MIN_CAPACITY` is not applied here.
    pub fn with_word_capacity(words: usize) -> Self {
        Self {
            bits: None,
            expected_capacity: words.max(1) * 64 / 10,
            num_hashes: Self::DEFAULT_NUM_HASHES,
        }
    }

    /// Allocates the zeroed bit array on first use.
    #[inline]
    fn ensure_allocated(&mut self) {
        if self.bits.is_none() {
            let num_bits = ((self.expected_capacity as f64) * Self::BITS_PER_ITEM).ceil() as usize;
            let num_words = num_bits.div_ceil(64);
            self.bits = Some(vec![0u64; num_words]);
        }
    }

    /// Bit indices probed for `key` in a filter of `num_bits` bits, using
    /// double hashing (`h1 + i * h2`, modulo `num_bits`).
    #[inline]
    fn probe_indices(key: &[u8], num_hashes: u32, num_bits: usize) -> impl Iterator<Item = usize> {
        let h1 = Self::hash1(key);
        let h2 = Self::hash2(key);
        (0..num_hashes).map(move |i| {
            let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
            (h as usize) % num_bits
        })
    }

    /// Adds `key` to the filter, allocating the bit array if needed.
    #[inline]
    pub fn insert(&mut self, key: &[u8]) {
        self.ensure_allocated();
        let num_hashes = self.num_hashes;
        let Some(bits) = self.bits.as_mut() else {
            return;
        };
        let num_bits = bits.len() * 64;
        if num_bits == 0 {
            return;
        }
        for bit_idx in Self::probe_indices(key, num_hashes, num_bits) {
            bits[bit_idx / 64] |= 1u64 << (bit_idx % 64);
        }
    }

    /// Returns `false` if `key` was definitely never inserted; `true` means
    /// "possibly present" (false positives are possible).
    #[inline]
    pub fn may_contain(&self, key: &[u8]) -> bool {
        let Some(bits) = self.bits.as_ref() else {
            // Never allocated => nothing was ever inserted.
            return false;
        };
        let num_bits = bits.len() * 64;
        if num_bits == 0 {
            return false;
        }
        Self::probe_indices(key, self.num_hashes, num_bits)
            .all(|bit_idx| bits[bit_idx / 64] & (1u64 << (bit_idx % 64)) != 0)
    }

    /// Returns `true` if the two filters may share at least one key.
    ///
    /// `false` is definitive; `true` may be a false positive. A word-wise AND
    /// is only meaningful when both bit arrays have the same length, because
    /// each key's bit positions are reduced modulo that length. Filters of
    /// different sizes are therefore reported as possibly intersecting, so
    /// callers fall back to exact checks instead of silently missing a
    /// shared key.
    #[inline]
    pub fn may_intersect(&self, other: &SsiBloomFilter) -> bool {
        let (self_bits, other_bits) = match (&self.bits, &other.bits) {
            (Some(s), Some(o)) => (s, o),
            // An unallocated filter has never had a key inserted.
            _ => return false,
        };
        if self_bits.len() != other_bits.len() {
            return true;
        }
        self_bits
            .iter()
            .zip(other_bits.iter())
            .any(|(a, b)| a & b != 0)
    }

    /// First hash: std `DefaultHasher` over the byte slice.
    #[inline]
    fn hash1(key: &[u8]) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        hasher.finish()
    }

    /// Second hash: xxh3 64-bit.
    #[inline]
    fn hash2(key: &[u8]) -> u64 {
        twox_hash::xxh3::hash64(key)
    }

    /// Heap bytes of the bit array (0 if unallocated) plus the struct size.
    pub fn size_bytes(&self) -> usize {
        self.bits.as_ref().map(|b| b.len() * 8).unwrap_or(0) + std::mem::size_of::<Self>()
    }

    /// Returns `true` if no bit is set (including when unallocated).
    pub fn is_empty(&self) -> bool {
        match &self.bits {
            Some(bits) => bits.iter().all(|&w| w == 0),
            None => true,
        }
    }
}
/// Key type used in transaction read/write sets: a `SmallVec` with 32 bytes of
/// inline capacity, so short keys avoid a separate heap allocation.
pub type InlineKey = SmallVec<[u8; 32]>;
/// One version of a key's value, stored in a `VersionChain`.
#[derive(Debug, Clone)]
pub struct Version {
/// Stored value; `None` is treated by readers as "no value" (a deletion).
pub value: Option<Vec<u8>>,
/// Transaction that wrote this version.
pub txn_id: u64,
/// Commit timestamp; `0` while the version is still uncommitted.
pub commit_ts: u64,
}
/// Per-key MVCC history: committed versions plus at most one pending
/// (uncommitted) write.
///
/// `committed` is kept sorted by `commit_ts` in descending order (newest
/// first) so visibility lookups and GC can binary-search it.
#[derive(Debug, Default)]
pub struct VersionChain {
    /// Committed versions, newest (highest `commit_ts`) first.
    committed: Vec<Version>,
    /// Pending write of the single transaction currently owning this key.
    uncommitted: Option<Version>,
}

impl VersionChain {
    /// Creates an empty chain.
    #[inline]
    pub fn new() -> Self {
        Self {
            committed: Vec::new(),
            uncommitted: None,
        }
    }

    /// Stages `value` as `txn_id`'s uncommitted version.
    ///
    /// If `txn_id` already owns the pending slot only the value is replaced;
    /// otherwise the slot is overwritten. Callers are expected to check
    /// [`has_write_conflict`](Self::has_write_conflict) first.
    #[inline]
    pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64) {
        match &mut self.uncommitted {
            Some(v) if v.txn_id == txn_id => {
                v.value = value;
            }
            _ => {
                self.uncommitted = Some(Version {
                    value,
                    txn_id,
                    commit_ts: 0,
                });
            }
        }
    }

    /// Moves `txn_id`'s pending version into the committed list at
    /// `commit_ts`, keeping the list sorted newest-first.
    ///
    /// Returns `false` if there is no pending version owned by `txn_id`.
    pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
        let owns_pending = matches!(
            &self.uncommitted,
            Some(v) if v.txn_id == txn_id && v.commit_ts == 0
        );
        if !owns_pending {
            return false;
        }
        if let Some(mut version) = self.uncommitted.take() {
            version.commit_ts = commit_ts;
            let insert_pos = self
                .committed
                .partition_point(|existing| existing.commit_ts > commit_ts);
            self.committed.insert(insert_pos, version);
        }
        true
    }

    /// Discards the pending version if it belongs to `txn_id`.
    #[inline]
    pub fn abort(&mut self, txn_id: u64) {
        if self.uncommitted.as_ref().is_some_and(|v| v.txn_id == txn_id) {
            self.uncommitted = None;
        }
    }

    /// Returns the version visible to a reader.
    ///
    /// If `current_txn_id` owns the pending version, that is returned (read
    /// your own writes). Otherwise the newest committed version with
    /// `commit_ts < snapshot_ts` is returned, or `None` if there is none.
    #[inline]
    pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&Version> {
        if let Some(txn_id) = current_txn_id {
            if let Some(ref v) = self.uncommitted {
                if v.txn_id == txn_id {
                    return Some(v);
                }
            }
        }
        let idx = self.committed.partition_point(|v| v.commit_ts >= snapshot_ts);
        self.committed.get(idx)
    }

    /// Returns `true` if another transaction holds the pending slot.
    #[inline]
    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
        if let Some(ref v) = self.uncommitted {
            return v.txn_id != my_txn_id;
        }
        false
    }

    /// Drops committed versions that no snapshot `>= min_active_ts` can see.
    ///
    /// A reader at snapshot `s` sees the newest version with `commit_ts < s`
    /// (see [`read_at`](Self::read_at)). Hence every version with
    /// `commit_ts >= min_active_ts` must be kept, plus the newest version with
    /// `commit_ts < min_active_ts` (the one the oldest reader sees). Anything
    /// older is unreachable. Partitioning on `>` instead of `>=` would discard
    /// that version whenever a commit landed exactly at `min_active_ts`.
    pub fn gc(&mut self, min_active_ts: u64) {
        if self.committed.len() <= 1 {
            return;
        }
        // Versions not visible to the oldest snapshot (newest-first prefix).
        let newer = self
            .committed
            .partition_point(|v| v.commit_ts >= min_active_ts);
        // ...plus the first version the oldest snapshot can see, if any.
        let keep_count = (newer + 1).min(self.committed.len());
        self.committed.truncate(keep_count);
    }

    /// Number of committed versions plus the pending one, if present.
    #[inline]
    pub fn version_count(&self) -> usize {
        self.committed.len() + usize::from(self.uncommitted.is_some())
    }

    /// Test helper: committed versions (newest first) followed by the pending one.
    #[cfg(test)]
    pub fn versions(&self) -> Vec<Version> {
        let mut result = self.committed.clone();
        if let Some(ref v) = self.uncommitted {
            result.push(v.clone());
        }
        result
    }
}
/// Initial write-set capacity for transaction modes that track writes.
const WRITE_SET_INITIAL_CAPACITY: usize = 32;
/// Initial read-set capacity for read-write transactions.
const READ_SET_INITIAL_CAPACITY: usize = 64;
/// Per-transaction MVCC bookkeeping: snapshot timestamp, keys read and
/// written (with Bloom summaries of each), lifecycle state and tracking mode.
#[derive(Debug, Clone)]
pub struct MvccTransaction {
    /// Caller-assigned transaction identifier.
    pub txn_id: u64,
    /// Timestamp of the snapshot this transaction reads from.
    pub snapshot_ts: u64,
    /// Keys written by this transaction.
    pub write_set: HashSet<InlineKey>,
    /// Keys read by this transaction.
    pub read_set: HashSet<InlineKey>,
    /// Bloom summary of `write_set`.
    pub write_bloom: SsiBloomFilter,
    /// Bloom summary of `read_set`.
    pub read_bloom: SsiBloomFilter,
    /// Lifecycle state.
    pub state: TxnState,
    /// Which operations this transaction tracks.
    pub mode: TransactionMode,
}

impl MvccTransaction {
    /// Creates a read-write transaction.
    #[inline]
    pub fn new(txn_id: u64, snapshot_ts: u64) -> Self {
        Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadWrite)
    }

    /// Creates a read-only transaction.
    #[inline]
    pub fn read_only(txn_id: u64, snapshot_ts: u64) -> Self {
        Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadOnly)
    }

    /// Creates a write-only transaction.
    #[inline]
    pub fn write_only(txn_id: u64, snapshot_ts: u64) -> Self {
        Self::with_mode(txn_id, snapshot_ts, TransactionMode::WriteOnly)
    }

    /// Creates a transaction in `mode`, pre-sizing only the sets that mode uses.
    #[inline]
    pub fn with_mode(txn_id: u64, snapshot_ts: u64, mode: TransactionMode) -> Self {
        let write_capacity = if mode == TransactionMode::ReadOnly {
            0
        } else {
            WRITE_SET_INITIAL_CAPACITY
        };
        let read_capacity = if mode == TransactionMode::ReadWrite {
            READ_SET_INITIAL_CAPACITY
        } else {
            0
        };
        Self::with_capacity(txn_id, snapshot_ts, write_capacity, read_capacity, mode)
    }

    /// Creates an `Active` transaction with explicit initial set capacities.
    #[inline]
    pub fn with_capacity(
        txn_id: u64,
        snapshot_ts: u64,
        write_capacity: usize,
        read_capacity: usize,
        mode: TransactionMode,
    ) -> Self {
        let write_bloom = SsiBloomFilter::new(write_capacity.max(1));
        let read_bloom = SsiBloomFilter::new(read_capacity.max(1));
        Self {
            write_set: HashSet::with_capacity(write_capacity),
            read_set: HashSet::with_capacity(read_capacity),
            write_bloom,
            read_bloom,
            state: TxnState::Active,
            txn_id,
            snapshot_ts,
            mode,
        }
    }

    /// `true` when nothing has been written yet (regardless of `mode`).
    #[inline]
    pub fn is_read_only(&self) -> bool {
        self.write_set.is_empty()
    }

    /// `true` for exactly one written key and at most one read key.
    #[inline]
    pub fn is_single_key_write(&self) -> bool {
        matches!((self.write_set.len(), self.read_set.len()), (1, 0..=1))
    }
}
/// Lifecycle state stored on an `MvccTransaction`.
///
/// NOTE(review): within this file chunk transactions are only ever created
/// as `Active`; confirm where `Committed`/`Aborted` are assigned.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnState {
/// Initial state assigned by `MvccTransaction::with_capacity`.
Active,
Committed,
Aborted,
}
/// Which operations a transaction tracks, and therefore whether it
/// participates in SSI validation. Defaults to `ReadWrite`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TransactionMode {
    /// Neither reads nor writes are tracked.
    ReadOnly,
    /// Only writes are tracked.
    WriteOnly,
    /// Reads and writes are tracked; SSI validation applies.
    #[default]
    ReadWrite,
}

impl TransactionMode {
    /// Only read-write transactions record their read set.
    #[inline]
    pub fn tracks_reads(&self) -> bool {
        *self == TransactionMode::ReadWrite
    }

    /// Every mode except read-only records its write set.
    #[inline]
    pub fn tracks_writes(&self) -> bool {
        *self != TransactionMode::ReadOnly
    }

    /// SSI validation needs read tracking, so it applies exactly when reads
    /// are tracked.
    #[inline]
    pub fn needs_ssi_validation(&self) -> bool {
        self.tracks_reads()
    }
}
/// Kind of dependency between two transactions; presumably read→write vs
/// write→read as the names suggest.
///
/// NOTE(review): not referenced elsewhere in this file chunk — confirm the
/// exact semantics with its users.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictType {
ReadWrite,
WriteRead,
}
/// Directed edge `from_txn -> to_txn` labelled with a [`ConflictType`].
///
/// NOTE(review): not constructed anywhere in this file chunk; presumably part
/// of a dependency-graph API used elsewhere — confirm.
#[derive(Debug, Clone)]
pub struct ConflictEdge {
pub from_txn: u64,
pub to_txn: u64,
pub conflict_type: ConflictType,
}
/// Transaction manager: hands out snapshot/commit timestamps, tracks active
/// transactions and keeps summaries of recent commits for SSI validation.
// The `recent_commits` value tuple trips clippy's type_complexity lint.
#[allow(clippy::type_complexity)]
pub struct MvccManager {
/// Active transactions keyed by transaction id.
active_txns: DashMap<u64, MvccTransaction>,
/// Timestamp counter; read for snapshots, incremented for commits.
ts_counter: AtomicU64,
/// Cached minimum snapshot among active transactions (the counter value
/// when none are active); recomputed by `update_min_active_ts`.
min_active_ts: AtomicU64,
/// Recently committed read-write transactions, keyed by transaction id.
recent_commits: DashMap<
u64,
// (commit_ts, read_bloom, write_bloom, read_set, write_set)
(
u64,
SsiBloomFilter,
SsiBloomFilter,
HashSet<InlineKey>,
HashSet<InlineKey>,
),
>,
/// Pruning of `recent_commits` is attempted once it holds more than twice
/// this many entries.
max_recent_commits: usize,
}
impl Default for MvccManager {
fn default() -> Self {
Self::new()
}
}
impl MvccManager {
    /// Creates a manager with no active transactions; the timestamp counter
    /// starts at 1 and `min_active_ts` at 0.
    pub fn new() -> Self {
        Self {
            active_txns: DashMap::new(),
            ts_counter: AtomicU64::new(1),
            min_active_ts: AtomicU64::new(0),
            recent_commits: DashMap::new(),
            // `recent_commits` is pruned once it grows past twice this value.
            max_recent_commits: 1000,
        }
    }

    /// Begins a read-write transaction.
    pub fn begin(&self, txn_id: u64) -> MvccTransaction {
        self.begin_with_mode(txn_id, TransactionMode::ReadWrite)
    }

    /// Begins a read-only transaction.
    #[inline]
    pub fn begin_read_only(&self, txn_id: u64) -> MvccTransaction {
        self.begin_with_mode(txn_id, TransactionMode::ReadOnly)
    }

    /// Begins a write-only transaction.
    #[inline]
    pub fn begin_write_only(&self, txn_id: u64) -> MvccTransaction {
        self.begin_with_mode(txn_id, TransactionMode::WriteOnly)
    }

    /// Begins a transaction in `mode`.
    ///
    /// The snapshot timestamp is the current counter value, so committed
    /// versions with `commit_ts < snapshot_ts` are visible to it. The
    /// transaction is registered as active and a copy is returned.
    pub fn begin_with_mode(&self, txn_id: u64, mode: TransactionMode) -> MvccTransaction {
        let snapshot_ts = self.ts_counter.load(Ordering::SeqCst);
        let txn = MvccTransaction::with_mode(txn_id, snapshot_ts, mode);
        self.active_txns.insert(txn_id, txn.clone());
        self.update_min_active_ts();
        txn
    }

    /// Returns a copy of active transaction `txn_id`, if any.
    pub fn get(&self, txn_id: u64) -> Option<MvccTransaction> {
        self.active_txns.get(&txn_id).map(|t| t.clone())
    }

    /// Returns the snapshot timestamp of active transaction `txn_id`, if any.
    #[inline]
    pub fn get_snapshot_ts(&self, txn_id: u64) -> Option<u64> {
        self.active_txns.get(&txn_id).map(|t| t.snapshot_ts)
    }

    /// Records that `txn_id` read `key` (ignored for modes that do not track reads).
    ///
    /// NOTE(review): once the read set holds 10 000 keys, further reads are
    /// not recorded, so SSI validation cannot see conflicts on them.
    #[inline]
    pub fn record_read(&self, txn_id: u64, key: &[u8]) {
        if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
            if !txn.mode.tracks_reads() {
                return;
            }
            if txn.read_set.len() < 10000 {
                txn.read_set.insert(SmallVec::from_slice(key));
                txn.read_bloom.insert(key);
            }
        }
    }

    /// Records that `txn_id` wrote `key`.
    pub fn record_write(&self, txn_id: u64, key: &[u8]) {
        if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
            txn.write_set.insert(SmallVec::from_slice(key));
            txn.write_bloom.insert(key);
        }
    }

    /// Reserves a commit timestamp: returns the counter value before incrementing it.
    pub fn alloc_commit_ts(&self) -> u64 {
        self.ts_counter.fetch_add(1, Ordering::SeqCst)
    }

    /// Validates and commits `txn_id`.
    ///
    /// Returns `Some((commit_ts, write_set))` on success. Returns `None` if
    /// `txn_id` is not active, or if SSI validation of a read-write
    /// transaction fails. In the latter case the transaction is removed from
    /// the active set and the caller must discard its staged writes.
    pub fn commit(&self, txn_id: u64) -> Option<(u64, HashSet<InlineKey>)> {
        // Validate through a read guard instead of cloning the transaction.
        // The guard is dropped at the end of this block, before `remove`
        // below needs the shard's write lock.
        let is_valid = {
            let txn = self.active_txns.get(&txn_id)?;
            txn.mode != TransactionMode::ReadWrite || self.validate_ssi(&txn)
        };
        if !is_valid {
            self.active_txns.remove(&txn_id);
            self.update_min_active_ts();
            return None;
        }
        let commit_ts = self.alloc_commit_ts();
        let (_, removed_txn) = self.active_txns.remove(&txn_id)?;
        let needs_ssi_tracking = removed_txn.mode == TransactionMode::ReadWrite
            && !removed_txn.read_set.is_empty()
            && !removed_txn.write_set.is_empty();
        let write_set = if needs_ssi_tracking {
            let write_set_for_return = removed_txn.write_set.clone();
            self.track_commit_owned(
                txn_id,
                commit_ts,
                removed_txn.read_bloom,
                removed_txn.write_bloom,
                removed_txn.read_set,
                removed_txn.write_set,
            );
            write_set_for_return
        } else {
            removed_txn.write_set
        };
        self.update_min_active_ts();
        Some((commit_ts, write_set))
    }

    /// Serializable-snapshot-isolation check for a committing transaction.
    ///
    /// Only tracked commits that are concurrent with `txn` are considered:
    /// those with `commit_ts >= txn.snapshot_ts`, whose writes are invisible
    /// to `txn`'s snapshot (matching `VersionChain::read_at`). Returns `false`
    /// (abort) when `txn` has both an incoming rw-dependency (a concurrent
    /// commit wrote a key `txn` read) and an outgoing one (a concurrent commit
    /// read a key `txn` writes).
    ///
    /// There is deliberately no single-key fast path. A transaction that
    /// reads one key and writes another is exactly the classic write-skew
    /// participant, so it must go through the full check.
    ///
    /// NOTE(review): validation and the later `track_commit_owned` are not
    /// atomic, so two transactions validating at the same moment do not see
    /// each other.
    #[inline]
    fn validate_ssi(&self, txn: &MvccTransaction) -> bool {
        // An abort needs both an incoming edge (requires reads) and an
        // outgoing edge (requires writes).
        if txn.write_set.is_empty() || txn.read_set.is_empty() {
            return true;
        }
        if self.recent_commits.is_empty() {
            return true;
        }
        let my_snapshot = txn.snapshot_ts;

        // Cheap Bloom pre-filter: skip exact checks if no concurrent commit
        // can overlap with our read or write set.
        let any_may_intersect = self.recent_commits.iter().any(|entry| {
            let (other_commit_ts, other_read_bloom, other_write_bloom, _, _) = entry.value();
            *other_commit_ts >= my_snapshot
                && (txn.write_bloom.may_intersect(other_read_bloom)
                    || other_write_bloom.may_intersect(&txn.read_bloom))
        });
        if !any_may_intersect {
            return true;
        }

        let mut has_in_conflict = false;
        let mut has_out_conflict = false;
        for entry in self.recent_commits.iter() {
            let (other_commit_ts, _, other_write_bloom, other_read_set, other_write_set) =
                entry.value();
            if *other_commit_ts < my_snapshot {
                // Committed before our snapshot: its effects are visible to us.
                continue;
            }
            if !has_in_conflict {
                has_in_conflict = txn
                    .read_set
                    .iter()
                    .any(|key| other_write_bloom.may_contain(key) && other_write_set.contains(key));
            }
            if !has_out_conflict {
                has_out_conflict = other_read_set
                    .iter()
                    .any(|key| txn.write_bloom.may_contain(key) && txn.write_set.contains(key));
            }
            if has_in_conflict && has_out_conflict {
                return false;
            }
        }
        true
    }

    /// Stores a committed transaction's read/write summaries so concurrent
    /// transactions can be validated against it. Only transactions that both
    /// read and wrote are stored.
    ///
    /// Once the map exceeds `2 * max_recent_commits` entries, commits older
    /// than the oldest active snapshot are dropped. No active transaction can
    /// be concurrent with those.
    fn track_commit_owned(
        &self,
        txn_id: u64,
        commit_ts: u64,
        read_bloom: SsiBloomFilter,
        write_bloom: SsiBloomFilter,
        read_set: HashSet<InlineKey>,
        write_set: HashSet<InlineKey>,
    ) {
        if read_set.is_empty() || write_set.is_empty() {
            return;
        }
        self.recent_commits.insert(
            txn_id,
            (commit_ts, read_bloom, write_bloom, read_set, write_set),
        );
        if self.recent_commits.len() > self.max_recent_commits * 2 {
            let min_active = self.min_active_ts.load(Ordering::Relaxed);
            self.recent_commits
                .retain(|_, (ts, _, _, _, _)| *ts >= min_active);
        }
    }

    /// Borrowing variant of `track_commit_owned` (clones the sets, never prunes).
    #[allow(dead_code)]
    fn track_commit(
        &self,
        txn_id: u64,
        commit_ts: u64,
        read_bloom: SsiBloomFilter,
        write_bloom: SsiBloomFilter,
        read_set: &HashSet<InlineKey>,
        write_set: &HashSet<InlineKey>,
    ) {
        if read_set.is_empty() || write_set.is_empty() {
            return;
        }
        self.recent_commits.insert(
            txn_id,
            (
                commit_ts,
                read_bloom,
                write_bloom,
                read_set.clone(),
                write_set.clone(),
            ),
        );
    }

    /// Removes `txn_id` from the active set.
    pub fn abort(&self, txn_id: u64) {
        self.active_txns.remove(&txn_id);
        self.update_min_active_ts();
    }

    /// Cached minimum snapshot timestamp among active transactions.
    pub fn min_active_snapshot(&self) -> u64 {
        self.min_active_ts.load(Ordering::SeqCst)
    }

    /// Number of currently active transactions.
    pub fn active_transaction_count(&self) -> usize {
        self.active_txns.len()
    }

    /// Recomputes `min_active_ts` by scanning active transactions. When none
    /// are active, the current counter value is used.
    fn update_min_active_ts(&self) {
        let min = self
            .active_txns
            .iter()
            .map(|entry| entry.value().snapshot_ts)
            .min()
            .unwrap_or_else(|| self.ts_counter.load(Ordering::SeqCst));
        self.min_active_ts.store(min, Ordering::SeqCst);
    }
}
/// Number of epoch buckets in a dirty-key ring.
const EPOCH_RING_SIZE: usize = 4;

/// Ring of per-epoch buckets holding keys that received new versions, so GC
/// only has to visit recently written keys.
struct EpochDirtyList {
    /// Buckets indexed by `epoch % EPOCH_RING_SIZE`. The length is tied to
    /// the constant so indexing can never go out of bounds.
    epochs: [parking_lot::Mutex<Vec<Vec<u8>>>; EPOCH_RING_SIZE],
    /// Epoch that new records are appended to.
    current_epoch: AtomicU64,
}

impl EpochDirtyList {
    /// Creates a ring with empty buckets, starting at epoch 0.
    fn new() -> Self {
        Self {
            epochs: std::array::from_fn(|_| parking_lot::Mutex::new(Vec::new())),
            current_epoch: AtomicU64::new(0),
        }
    }

    /// Bucket index for `epoch`.
    #[inline]
    fn slot(epoch: u64) -> usize {
        (epoch as usize) % EPOCH_RING_SIZE
    }

    /// Appends `key` to the current epoch's bucket.
    #[inline]
    fn record_version(&self, key: Vec<u8>) {
        let epoch = self.current_epoch.load(Ordering::Relaxed);
        self.epochs[Self::slot(epoch)].lock().push(key);
    }

    /// Appends all `keys` to the current epoch's bucket under one lock.
    #[inline]
    fn record_versions_batch(&self, keys: impl IntoIterator<Item = Vec<u8>>) {
        let epoch = self.current_epoch.load(Ordering::Relaxed);
        let mut guard = self.epochs[Self::slot(epoch)].lock();
        guard.extend(keys);
    }

    /// Moves to the next epoch and drains the bucket of the epoch just
    /// closed, returning `(closed_epoch, keys)`.
    fn advance_epoch(&self) -> (u64, Vec<Vec<u8>>) {
        let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
        let mut guard = self.epochs[Self::slot(old_epoch)].lock();
        let keys = std::mem::take(&mut *guard);
        (old_epoch, keys)
    }

    /// Current epoch number.
    #[allow(dead_code)]
    fn current(&self) -> u64 {
        self.current_epoch.load(Ordering::Relaxed)
    }
}
/// Lazy iterator returned by `MvccMemTable::scan_range_iter`.
///
/// The inner iterator is built on the first call to `next`. It yields
/// `(key, value)` pairs for keys in `[start, end)` (an empty `end` means
/// unbounded) whose version visible at `snapshot_ts` / `current_txn_id` has a
/// value (`Some`).
struct ScanRangeIterator<'a> {
/// Memtable being scanned.
memtable: &'a MvccMemTable,
/// Inclusive lower bound.
start: Vec<u8>,
/// Exclusive upper bound; empty = no upper bound.
end: Vec<u8>,
/// Snapshot used for visibility (`VersionChain::read_at`).
snapshot_ts: u64,
/// Transaction whose own uncommitted writes should be visible, if any.
current_txn_id: Option<u64>,
/// Whether to iterate via an index (deferred or skip-list) instead of a full scan.
use_ordered: bool,
/// Inner iterator built from an index; set on first `next` when `use_ordered`.
ordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
/// Inner iterator over all of `memtable.data`; set on first `next` otherwise.
unordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
/// Set once the inner iterator has been chosen.
initialized: bool,
}
impl<'a> Iterator for ScanRangeIterator<'a> {
type Item = (Vec<u8>, Vec<u8>);
/// Builds the inner iterator on first call, then delegates to it.
fn next(&mut self) -> Option<Self::Item> {
if !self.initialized {
self.initialized = true;
if self.use_ordered {
// Preferred: deferred sorted index.
if let Some(ref def_idx) = self.memtable.deferred_index {
let start = self.start.clone();
let end = self.end.clone();
let snapshot_ts = self.snapshot_ts;
let current_txn_id = self.current_txn_id;
let data = &self.memtable.data;
// Keys in range are collected eagerly here; values are looked
// up lazily as the iterator is consumed.
let keys: Vec<Vec<u8>> = if end.is_empty() {
def_idx.range_from(&start).collect()
} else {
def_idx.range(&start, &end).collect()
};
let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = Box::new(
keys.into_iter()
.filter_map(move |key| {
if let Some(chain) = data.get(&key)
&& let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
&& let Some(value) = &v.value
{
Some((key, value.clone()))
} else {
None
}
})
);
self.ordered_iter = Some(iter);
// Fallback: eagerly-maintained skip-list index, iterated lazily.
} else if let Some(ref idx) = self.memtable.ordered_index {
let start = self.start.clone();
let end = self.end.clone();
let snapshot_ts = self.snapshot_ts;
let current_txn_id = self.current_txn_id;
let data = &self.memtable.data;
let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = if end.is_empty() {
Box::new(
idx.range(start..)
.filter_map(move |entry| {
let key = entry.key();
if let Some(chain) = data.get(key)
&& let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
&& let Some(value) = &v.value
{
Some((key.clone(), value.clone()))
} else {
None
}
})
)
} else {
Box::new(
idx.range(start..end)
.filter_map(move |entry| {
let key = entry.key();
if let Some(chain) = data.get(key)
&& let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
&& let Some(value) = &v.value
{
Some((key.clone(), value.clone()))
} else {
None
}
})
)
};
self.ordered_iter = Some(iter);
}
// NOTE(review): if `use_ordered` is set but neither index exists,
// both inner iterators stay `None` and the scan yields nothing.
// `scan_range_iter` only sets `use_ordered` when an index exists.
} else {
// No index: filter a full scan of the map by the bounds. Output
// follows map iteration order (presumably unsorted).
let start = self.start.clone();
let end = self.end.clone();
let snapshot_ts = self.snapshot_ts;
let current_txn_id = self.current_txn_id;
let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = Box::new(
self.memtable.data.iter()
.filter_map(move |entry| {
let key = entry.key();
if key.as_slice() < start.as_slice() {
return None;
}
if !end.is_empty() && key.as_slice() >= end.as_slice() {
return None;
}
if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
&& let Some(value) = &v.value
{
Some((key.clone(), value.clone()))
} else {
None
}
})
);
self.unordered_iter = Some(iter);
}
}
if let Some(ref mut iter) = self.ordered_iter {
iter.next()
} else if let Some(ref mut iter) = self.unordered_iter {
iter.next()
} else {
None
}
}
}
/// In-memory MVCC table keyed by owned byte vectors.
pub struct MvccMemTable {
/// Version chain per key.
data: DashMap<Vec<u8>, VersionChain>,
/// Deferred sorted key index; set when built with ordering and `use_deferred`.
deferred_index: Option<DeferredSortedIndex>,
/// Skip-list key index; set when built with ordering but without `use_deferred`.
ordered_index: Option<SkipMap<Vec<u8>, ()>>,
// Stored by `with_index_mode` but not read in this file chunk.
#[allow(dead_code)]
use_deferred: bool,
/// Running total of key+value bytes passed to successful writes (never
/// decremented in this file chunk).
size_bytes: AtomicU64,
/// Keys written since the last `gc` call.
dirty_list: EpochDirtyList,
}
impl Default for MvccMemTable {
fn default() -> Self {
Self::new()
}
}
impl MvccMemTable {
    /// Creates a memtable with an ordered index (deferred variant).
    pub fn new() -> Self {
        Self::with_ordered_index(true)
    }

    /// Creates a memtable, optionally with an ordered key index; when enabled
    /// the deferred index variant is used.
    pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
        Self::with_index_mode(enable_ordered_index, true)
    }

    /// Creates a memtable with no index, a deferred sorted index
    /// (`use_deferred == true`) or a skip-list index (`use_deferred == false`).
    pub fn with_index_mode(enable_ordered_index: bool, use_deferred: bool) -> Self {
        Self {
            data: DashMap::new(),
            deferred_index: if enable_ordered_index && use_deferred {
                Some(DeferredSortedIndex::with_config(DeferredIndexConfig {
                    max_unsorted_entries: 10_000,
                    enabled: true,
                }))
            } else {
                None
            },
            ordered_index: if enable_ordered_index && !use_deferred {
                Some(SkipMap::new())
            } else {
                None
            },
            use_deferred,
            size_bytes: AtomicU64::new(0),
            dirty_list: EpochDirtyList::new(),
        }
    }

    /// Stages an uncommitted write of `value` (`None` = delete) for `txn_id`.
    ///
    /// # Errors
    /// Returns `SochDBError::Internal` if another transaction holds an
    /// uncommitted version of `key` (write-write conflict).
    pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
        let key_len = key.len();
        self.dirty_list.record_version(key.clone());
        if let Some(ref idx) = self.deferred_index {
            idx.insert(key.clone());
        } else if let Some(ref idx) = self.ordered_index {
            idx.insert(key.clone(), ());
        }
        let mut entry = self.data.entry(key).or_default();
        if entry.has_write_conflict(txn_id) {
            return Err(SochDBError::Internal(
                "Write-write conflict detected".into(),
            ));
        }
        entry.add_uncommitted(value, txn_id);
        self.size_bytes
            .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
        Ok(())
    }

    /// Stages several writes for `txn_id`, in order.
    ///
    /// # Errors
    /// Returns `SochDBError::Internal` on the first write-write conflict.
    /// Writes earlier in the batch remain staged (the caller is expected to
    /// abort), and their sizes are still accounted in `size()`.
    pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
        self.dirty_list
            .record_versions_batch(writes.iter().map(|(k, _)| k.clone()));
        let mut total_size = 0u64;
        let mut outcome: Result<()> = Ok(());
        for (key, value) in writes {
            if let Some(ref idx) = self.deferred_index {
                idx.insert(key.clone());
            } else if let Some(ref idx) = self.ordered_index {
                idx.insert(key.clone(), ());
            }
            let mut entry = self.data.entry(key.clone()).or_default();
            if entry.has_write_conflict(txn_id) {
                outcome = Err(SochDBError::Internal(
                    "Write-write conflict detected".into(),
                ));
                break;
            }
            let value_size = value.as_ref().map_or(0, Vec::len);
            entry.add_uncommitted(value.clone(), txn_id);
            total_size += (key.len() + value_size) as u64;
        }
        // Count every write that was actually staged, even when a later one
        // conflicted, so `size()` does not under-report.
        self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
        outcome
    }

    /// Reads the value of `key` visible at `snapshot_ts` (or `current_txn_id`'s
    /// own pending write); `None` if absent or deleted.
    pub fn read(
        &self,
        key: &[u8],
        snapshot_ts: u64,
        current_txn_id: Option<u64>,
    ) -> Option<Vec<u8>> {
        self.data.get(key).and_then(|chain| {
            chain
                .read_at(snapshot_ts, current_txn_id)
                .and_then(|v| v.value.clone())
        })
    }

    /// Commits `txn_id`'s pending versions for the keys in `write_set`.
    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
        for key in write_set {
            if let Some(mut chain) = self.data.get_mut(key.as_slice()) {
                chain.commit(txn_id, commit_ts);
            }
        }
    }

    /// Commits `txn_id`'s pending versions by scanning every key.
    #[allow(dead_code)]
    pub fn commit_all(&self, txn_id: u64, commit_ts: u64) {
        for mut entry in self.data.iter_mut() {
            entry.value_mut().commit(txn_id, commit_ts);
        }
    }

    /// Discards `txn_id`'s pending versions (scans every key).
    pub fn abort(&self, txn_id: u64) {
        for mut entry in self.data.iter_mut() {
            entry.value_mut().abort(txn_id);
        }
    }

    /// Collects visible `(key, value)` pairs whose key starts with `prefix`.
    ///
    /// With an index the results come back in index order; without one they
    /// follow map iteration order.
    ///
    /// NOTE(review): unlike `scan_range_iter`, this does not call `compact()`
    /// on the deferred index first — confirm `range_from` covers entries not
    /// yet sorted.
    pub fn scan_prefix(
        &self,
        prefix: &[u8],
        snapshot_ts: u64,
        current_txn_id: Option<u64>,
    ) -> Vec<(Vec<u8>, Vec<u8>)> {
        let estimated_size = (self.data.len() / 10).max(64);
        let mut results = Vec::with_capacity(estimated_size);
        if let Some(ref idx) = self.deferred_index {
            for key in idx.range_from(prefix) {
                if !key.starts_with(prefix) {
                    break;
                }
                if let Some(chain) = self.data.get(&key)
                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
                    && let Some(value) = &v.value
                {
                    results.push((key, value.clone()));
                }
            }
        } else if let Some(ref idx) = self.ordered_index {
            for entry in idx.range(prefix.to_vec()..) {
                let key = entry.key();
                if !key.starts_with(prefix) {
                    break;
                }
                if let Some(chain) = self.data.get(key)
                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
                    && let Some(value) = &v.value
                {
                    results.push((key.clone(), value.clone()));
                }
            }
        } else {
            for entry in self.data.iter() {
                let key = entry.key();
                if !key.starts_with(prefix) {
                    continue;
                }
                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
                    && let Some(value) = &v.value
                {
                    results.push((key.clone(), value.clone()));
                }
            }
        }
        results
    }

    /// Collects every visible `(key, value)` pair in map iteration order.
    pub fn scan_all(
        &self,
        snapshot_ts: u64,
        current_txn_id: Option<u64>,
    ) -> Vec<(Vec<u8>, Vec<u8>)> {
        let mut results = Vec::with_capacity(self.data.len());
        for entry in self.data.iter() {
            if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
                && let Some(value) = &v.value
            {
                results.push((entry.key().clone(), value.clone()));
            }
        }
        results
    }

    /// Lazily yields visible pairs whose key starts with `prefix`, by scanning
    /// the whole map (indexes are not used).
    pub fn scan_prefix_iter<'a>(
        &'a self,
        prefix: &'a [u8],
        snapshot_ts: u64,
        current_txn_id: Option<u64>,
    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
        self.data.iter().filter_map(move |entry| {
            let key = entry.key();
            if !key.starts_with(prefix) {
                return None;
            }
            if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
                && let Some(value) = &v.value
            {
                Some((key.clone(), value.clone()))
            } else {
                None
            }
        })
    }

    /// Collects visible pairs with keys in `[start, end)`; an empty `end`
    /// means unbounded. Without an index, results follow map iteration order.
    pub fn scan_range(
        &self,
        start: &[u8],
        end: &[u8],
        snapshot_ts: u64,
        current_txn_id: Option<u64>,
    ) -> Vec<(Vec<u8>, Vec<u8>)> {
        let mut results = Vec::new();
        if let Some(ref idx) = self.deferred_index {
            if end.is_empty() {
                for key in idx.range_from(start) {
                    if let Some(chain) = self.data.get(&key)
                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
                        && let Some(value) = &v.value
                    {
                        results.push((key, value.clone()));
                    }
                }
            } else {
                for key in idx.range(start, end) {
                    if let Some(chain) = self.data.get(&key)
                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
                        && let Some(value) = &v.value
                    {
                        results.push((key, value.clone()));
                    }
                }
            }
        } else if let Some(ref idx) = self.ordered_index {
            if end.is_empty() {
                for entry in idx.range(start.to_vec()..) {
                    let key = entry.key();
                    if let Some(chain) = self.data.get(key)
                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
                        && let Some(value) = &v.value
                    {
                        results.push((key.clone(), value.clone()));
                    }
                }
            } else {
                for entry in idx.range(start.to_vec()..end.to_vec()) {
                    let key = entry.key();
                    if let Some(chain) = self.data.get(key)
                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
                        && let Some(value) = &v.value
                    {
                        results.push((key.clone(), value.clone()));
                    }
                }
            }
        } else {
            for entry in self.data.iter() {
                let key = entry.key();
                if key.as_slice() < start {
                    continue;
                }
                if !end.is_empty() && key.as_slice() >= end {
                    continue;
                }
                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
                    && let Some(value) = &v.value
                {
                    results.push((key.clone(), value.clone()));
                }
            }
        }
        results
    }

    /// Lazy variant of [`scan_range`](Self::scan_range). Calls `compact()` on
    /// the deferred index (if any) before building the iterator.
    pub fn scan_range_iter<'a>(
        &'a self,
        start: &'a [u8],
        end: &'a [u8],
        snapshot_ts: u64,
        current_txn_id: Option<u64>,
    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
        if let Some(ref idx) = self.deferred_index {
            idx.compact();
        }
        let use_ordered = self.ordered_index.is_some() || self.deferred_index.is_some();
        ScanRangeIterator {
            memtable: self,
            start: start.to_vec(),
            end: end.to_vec(),
            snapshot_ts,
            current_txn_id,
            use_ordered,
            ordered_iter: None,
            unordered_iter: None,
            initialized: false,
        }
    }

    /// Approximate number of key+value bytes written.
    pub fn size(&self) -> u64 {
        self.size_bytes.load(Ordering::Relaxed)
    }

    /// Garbage-collects the chains of keys written since the previous call.
    ///
    /// Advances the dirty-key epoch, trims each recorded chain with
    /// `VersionChain::gc(min_active_ts)` and returns the number of versions
    /// removed. Some keys still hold more than one version afterwards: they
    /// may be retained for an active snapshot, or have a pending write that
    /// will add a version on commit. Those keys are re-recorded, so a later
    /// call trims them once they become unreachable instead of leaking them
    /// until the key is written again.
    pub fn gc(&self, min_active_ts: u64) -> usize {
        let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
        if dirty_keys.is_empty() {
            return 0;
        }
        let unique_keys: HashSet<Vec<u8>> = dirty_keys.into_iter().collect();
        let mut gc_count = 0;
        for key in unique_keys {
            let still_multi_version = match self.data.get_mut(&key) {
                Some(mut entry) => {
                    let before = entry.value().version_count();
                    entry.value_mut().gc(min_active_ts);
                    let after = entry.value().version_count();
                    gc_count += before.saturating_sub(after);
                    after > 1
                }
                None => false,
            };
            // The map guard is released above, before taking the dirty-list lock.
            if still_multi_version {
                self.dirty_list.record_version(key);
            }
        }
        gc_count
    }

    /// Garbage-collects every chain in the table (full scan).
    #[allow(dead_code)]
    pub fn gc_full_scan(&self, min_active_ts: u64) -> usize {
        let mut gc_count = 0;
        for mut entry in self.data.iter_mut() {
            let before = entry.value().version_count();
            entry.value_mut().gc(min_active_ts);
            gc_count += before.saturating_sub(entry.value().version_count());
        }
        gc_count
    }
}
use crate::key_buffer::ArenaKeyHandle;
/// Ring of per-epoch buckets of arena key handles that received new versions,
/// so GC only has to visit recently written keys.
struct ArenaEpochDirtyList {
    /// Buckets indexed by `epoch % EPOCH_RING_SIZE`. The length is tied to
    /// the constant so indexing can never go out of bounds.
    epochs: [parking_lot::Mutex<Vec<ArenaKeyHandle>>; EPOCH_RING_SIZE],
    /// Epoch that new records are appended to.
    current_epoch: AtomicU64,
}

impl ArenaEpochDirtyList {
    /// Creates a ring with empty buckets, starting at epoch 0.
    fn new() -> Self {
        Self {
            epochs: std::array::from_fn(|_| parking_lot::Mutex::new(Vec::new())),
            current_epoch: AtomicU64::new(0),
        }
    }

    /// Appends `key` to the current epoch's bucket.
    #[inline]
    fn record_version(&self, key: ArenaKeyHandle) {
        let epoch = self.current_epoch.load(Ordering::Relaxed);
        let idx = (epoch as usize) % EPOCH_RING_SIZE;
        self.epochs[idx].lock().push(key);
    }

    /// Moves to the next epoch and drains the bucket of the epoch just
    /// closed, returning `(closed_epoch, keys)`.
    fn advance_epoch(&self) -> (u64, Vec<ArenaKeyHandle>) {
        let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
        let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
        let mut guard = self.epochs[old_idx].lock();
        let keys = std::mem::take(&mut *guard);
        (old_epoch, keys)
    }
}
/// In-memory MVCC table keyed by `ArenaKeyHandle` instead of owned `Vec<u8>`.
pub struct ArenaMvccMemTable {
/// Version chain per key handle.
data: DashMap<ArenaKeyHandle, VersionChain>,
/// Optional skip-list index used for ordered prefix scans.
ordered_index: Option<SkipMap<ArenaKeyHandle, ()>>,
/// Running total of key+value bytes passed to successful writes (never
/// decremented in this file chunk).
size_bytes: AtomicU64,
/// Key handles written since the last `gc` call.
dirty_list: ArenaEpochDirtyList,
}
impl ArenaMvccMemTable {
    /// Creates an arena memtable with a skip-list key index.
    pub fn new() -> Self {
        Self::with_ordered_index(true)
    }

    /// Creates an arena memtable, optionally maintaining a skip-list index.
    pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
        Self {
            data: DashMap::new(),
            ordered_index: if enable_ordered_index {
                Some(SkipMap::new())
            } else {
                None
            },
            size_bytes: AtomicU64::new(0),
            dirty_list: ArenaEpochDirtyList::new(),
        }
    }

    /// Stages an uncommitted write of `value` (`None` = delete) for `txn_id`.
    ///
    /// # Errors
    /// Returns `SochDBError::Internal` if another transaction holds an
    /// uncommitted version of `key` (write-write conflict).
    pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
        let key_len = key.len();
        let key_handle = ArenaKeyHandle::new(key);
        self.dirty_list.record_version(key_handle.clone());
        if let Some(ref idx) = self.ordered_index {
            idx.insert(key_handle.clone(), ());
        }
        let mut entry = self.data.entry(key_handle).or_default();
        if entry.has_write_conflict(txn_id) {
            return Err(SochDBError::Internal(
                "Write-write conflict detected".into(),
            ));
        }
        entry.add_uncommitted(value, txn_id);
        self.size_bytes
            .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
        Ok(())
    }

    /// Stages several writes for `txn_id`, in order.
    ///
    /// # Errors
    /// Returns `SochDBError::Internal` on the first write-write conflict.
    /// Writes earlier in the batch remain staged (the caller is expected to
    /// abort), and their sizes are still accounted in `size()`.
    pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
        let mut total_size = 0u64;
        let mut outcome: Result<()> = Ok(());
        for (key, value) in writes {
            let key_handle = ArenaKeyHandle::new(key);
            self.dirty_list.record_version(key_handle.clone());
            if let Some(ref idx) = self.ordered_index {
                idx.insert(key_handle.clone(), ());
            }
            let mut entry = self.data.entry(key_handle).or_default();
            if entry.has_write_conflict(txn_id) {
                outcome = Err(SochDBError::Internal(
                    "Write-write conflict detected".into(),
                ));
                break;
            }
            let value_size = value.as_ref().map_or(0, Vec::len);
            entry.add_uncommitted(value.clone(), txn_id);
            total_size += (key.len() + value_size) as u64;
        }
        // Count every write that was actually staged, even when a later one
        // conflicted, so `size()` does not under-report.
        self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
        outcome
    }

    /// Reads the value of `key` visible at `snapshot_ts` (or `current_txn_id`'s
    /// own pending write); `None` if absent or deleted.
    pub fn read(
        &self,
        key: &[u8],
        snapshot_ts: u64,
        current_txn_id: Option<u64>,
    ) -> Option<Vec<u8>> {
        let key_handle = ArenaKeyHandle::new(key);
        self.data.get(&key_handle).and_then(|chain| {
            chain
                .read_at(snapshot_ts, current_txn_id)
                .and_then(|v| v.value.clone())
        })
    }

    /// Commits `txn_id`'s pending versions for the keys in `write_set`.
    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
        for key in write_set {
            let key_handle = ArenaKeyHandle::new(key.as_slice());
            if let Some(mut chain) = self.data.get_mut(&key_handle) {
                chain.commit(txn_id, commit_ts);
            }
        }
    }

    /// Discards `txn_id`'s pending versions (scans every key).
    pub fn abort(&self, txn_id: u64) {
        for mut entry in self.data.iter_mut() {
            entry.value_mut().abort(txn_id);
        }
    }

    /// Collects visible `(key, value)` pairs whose key starts with `prefix`;
    /// in index order when the skip-list index exists, otherwise in map
    /// iteration order.
    pub fn scan_prefix(
        &self,
        prefix: &[u8],
        snapshot_ts: u64,
        current_txn_id: Option<u64>,
    ) -> Vec<(Vec<u8>, Vec<u8>)> {
        let mut results = Vec::new();
        let prefix_handle = ArenaKeyHandle::new(prefix);
        if let Some(ref idx) = self.ordered_index {
            for entry in idx.range(prefix_handle..) {
                let key = entry.key();
                if !key.as_bytes().starts_with(prefix) {
                    break;
                }
                if let Some(chain) = self.data.get(key)
                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
                    && let Some(value) = &v.value
                {
                    results.push((key.as_bytes().to_vec(), value.clone()));
                }
            }
        } else {
            for entry in self.data.iter() {
                let key = entry.key();
                if !key.as_bytes().starts_with(prefix) {
                    continue;
                }
                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
                    && let Some(value) = &v.value
                {
                    results.push((key.as_bytes().to_vec(), value.clone()));
                }
            }
        }
        results
    }

    /// Approximate number of key+value bytes written.
    pub fn size(&self) -> u64 {
        self.size_bytes.load(Ordering::Relaxed)
    }

    /// Garbage-collects the chains of keys written since the previous call.
    ///
    /// Returns the number of versions removed. Some keys still hold more than
    /// one version afterwards: they may be retained for an active snapshot,
    /// or have a pending write. Those keys are re-recorded so a later call
    /// can trim them once they become unreachable.
    pub fn gc(&self, min_active_ts: u64) -> usize {
        let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
        if dirty_keys.is_empty() {
            return 0;
        }
        let unique_keys: HashSet<_> = dirty_keys.into_iter().collect();
        let mut gc_count = 0;
        for key in unique_keys {
            let still_multi_version = match self.data.get_mut(&key) {
                Some(mut entry) => {
                    let before = entry.value().version_count();
                    entry.value_mut().gc(min_active_ts);
                    let after = entry.value().version_count();
                    gc_count += before.saturating_sub(after);
                    after > 1
                }
                None => false,
            };
            // The map guard is released above, before taking the dirty-list lock.
            if still_multi_version {
                self.dirty_list.record_version(key);
            }
        }
        gc_count
    }
}
impl Default for ArenaMvccMemTable {
fn default() -> Self {
Self::new()
}
}
/// Selects which memtable implementation `MemTableKind::new` builds.
///
/// `Default` is derived with `#[default]` (same style as `TransactionMode`)
/// instead of a hand-written impl.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MemTableType {
    /// `Vec<u8>`-keyed `MvccMemTable` (the default).
    #[default]
    Standard,
    /// `ArenaMvccMemTable` keyed by arena handles.
    Arena,
}
/// A memtable of either implementation; methods dispatch to the wrapped table.
pub enum MemTableKind {
/// Wraps an `MvccMemTable`.
Standard(MvccMemTable),
/// Wraps an `ArenaMvccMemTable`.
Arena(ArenaMvccMemTable),
}
impl MemTableKind {
pub fn new(kind: MemTableType, enable_ordered_index: bool) -> Self {
match kind {
MemTableType::Standard => {
MemTableKind::Standard(MvccMemTable::with_ordered_index(enable_ordered_index))
}
MemTableType::Arena => {
MemTableKind::Arena(ArenaMvccMemTable::with_ordered_index(enable_ordered_index))
}
}
}
#[inline]
pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
match self {
MemTableKind::Standard(m) => m.write(key, value, txn_id),
MemTableKind::Arena(m) => m.write(&key, value, txn_id),
}
}
#[inline]
pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
match self {
MemTableKind::Standard(m) => m.write_batch(writes, txn_id),
MemTableKind::Arena(m) => {
let arena_writes: Vec<(&[u8], Option<Vec<u8>>)> = writes
.iter()
.map(|(k, v)| (k.as_slice(), v.clone()))
.collect();
m.write_batch(&arena_writes, txn_id)
}
}
}
#[inline]
pub fn read(
&self,
key: &[u8],
snapshot_ts: u64,
current_txn_id: Option<u64>,
) -> Option<Vec<u8>> {
match self {
MemTableKind::Standard(m) => m.read(key, snapshot_ts, current_txn_id),
MemTableKind::Arena(m) => m.read(key, snapshot_ts, current_txn_id),
}
}
#[inline]
pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
match self {
MemTableKind::Standard(m) => m.commit(txn_id, commit_ts, write_set),
MemTableKind::Arena(m) => m.commit(txn_id, commit_ts, write_set),
}
}
#[inline]
pub fn abort(&self, txn_id: u64) {
match self {
MemTableKind::Standard(m) => m.abort(txn_id),
MemTableKind::Arena(m) => m.abort(txn_id),
}
}
#[inline]
pub fn scan_prefix(
&self,
prefix: &[u8],
snapshot_ts: u64,
current_txn_id: Option<u64>,
) -> Vec<(Vec<u8>, Vec<u8>)> {
match self {
MemTableKind::Standard(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
MemTableKind::Arena(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
}
}
#[inline]
pub fn scan_range(
&self,
start: &[u8],
end: &[u8],
snapshot_ts: u64,
current_txn_id: Option<u64>,
) -> Vec<(Vec<u8>, Vec<u8>)> {
match self {
MemTableKind::Standard(m) => m.scan_range(start, end, snapshot_ts, current_txn_id),
MemTableKind::Arena(m) => {
let mut results = Vec::new();
if let Some(ref idx) = m.ordered_index {
let start_handle = ArenaKeyHandle::new(start);
let end_handle = ArenaKeyHandle::new(end);
if end.is_empty() {
for entry in idx.range(start_handle..) {
let key = entry.key();
if let Some(chain) = m.data.get(key)
&& let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
&& let Some(value) = &v.value
{
results.push((key.as_bytes().to_vec(), value.clone()));
}
}
} else {
for entry in idx.range(start_handle..end_handle) {
let key = entry.key();
if let Some(chain) = m.data.get(key)
&& let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
&& let Some(value) = &v.value
{
results.push((key.as_bytes().to_vec(), value.clone()));
}
}
}
} else {
for entry in m.data.iter() {
let key = entry.key();
let key_bytes = key.as_bytes();
if key_bytes < start {
continue;
}
if !end.is_empty() && key_bytes >= end {
continue;
}
if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
&& let Some(value) = &v.value
{
results.push((key_bytes.to_vec(), value.clone()));
}
}
}
results
}
}
}
#[inline]
pub fn scan_range_iter<'a>(
&'a self,
start: &'a [u8],
end: &'a [u8],
snapshot_ts: u64,
current_txn_id: Option<u64>,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
match self {
MemTableKind::Standard(m) => {
Box::new(m.scan_range_iter(start, end, snapshot_ts, current_txn_id))
}
MemTableKind::Arena(_) => {
let results = self.scan_range(start, end, snapshot_ts, current_txn_id);
Box::new(results.into_iter())
}
}
}
#[inline]
pub fn size(&self) -> u64 {
match self {
MemTableKind::Standard(m) => m.size(),
MemTableKind::Arena(m) => m.size(),
}
}
#[inline]
pub fn gc(&self, min_active_ts: u64) -> usize {
match self {
MemTableKind::Standard(m) => m.gc(min_active_ts),
MemTableKind::Arena(m) => m.gc(min_active_ts),
}
}
pub fn kind(&self) -> MemTableType {
match self {
MemTableKind::Standard(_) => MemTableType::Standard,
MemTableKind::Arena(_) => MemTableType::Arena,
}
}
}
/// A transactional key-value store: an in-memory MVCC memtable made durable
/// by a write-ahead log stored at `<path>/wal.log`.
///
/// NOTE: fields are dropped in declaration order; keep that in mind before
/// reordering them.
pub struct DurableStorage {
    /// Data directory; holds `wal.log` and the `.clean_shutdown` marker.
    path: PathBuf,
    /// Transaction WAL; also shared with the group-commit callback when enabled.
    wal: Arc<TxnWal>,
    /// Transaction manager: snapshots, read/write tracking, commit timestamps.
    mvcc: Arc<MvccManager>,
    /// In-memory data; rebuilt from the WAL by `recover`.
    memtable: Arc<MemTableKind>,
    /// Per-transaction WAL buffers, flushed at commit and discarded at abort.
    txn_write_buffers: DashMap<u64, TxnWalBuffer>,
    /// Set when opened with group commit; commit records then go through it.
    group_commit: Option<Arc<EventDrivenGroupCommit>>,
    /// `needs_recovery` is used as a boolean (1 = `recover` must replay the WAL);
    /// `last_checkpoint_lsn` is the LSN of the latest `checkpoint` record (0 if none yet).
    needs_recovery: AtomicU64, last_checkpoint_lsn: AtomicU64,
    /// Commit fsync policy: 0 = never, 1 = adaptive batching, 2 = every commit.
    sync_mode: AtomicU64,
    /// Commits since the last fsync on the non-group-commit path.
    commits_since_sync: AtomicU64,
    /// EMA of commit arrival rate, in commits per second × 1000.
    arrival_rate_ema: AtomicU64,
    /// Wall-clock time (µs since the Unix epoch) of the previous commit; 0 = none yet.
    last_commit_us: AtomicU64,
    /// EMA of measured flush+fsync latency, in microseconds.
    fsync_latency_us: AtomicU64,
}
impl DurableStorage {
    /// Opens (or creates) a store at `path` with the ordered index enabled and
    /// the standard memtable, without group commit.
    ///
    /// # Errors
    /// Propagates errors from creating the directory or opening the WAL.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::open_with_config(path, true)
    }

    /// Like [`DurableStorage::open`], but lets the caller toggle the ordered index.
    pub fn open_with_config<P: AsRef<Path>>(path: P, enable_ordered_index: bool) -> Result<Self> {
        Self::open_with_full_config(path, enable_ordered_index, MemTableType::Standard)
    }

    /// Opens a store backed by the arena memtable, with the ordered index enabled.
    pub fn open_with_arena<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::open_with_full_config(path, true, MemTableType::Arena)
    }

    /// Opens a store with an explicit memtable implementation.
    ///
    /// Creates `path` if needed and opens `path/wal.log`. The memtable starts
    /// empty; call [`DurableStorage::recover`] to replay the WAL into it.
    ///
    /// # Errors
    /// Propagates I/O errors from directory creation, WAL opening, or removing
    /// the clean-shutdown marker.
    pub fn open_with_full_config<P: AsRef<Path>>(
        path: P,
        enable_ordered_index: bool,
        memtable_type: MemTableType,
    ) -> Result<Self> {
        let path = path.as_ref().to_path_buf();
        std::fs::create_dir_all(&path)?;
        let wal_path = path.join("wal.log");
        let wal = Arc::new(TxnWal::new(&wal_path)?);
        let storage = Self {
            path,
            wal,
            mvcc: Arc::new(MvccManager::new()),
            memtable: Arc::new(MemTableKind::new(memtable_type, enable_ordered_index)),
            txn_write_buffers: DashMap::new(),
            group_commit: None,
            needs_recovery: AtomicU64::new(0),
            last_checkpoint_lsn: AtomicU64::new(0),
            // 1 = adaptive batched fsync (see `set_sync_mode`).
            sync_mode: AtomicU64::new(1),
            commits_since_sync: AtomicU64::new(0),
            // Stored as commits/s × 1000: seeds the estimate at 1000 commits/s.
            arrival_rate_ema: AtomicU64::new(1_000_000),
            last_commit_us: AtomicU64::new(0),
            // Initial fsync latency estimate: 5 ms.
            fsync_latency_us: AtomicU64::new(5000),
        };
        if storage.check_recovery_needed()? {
            storage.needs_recovery.store(1, Ordering::SeqCst);
        }
        Ok(storage)
    }

    /// Opens a store (ordered index, standard memtable) with group commit enabled.
    pub fn open_with_group_commit<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::open_with_group_commit_and_config(path, true)
    }

    /// Opens a store with the standard memtable and group commit enabled.
    pub fn open_with_group_commit_and_config<P: AsRef<Path>>(
        path: P,
        enable_ordered_index: bool,
    ) -> Result<Self> {
        let mut storage = Self::open_with_config(path, enable_ordered_index)?;
        storage.enable_group_commit();
        Ok(storage)
    }

    /// Opens a store configured from an [`IndexPolicy`](crate::index_policy::IndexPolicy).
    ///
    /// `WriteOptimized`/`AppendOnly` select the arena memtable without an
    /// ordered index; `Balanced`/`ScanOptimized` select the standard memtable
    /// with one. `group_commit` additionally enables group commit.
    pub fn open_with_policy<P: AsRef<Path>>(
        path: P,
        policy: crate::index_policy::IndexPolicy,
        group_commit: bool,
    ) -> Result<Self> {
        use crate::index_policy::IndexPolicy;
        let (enable_ordered_index, memtable_type) = match policy {
            IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => (false, MemTableType::Arena),
            IndexPolicy::Balanced | IndexPolicy::ScanOptimized => (true, MemTableType::Standard),
        };
        let mut storage = Self::open_with_full_config(path, enable_ordered_index, memtable_type)?;
        if group_commit {
            storage.enable_group_commit();
        }
        Ok(storage)
    }

    /// Installs a group committer whose batch callback appends one commit
    /// record per transaction, then flushes and fsyncs the WAL once.
    ///
    /// The callback returns the wall-clock time (µs since the Unix epoch) at
    /// which the batch became durable.
    fn enable_group_commit(&mut self) {
        let wal = Arc::clone(&self.wal);
        let group = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
            for &txn_id in txn_ids {
                let entry = TxnWalEntry::txn_commit(txn_id);
                wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
            }
            wal.flush().map_err(|e| e.to_string())?;
            wal.sync().map_err(|e| e.to_string())?;
            Ok(Self::now_micros())
        });
        self.group_commit = Some(Arc::new(group));
    }

    /// Wall-clock microseconds since the Unix epoch.
    ///
    /// Returns 0 if the system clock reads earlier than the epoch, rather than
    /// panicking in the middle of a commit.
    fn now_micros() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_micros() as u64)
            .unwrap_or(0)
    }

    /// Returns which memtable implementation this store uses.
    pub fn memtable_type(&self) -> MemTableType {
        self.memtable.kind()
    }

    /// Decides whether the WAL must be replayed on open.
    ///
    /// Always returns `true`: the memtable built in `open_with_full_config`
    /// starts empty, so its contents can only come from replaying the WAL. A
    /// leftover `.clean_shutdown` marker is deleted so a later crash is not
    /// mistaken for a clean shutdown.
    fn check_recovery_needed(&self) -> Result<bool> {
        let marker_path = self.path.join(".clean_shutdown");
        if marker_path.exists() {
            std::fs::remove_file(&marker_path)?;
        }
        Ok(true)
    }

    /// Replays the WAL into the memtable. It is a no-op once recovery has succeeded.
    ///
    /// All recovered writes are installed under one freshly allocated txn id
    /// and committed at a single new commit timestamp.
    ///
    /// NOTE(review): `delete` logs a deletion as an empty value, and every
    /// replayed write is installed as `Some(value)`. A deleted key may
    /// therefore come back with an empty value, unless `replay_for_recovery`
    /// filters those entries; confirm.
    ///
    /// # Errors
    /// Propagates WAL replay and memtable write errors. On error, the recovery
    /// flag stays set so the call can be retried.
    pub fn recover(&self) -> Result<RecoveryStats> {
        if self.needs_recovery.load(Ordering::SeqCst) == 0 {
            return Ok(RecoveryStats::default());
        }
        let (writes, txn_count) = self.wal.replay_for_recovery()?;
        let recovery_txn_id = self.wal.alloc_txn_id();
        let commit_ts = self.mvcc.alloc_commit_ts();
        let mut write_set: HashSet<InlineKey> = HashSet::with_capacity(writes.len());
        for (key, value) in &writes {
            write_set.insert(SmallVec::from_slice(key));
            self.memtable
                .write(key.clone(), Some(value.clone()), recovery_txn_id)?;
        }
        self.memtable.commit(recovery_txn_id, commit_ts, &write_set);
        self.needs_recovery.store(0, Ordering::SeqCst);
        Ok(RecoveryStats {
            transactions_recovered: txn_count,
            writes_recovered: writes.len(),
            commit_ts,
        })
    }

    /// Starts a transaction (registered in the WAL and the MVCC manager) and returns its id.
    pub fn begin_transaction(&self) -> Result<u64> {
        let txn_id = self.wal.begin_transaction()?;
        self.mvcc.begin(txn_id);
        Ok(txn_id)
    }

    /// Starts a transaction in the given [`TransactionMode`] and returns its id.
    pub fn begin_with_mode(&self, mode: TransactionMode) -> Result<u64> {
        let txn_id = self.wal.begin_transaction()?;
        self.mvcc.begin_with_mode(txn_id, mode);
        Ok(txn_id)
    }

    /// Reads `key` at `txn_id`'s snapshot, including the transaction's own writes.
    ///
    /// The key is also registered via `mvcc.record_read`.
    ///
    /// # Errors
    /// `SochDBError::Internal` if `txn_id` is not an active transaction.
    #[inline]
    pub fn read(&self, txn_id: u64, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let snapshot_ts = self
            .mvcc
            .get_snapshot_ts(txn_id)
            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
        self.mvcc.record_read(txn_id, key);
        Ok(self.memtable.read(key, snapshot_ts, Some(txn_id)))
    }

    /// Owned-argument convenience wrapper around [`DurableStorage::write_refs`].
    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
        self.write_refs(txn_id, &key, &value)
    }

    /// Buffers a write for the WAL and applies it, uncommitted, to the memtable.
    ///
    /// Nothing reaches the WAL until [`DurableStorage::commit`].
    #[inline]
    pub fn write_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<()> {
        self.mvcc.record_write(txn_id, key);
        self.txn_write_buffers
            .entry(txn_id)
            .or_insert_with(|| TxnWalBuffer::new(txn_id))
            .append(key, value);
        self.memtable
            .write(key.to_vec(), Some(value.to_vec()), txn_id)?;
        Ok(())
    }

    /// Deletes `key` within `txn_id`.
    ///
    /// NOTE: the WAL records the deletion as an empty value, which is
    /// indistinguishable there from writing an empty value.
    pub fn delete(&self, txn_id: u64, key: Vec<u8>) -> Result<()> {
        self.mvcc.record_write(txn_id, &key);
        self.txn_write_buffers
            .entry(txn_id)
            .or_insert_with(|| TxnWalBuffer::new(txn_id))
            .append(&key, &[]);
        self.memtable.write(key, None, txn_id)?;
        Ok(())
    }

    /// Batched form of [`DurableStorage::write_refs`].
    ///
    /// The WAL buffer entry is held only while appending and is released before
    /// the memtable batch write.
    #[inline]
    pub fn write_batch_refs(&self, txn_id: u64, writes: &[(&[u8], &[u8])]) -> Result<()> {
        if writes.is_empty() {
            return Ok(());
        }
        {
            let mut buffer_entry = self
                .txn_write_buffers
                .entry(txn_id)
                .or_insert_with(|| TxnWalBuffer::new(txn_id));
            for (key, value) in writes {
                self.mvcc.record_write(txn_id, key);
                buffer_entry.append(key, value);
            }
        }
        let owned_writes: Vec<(Vec<u8>, Option<Vec<u8>>)> = writes
            .iter()
            .map(|(k, v)| (k.to_vec(), Some(v.to_vec())))
            .collect();
        self.memtable.write_batch(&owned_writes, txn_id)?;
        Ok(())
    }

    /// Commits `txn_id` and returns its commit timestamp.
    ///
    /// Buffered writes are flushed to the WAL first. The commit record is then
    /// written either through the group committer, or directly according to
    /// `sync_mode`. Only after that are the writes made visible in the memtable.
    ///
    /// With `sync_mode` 0 or 1, a commit may return before its commit record
    /// has been fsynced.
    ///
    /// # Errors
    /// WAL I/O errors, group-commit failures (as `SochDBError::Internal`), or
    /// `SochDBError::Internal` if the MVCC manager does not know `txn_id`.
    pub fn commit(&self, txn_id: u64) -> Result<u64> {
        if let Some((_, buffer)) = self.txn_write_buffers.remove(&txn_id)
            && !buffer.is_empty()
        {
            self.wal.flush_buffer(&buffer)?;
        }
        match &self.group_commit {
            // The batch callback (see `enable_group_commit`) appends, flushes
            // and fsyncs the commit records.
            Some(group) => {
                group.submit_and_wait(txn_id).map_err(SochDBError::Internal)?;
            }
            None => self.append_commit_record(txn_id)?,
        }
        let (commit_ts, write_set) = self
            .mvcc
            .commit(txn_id)
            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
        self.memtable.commit(txn_id, commit_ts, &write_set);
        Ok(commit_ts)
    }

    /// Non-group-commit path: appends the commit record and fsyncs per `sync_mode`.
    ///
    /// Mode 0 never syncs here. Mode 1 syncs once the commits since the last
    /// sync reach `adaptive_batch_threshold()`. Any other mode syncs every
    /// commit. Each sync updates the fsync-latency EMA.
    fn append_commit_record(&self, txn_id: u64) -> Result<()> {
        let sync_mode = self.sync_mode.load(Ordering::Relaxed);
        let commits = self.commits_since_sync.fetch_add(1, Ordering::Relaxed);
        self.update_arrival_rate();
        let entry = TxnWalEntry::txn_commit(txn_id);
        self.wal.append_no_flush(&entry)?;
        let should_sync = match sync_mode {
            0 => false,
            1 => commits >= self.adaptive_batch_threshold(),
            _ => true,
        };
        if should_sync {
            let start = std::time::Instant::now();
            self.wal.flush()?;
            self.wal.sync()?;
            let latency_us = start.elapsed().as_micros() as u64;
            let old_latency = self.fsync_latency_us.load(Ordering::Relaxed);
            let new_latency = (old_latency * 9 + latency_us) / 10;
            self.fsync_latency_us.store(new_latency, Ordering::Relaxed);
            self.commits_since_sync.store(0, Ordering::Relaxed);
        }
        Ok(())
    }

    /// Updates the arrival-rate EMA (weight 0.1 on the new sample).
    ///
    /// Samples use the gap since the previous commit; gaps of 0 or of 10 s or
    /// more are ignored. Rates are stored as commits/s × 1000.
    #[inline]
    fn update_arrival_rate(&self) {
        let now_us = Self::now_micros();
        let last = self.last_commit_us.swap(now_us, Ordering::Relaxed);
        if last > 0 {
            let delta_us = now_us.saturating_sub(last);
            if delta_us > 0 && delta_us < 10_000_000 {
                // 1e9 / Δµs = (1e6 / Δµs) commits/s × 1000.
                let instant_rate = 1_000_000_000 / delta_us;
                let old_rate = self.arrival_rate_ema.load(Ordering::Relaxed);
                let new_rate = (old_rate * 9 + instant_rate) / 10;
                self.arrival_rate_ema.store(new_rate, Ordering::Relaxed);
            }
        }
    }

    /// Batch size for sync mode 1: `sqrt(2 · τ · λ)`, clamped to `1..=1000`.
    ///
    /// Here λ is the commit arrival rate (commits/s) and τ the fsync latency
    /// (s). Returns 100 if either estimate is non-positive.
    #[inline]
    fn adaptive_batch_threshold(&self) -> u64 {
        let lambda = self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0;
        let tau = self.fsync_latency_us.load(Ordering::Relaxed) as f64 / 1_000_000.0;
        if lambda <= 0.0 || tau <= 0.0 {
            return 100;
        }
        let n_opt = (2.0 * tau * lambda).sqrt();
        (n_opt as u64).clamp(1, 1000)
    }

    /// Sets the commit fsync policy: 0 = never, 1 = adaptive, 2 = every commit.
    ///
    /// Values above 2 are clamped to 2.
    pub fn set_sync_mode(&self, mode: u64) {
        self.sync_mode.store(mode.min(2), Ordering::Relaxed);
    }

    /// Asks the group committer, if any, to flush its current batch.
    pub fn flush_group_commit(&self) {
        if let Some(group) = &self.group_commit {
            group.flush_batch();
        }
    }

    /// Aborts `txn_id`.
    ///
    /// Drops its buffered writes, records the abort in the WAL, and discards
    /// its MVCC state and uncommitted memtable writes.
    pub fn abort(&self, txn_id: u64) -> Result<()> {
        self.txn_write_buffers.remove(&txn_id);
        self.wal.abort_transaction(txn_id)?;
        self.mvcc.abort(txn_id);
        self.memtable.abort(txn_id);
        Ok(())
    }

    /// Returns visible `(key, value)` pairs whose key starts with `prefix`.
    ///
    /// # Errors
    /// `SochDBError::Internal` if `txn_id` is not an active transaction.
    #[inline]
    pub fn scan(&self, txn_id: u64, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        let snapshot_ts = self
            .mvcc
            .get_snapshot_ts(txn_id)
            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
        Ok(self.memtable.scan_prefix(prefix, snapshot_ts, Some(txn_id)))
    }

    /// Returns visible `(key, value)` pairs with `start <= key < end`.
    ///
    /// An empty `end` means "no upper bound".
    ///
    /// # Errors
    /// `SochDBError::Internal` if `txn_id` is not an active transaction.
    #[inline]
    pub fn scan_range(
        &self,
        txn_id: u64,
        start: &[u8],
        end: &[u8],
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        let snapshot_ts = self
            .mvcc
            .get_snapshot_ts(txn_id)
            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
        Ok(self
            .memtable
            .scan_range(start, end, snapshot_ts, Some(txn_id)))
    }

    /// Iterator form of [`DurableStorage::scan_range`].
    ///
    /// NOTE: unlike `scan_range`, an unknown `txn_id` is not reported as an
    /// error; the scan silently uses snapshot timestamp 0 instead.
    #[inline]
    pub fn scan_range_iter<'a>(
        &'a self,
        txn_id: u64,
        start: &'a [u8],
        end: &'a [u8],
    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
        let snapshot_ts = self.mvcc.get_snapshot_ts(txn_id).unwrap_or(0);
        self.memtable.scan_range_iter(start, end, snapshot_ts, Some(txn_id))
    }

    /// Makes all appended WAL records durable.
    ///
    /// The WAL is flushed before it is synced, matching the commit paths,
    /// because records appended with `append_no_flush` (sync modes 0/1) may
    /// still be buffered.
    pub fn fsync(&self) -> Result<()> {
        self.wal.flush()?;
        self.wal.sync()
    }

    /// Appends a synced checkpoint record and returns its LSN.
    pub fn checkpoint(&self) -> Result<u64> {
        // Checkpoint records are not tied to a user transaction.
        let txn_id = 0;
        let entry = TxnWalEntry::checkpoint(txn_id);
        let lsn = self.wal.append_sync(&entry)?;
        self.last_checkpoint_lsn.store(lsn, Ordering::SeqCst);
        Ok(lsn)
    }

    /// Returns a snapshot of size and transaction statistics.
    pub fn stats(&self) -> StorageStats {
        StorageStats {
            memtable_size_bytes: self.memtable.size(),
            wal_size_bytes: self.wal.size_bytes(),
            active_transactions: self.mvcc.active_transaction_count(),
            min_active_snapshot: self.mvcc.min_active_snapshot(),
            last_checkpoint_lsn: self.last_checkpoint_lsn.load(Ordering::SeqCst),
        }
    }

    /// Runs memtable version GC up to the oldest active snapshot.
    ///
    /// Returns the number of versions the memtable reports as removed.
    pub fn gc(&self) -> usize {
        let min_ts = self.mvcc.min_active_snapshot();
        self.memtable.gc(min_ts)
    }

    /// Makes the WAL durable, then writes the `.clean_shutdown` marker.
    pub fn shutdown(&self) -> Result<()> {
        self.fsync()?;
        let marker_path = self.path.join(".clean_shutdown");
        std::fs::write(&marker_path, b"clean")?;
        Ok(())
    }
}
/// Best-effort clean shutdown when the store goes out of scope.
impl Drop for DurableStorage {
    fn drop(&mut self) {
        // `drop` cannot report failures, so a failed shutdown is deliberately
        // ignored. The next open replays the WAL regardless.
        self.shutdown().ok();
    }
}
/// Summary returned by `DurableStorage::recover`.
///
/// All fields are zero when no recovery was needed.
#[derive(Debug, Default)]
pub struct RecoveryStats {
    /// Transaction count reported by the WAL replay.
    pub transactions_recovered: usize,
    /// Number of key/value writes re-applied to the memtable.
    pub writes_recovered: usize,
    /// The single commit timestamp assigned to all recovered writes.
    pub commit_ts: u64,
}
/// Point-in-time statistics returned by `DurableStorage::stats`.
#[derive(Debug, Default)]
pub struct StorageStats {
    /// Size reported by the memtable.
    pub memtable_size_bytes: u64,
    /// Size reported by the WAL.
    pub wal_size_bytes: u64,
    /// Number of active transactions according to the MVCC manager.
    pub active_transactions: usize,
    /// Oldest active snapshot timestamp according to the MVCC manager.
    pub min_active_snapshot: u64,
    /// LSN of the last `checkpoint()` call since open (0 if none).
    pub last_checkpoint_lsn: u64,
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_basic_transaction() {
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open(dir.path()).unwrap();
        let txn_id = storage.begin_transaction().unwrap();
        storage
            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();
        storage
            .write(txn_id, b"key2".to_vec(), b"value2".to_vec())
            .unwrap();
        // A transaction sees its own uncommitted writes.
        let v1 = storage.read(txn_id, b"key1").unwrap();
        assert_eq!(v1, Some(b"value1".to_vec()));
        let commit_ts = storage.commit(txn_id).unwrap();
        assert!(commit_ts > 0);
        let txn2 = storage.begin_transaction().unwrap();
        let v1 = storage.read(txn2, b"key1").unwrap();
        assert_eq!(v1, Some(b"value1".to_vec()));
        storage.abort(txn2).unwrap();
    }

    #[test]
    fn test_snapshot_isolation() {
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open(dir.path()).unwrap();
        let t1 = storage.begin_transaction().unwrap();
        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
        storage.commit(t1).unwrap();
        let t2 = storage.begin_transaction().unwrap();
        let t3 = storage.begin_transaction().unwrap();
        storage.write(t3, b"key".to_vec(), b"v2".to_vec()).unwrap();
        storage.commit(t3).unwrap();
        // t2's snapshot predates t3's commit.
        let v = storage.read(t2, b"key").unwrap();
        assert_eq!(v, Some(b"v1".to_vec()));
        let t4 = storage.begin_transaction().unwrap();
        let v = storage.read(t4, b"key").unwrap();
        assert_eq!(v, Some(b"v2".to_vec()));
        storage.abort(t2).unwrap();
        storage.abort(t4).unwrap();
    }

    #[test]
    fn test_abort_transaction() {
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open(dir.path()).unwrap();
        let t1 = storage.begin_transaction().unwrap();
        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
        storage.commit(t1).unwrap();
        let t2 = storage.begin_transaction().unwrap();
        storage.write(t2, b"key".to_vec(), b"v2".to_vec()).unwrap();
        storage.abort(t2).unwrap();
        let t3 = storage.begin_transaction().unwrap();
        let v = storage.read(t3, b"key").unwrap();
        assert_eq!(v, Some(b"v1".to_vec()));
        storage.abort(t3).unwrap();
    }

    #[test]
    fn test_crash_recovery() {
        let dir = tempdir().unwrap();
        {
            let storage = DurableStorage::open(dir.path()).unwrap();
            storage.set_sync_mode(2);
            let txn = storage.begin_transaction().unwrap();
            storage
                .write(txn, b"persist".to_vec(), b"data".to_vec())
                .unwrap();
            storage.commit(txn).unwrap();
            // Simulate a crash: skip `Drop`, so no clean shutdown happens.
            std::mem::forget(storage);
        }
        {
            let storage = DurableStorage::open(dir.path()).unwrap();
            let stats = storage.recover().unwrap();
            assert!(stats.transactions_recovered > 0 || stats.writes_recovered > 0);
            let txn = storage.begin_transaction().unwrap();
            let v = storage.read(txn, b"persist").unwrap();
            assert_eq!(v, Some(b"data".to_vec()));
            storage.abort(txn).unwrap();
        }
    }

    #[test]
    fn test_scan_prefix() {
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open(dir.path()).unwrap();
        let txn = storage.begin_transaction().unwrap();
        storage
            .write(txn, b"user:1".to_vec(), b"alice".to_vec())
            .unwrap();
        storage
            .write(txn, b"user:2".to_vec(), b"bob".to_vec())
            .unwrap();
        storage
            .write(txn, b"order:1".to_vec(), b"order1".to_vec())
            .unwrap();
        storage.commit(txn).unwrap();
        let txn2 = storage.begin_transaction().unwrap();
        let users = storage.scan(txn2, b"user:").unwrap();
        assert_eq!(users.len(), 2);
        storage.abort(txn2).unwrap();
    }

    #[test]
    fn test_delete() {
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open(dir.path()).unwrap();
        let t1 = storage.begin_transaction().unwrap();
        storage
            .write(t1, b"key".to_vec(), b"value".to_vec())
            .unwrap();
        storage.commit(t1).unwrap();
        let t2 = storage.begin_transaction().unwrap();
        assert!(storage.read(t2, b"key").unwrap().is_some());
        storage.abort(t2).unwrap();
        let t3 = storage.begin_transaction().unwrap();
        storage.delete(t3, b"key".to_vec()).unwrap();
        storage.commit(t3).unwrap();
        let t4 = storage.begin_transaction().unwrap();
        assert!(storage.read(t4, b"key").unwrap().is_none());
        storage.abort(t4).unwrap();
    }

    #[test]
    fn test_gc() {
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open(dir.path()).unwrap();
        for i in 0..10 {
            let txn = storage.begin_transaction().unwrap();
            storage
                .write(txn, b"key".to_vec(), format!("v{}", i).into_bytes())
                .unwrap();
            storage.commit(txn).unwrap();
        }
        // The exact count depends on the GC policy; the invariant checked
        // below is that GC never removes the newest committed version.
        let _gc_count = storage.gc();
        let txn = storage.begin_transaction().unwrap();
        assert_eq!(storage.read(txn, b"key").unwrap(), Some(b"v9".to_vec()));
        storage.abort(txn).unwrap();
    }

    #[test]
    fn test_group_commit() {
        use std::sync::Arc;
        use std::thread;
        let dir = tempdir().unwrap();
        let storage = Arc::new(DurableStorage::open_with_group_commit(dir.path()).unwrap());
        let mut handles = vec![];
        for i in 0..4 {
            let storage = Arc::clone(&storage);
            handles.push(thread::spawn(move || {
                let txn = storage.begin_transaction().unwrap();
                storage
                    .write(
                        txn,
                        format!("key{}", i).into_bytes(),
                        format!("val{}", i).into_bytes(),
                    )
                    .unwrap();
                storage.commit(txn).unwrap()
            }));
        }
        let mut commit_times = vec![];
        for h in handles {
            commit_times.push(h.join().unwrap());
        }
        assert!(commit_times.iter().all(|&ts| ts > 0));
        let txn = storage.begin_transaction().unwrap();
        for i in 0..4 {
            let val = storage.read(txn, format!("key{}", i).as_bytes()).unwrap();
            assert_eq!(val, Some(format!("val{}", i).into_bytes()));
        }
        storage.abort(txn).unwrap();
    }

    #[test]
    fn test_arena_memtable_basic_write_read() {
        let memtable = ArenaMvccMemTable::new();
        memtable
            .write(b"key1", Some(b"value1".to_vec()), 1)
            .unwrap();
        memtable
            .write(b"key2", Some(b"value2".to_vec()), 1)
            .unwrap();
        assert_eq!(memtable.read(b"key1", 0, Some(1)), Some(b"value1".to_vec()));
        assert_eq!(memtable.read(b"key2", 0, Some(1)), Some(b"value2".to_vec()));
        assert_eq!(memtable.read(b"key3", 0, Some(1)), None);
    }

    #[test]
    fn test_arena_memtable_update() {
        let memtable = ArenaMvccMemTable::new();
        memtable.write(b"key", Some(b"v1".to_vec()), 1).unwrap();
        memtable.write(b"key", Some(b"v2".to_vec()), 1).unwrap();
        assert_eq!(memtable.read(b"key", 0, Some(1)), Some(b"v2".to_vec()));
    }

    #[test]
    fn test_arena_memtable_delete() {
        let memtable = ArenaMvccMemTable::new();
        memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
        memtable.write(b"key", None, 1).unwrap();
        assert_eq!(memtable.read(b"key", 0, Some(1)), None);
    }

    #[test]
    fn test_arena_memtable_scan_prefix() {
        let memtable = ArenaMvccMemTable::new();
        memtable
            .write(b"user:1:name", Some(b"Alice".to_vec()), 1)
            .unwrap();
        memtable
            .write(b"user:1:email", Some(b"alice@test.com".to_vec()), 1)
            .unwrap();
        memtable
            .write(b"user:2:name", Some(b"Bob".to_vec()), 1)
            .unwrap();
        memtable
            .write(b"order:1", Some(b"order_data".to_vec()), 1)
            .unwrap();
        let mut write_set = HashSet::new();
        write_set.insert(InlineKey::from_slice(b"user:1:name"));
        write_set.insert(InlineKey::from_slice(b"user:1:email"));
        write_set.insert(InlineKey::from_slice(b"user:2:name"));
        write_set.insert(InlineKey::from_slice(b"order:1"));
        memtable.commit(1, 10, &write_set);
        let results = memtable.scan_prefix(b"user:1:", 11, None);
        assert_eq!(results.len(), 2);
        let results = memtable.scan_prefix(b"user:", 11, None);
        assert_eq!(results.len(), 3);
    }

    #[test]
    fn test_arena_memtable_write_batch() {
        let memtable = ArenaMvccMemTable::new();
        let writes: Vec<(&[u8], Option<Vec<u8>>)> = vec![
            (b"k1", Some(b"v1".to_vec())),
            (b"k2", Some(b"v2".to_vec())),
            (b"k3", Some(b"v3".to_vec())),
        ];
        memtable.write_batch(&writes, 1).unwrap();
        assert_eq!(memtable.read(b"k1", 0, Some(1)), Some(b"v1".to_vec()));
        assert_eq!(memtable.read(b"k2", 0, Some(1)), Some(b"v2".to_vec()));
        assert_eq!(memtable.read(b"k3", 0, Some(1)), Some(b"v3".to_vec()));
    }

    #[test]
    fn test_arena_memtable_gc() {
        let memtable = ArenaMvccMemTable::new();
        for i in 0..10 {
            memtable
                .write(b"key", Some(format!("v{}", i).into_bytes()), i + 1)
                .unwrap();
            let mut write_set = HashSet::new();
            write_set.insert(InlineKey::from_slice(b"key"));
            memtable.commit(i + 1, (i + 1) * 10, &write_set);
        }
        // The exact count depends on the GC policy; the newest version
        // (committed at ts 100) must remain readable afterwards.
        let _gc_count = memtable.gc(90);
        assert_eq!(memtable.read(b"key", 101, None), Some(b"v9".to_vec()));
    }

    #[test]
    fn test_arena_memtable_size_tracking() {
        let memtable = ArenaMvccMemTable::new();
        assert_eq!(memtable.size(), 0);
        memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
        assert!(memtable.size() > 0);
    }

    #[test]
    fn test_arena_memtable_abort() {
        let memtable = ArenaMvccMemTable::new();
        memtable
            .write(b"key", Some(b"uncommitted".to_vec()), 1)
            .unwrap();
        assert_eq!(
            memtable.read(b"key", 0, Some(1)),
            Some(b"uncommitted".to_vec())
        );
        // Another transaction must not see txn 1's uncommitted write.
        assert_eq!(memtable.read(b"key", 0, Some(2)), None);
        memtable.abort(1);
        assert_eq!(memtable.read(b"key", 0, Some(1)), None);
    }

    #[test]
    fn test_memtable_kind_standard() {
        let memtable = MemTableKind::new(MemTableType::Standard, true);
        assert_eq!(memtable.kind(), MemTableType::Standard);
        memtable.write(b"key1".to_vec(), Some(b"value1".to_vec()), 1).unwrap();
        let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
        memtable.commit(1, 100, &write_set);
        let v = memtable.read(b"key1", 101, None);
        assert_eq!(v, Some(b"value1".to_vec()));
    }

    #[test]
    fn test_memtable_kind_arena() {
        let memtable = MemTableKind::new(MemTableType::Arena, true);
        assert_eq!(memtable.kind(), MemTableType::Arena);
        memtable.write(b"key1".to_vec(), Some(b"value1".to_vec()), 1).unwrap();
        let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
        memtable.commit(1, 100, &write_set);
        let v = memtable.read(b"key1", 101, None);
        assert_eq!(v, Some(b"value1".to_vec()));
    }

    #[test]
    fn test_memtable_kind_scan_range() {
        for kind in [MemTableType::Standard, MemTableType::Arena] {
            let memtable = MemTableKind::new(kind, true);
            for i in 0..5 {
                let key = format!("key{}", i);
                let value = format!("value{}", i);
                memtable.write(key.into_bytes(), Some(value.into_bytes()), 1).unwrap();
            }
            let write_set: HashSet<InlineKey> = (0..5)
                .map(|i| InlineKey::from_slice(format!("key{}", i).as_bytes()))
                .collect();
            memtable.commit(1, 100, &write_set);
            // Half-open range: `key4` is excluded.
            let results = memtable.scan_range(b"key1", b"key4", 101, None);
            assert_eq!(results.len(), 3, "kind={:?} should have 3 results (key1, key2, key3)", kind);
        }
    }

    #[test]
    fn test_durable_storage_arena() {
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open_with_arena(dir.path()).unwrap();
        assert_eq!(storage.memtable_type(), MemTableType::Arena);
        let txn_id = storage.begin_transaction().unwrap();
        storage.write(txn_id, b"key1".to_vec(), b"value1".to_vec()).unwrap();
        storage.commit(txn_id).unwrap();
        let txn2 = storage.begin_transaction().unwrap();
        let v = storage.read(txn2, b"key1").unwrap();
        assert_eq!(v, Some(b"value1".to_vec()));
        storage.abort(txn2).unwrap();
    }

    #[test]
    fn test_durable_storage_full_config() {
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open_with_full_config(
            dir.path(),
            true,
            MemTableType::Arena,
        ).unwrap();
        assert_eq!(storage.memtable_type(), MemTableType::Arena);
        let txn = storage.begin_transaction().unwrap();
        for i in 0..10 {
            let key = format!("key{:02}", i);
            let value = format!("value{}", i);
            storage.write(txn, key.into_bytes(), value.into_bytes()).unwrap();
        }
        storage.commit(txn).unwrap();
        let txn2 = storage.begin_transaction().unwrap();
        // key00..key09 all share the `key0` prefix.
        let results = storage.scan(txn2, b"key0").unwrap();
        assert_eq!(results.len(), 10);
        storage.abort(txn2).unwrap();
    }
}