pub mod buffer_pool;
pub mod checkpoint;
pub mod free_space;
pub mod memtable;
pub mod metrics;
pub mod sstable;
pub mod wal;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use crate::compaction::leveled::{KeyRange, LeveledCompactionConfig, SSTableMeta};
use crate::error::{Error, Result};
use crate::kv::{KVStore, KVTransaction};
use crate::lsm::buffer_pool::{BufferPool, BufferPoolConfig};
use crate::lsm::checkpoint::{load_checkpoint_meta, save_checkpoint_meta, CheckpointMeta};
use crate::lsm::memtable::{ImmutableMemTable, MemTable, MemTableConfig, MemTableEntry};
use crate::lsm::metrics::{LsmMetrics, LsmMetricsSnapshot};
use crate::lsm::sstable::{SSTableConfig, SSTableEntry, SSTableReader, SSTableWriter};
use crate::lsm::wal::{
detect_wal_format_version, SyncMode, WalBatchOp, WalConfig, WalEntry, WalEntryPayload,
WalOpType, WalReader, WalWriter, WAL_FORMAT_VERSION,
};
use crate::storage::format::WriteThrottleConfig;
use crate::txn::TxnManager;
use crate::types::{Key, TxnId, TxnMode, Value};
use tracing::{info, warn};
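/// Threading expectation for the store, carried in [`LsmKVConfig`] for the
/// embedder to interpret.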
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThreadMode {
MultiThread,
SingleThread,
}
#[derive(Debug, Clone)]
pub struct LsmKVConfig {
pub wal: WalConfig,
pub checkpoint: CheckpointConfig,
pub memtable: MemTableConfig,
pub sstable: SSTableConfig,
pub compaction: LeveledCompactionConfig,
pub buffer_pool: BufferPoolConfig,
pub thread_mode: ThreadMode,
pub write_throttle: WriteThrottleConfig,
}
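/// Automatic checkpoint policy: a checkpoint becomes eligible once the WAL
/// exceeds `wal_size_threshold` bytes and at least `min_interval_ms` has
/// elapsed since the previous checkpoint (see [`LsmKV::should_checkpoint`]).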
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
pub wal_size_threshold: u64,
pub min_interval_ms: u64,
pub auto_checkpoint: bool,
}
impl Default for CheckpointConfig {
fn default() -> Self {
Self {
wal_size_threshold: 64 * 1024 * 1024,
min_interval_ms: 60_000,
auto_checkpoint: true,
}
}
}
impl Default for LsmKVConfig {
fn default() -> Self {
let wal = WalConfig {
sync_mode: SyncMode::BatchSync {
max_batch_size: 1024,
max_wait_ms: 10,
},
..WalConfig::default()
};
#[cfg(feature = "compression-lz4")]
let sstable = SSTableConfig {
compression: crate::lsm::sstable::CompressionType::Lz4,
..SSTableConfig::default()
};
#[cfg(not(feature = "compression-lz4"))]
let sstable = SSTableConfig::default();
Self {
wal,
checkpoint: CheckpointConfig::default(),
memtable: MemTableConfig::default(),
sstable,
compaction: LeveledCompactionConfig::default(),
buffer_pool: BufferPoolConfig::default(),
thread_mode: ThreadMode::MultiThread,
write_throttle: WriteThrottleConfig::default(),
}
}
}
#[derive(Debug, Clone)]
pub struct RecoveryResult {
pub entries_recovered: usize,
pub last_lsn: u64,
pub warnings: Vec<String>,
pub stop_reason: Option<String>,
pub checkpoint_lsn: Option<u64>,
}
#[derive(Debug, Clone)]
pub struct CheckpointResult {
pub checkpoint_lsn: u64,
pub wal_bytes_reclaimed: u64,
pub duration_ms: u64,
}
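/// Monotonic timestamp allocator backed by an atomic counter. Commit
/// timestamps double as WAL LSNs, which is why recovery can resume the oracle
/// at `last_lsn + 1`.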
#[derive(Debug)]
pub struct TimestampOracle {
next: AtomicU64,
}
impl TimestampOracle {
pub fn new(start: u64) -> Self {
Self {
next: AtomicU64::new(start),
}
}
pub fn next_timestamp(&self) -> u64 {
self.next.fetch_add(1, Ordering::Relaxed)
}
pub fn current_timestamp(&self) -> u64 {
self.next.load(Ordering::Relaxed).saturating_sub(1)
}
}
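/// Allocates unique transaction IDs for the store.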
#[derive(Debug)]
pub struct LsmTxnManager {
next_txn_id: AtomicU64,
}
impl Default for LsmTxnManager {
fn default() -> Self {
Self {
next_txn_id: AtomicU64::new(1),
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct LsmTxnManagerRef<'a> {
store: &'a LsmKV,
}
impl<'a> LsmTxnManagerRef<'a> {
fn allocate_txn_id(&self) -> TxnId {
TxnId(
self.store
.txn_manager
.next_txn_id
.fetch_add(1, Ordering::Relaxed),
)
}
}
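/// An LSM-tree key-value store: writes go through a write-ahead log into an
/// active memtable; frozen memtables queue up until they are persisted as
/// level-0 SSTables, which are then served through a shared buffer pool.
///
/// Illustrative usage (a sketch; error handling and crate paths elided):
///
/// ```ignore
/// let store = LsmKV::open("/path/to/db")?;
/// let mut tx = store.begin(TxnMode::ReadWrite)?;
/// tx.put(b"key".to_vec(), b"value".to_vec())?;
/// tx.commit_self()?;
/// ```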
#[derive(Debug)]
pub struct LsmKV {
pub config: LsmKVConfig,
pub data_dir: PathBuf,
pub sst_dir: PathBuf,
pub wal_path: PathBuf,
pub wal: RwLock<WalWriter>,
pub active_memtable: RwLock<MemTable>,
pub immutable_memtables: RwLock<VecDeque<Arc<ImmutableMemTable>>>,
pub levels: RwLock<Vec<Vec<SSTableMeta>>>,
pub buffer_pool: BufferPool,
pub metrics: Arc<LsmMetrics>,
pub ts_oracle: TimestampOracle,
pub txn_manager: LsmTxnManager,
pub commit_lock: Mutex<()>,
pub next_sstable_id: AtomicU64,
pub wal_used_bytes: AtomicU64,
pub last_checkpoint_ms: AtomicU64,
}
impl LsmKV {
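    /// Opens (or creates) a store at `path` with the default configuration,
    /// discarding the recovery report.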
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let (store, _recovery) = Self::open_with_config(path, LsmKVConfig::default())?;
Ok(store)
}
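    /// Opens a store at `path`, replaying any existing WAL. Legacy WAL
    /// formats are migrated in place first. If checkpoint metadata is
    /// present, only entries with an LSN greater than the checkpoint LSN are
    /// replayed into the recovered memtable, and the timestamp oracle resumes
    /// at `last_lsn + 1`.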
pub fn open_with_config(
path: impl AsRef<Path>,
config: LsmKVConfig,
) -> Result<(Self, RecoveryResult)> {
let data_dir = path.as_ref().to_path_buf();
fs::create_dir_all(&data_dir)?;
let wal_path = data_dir.join("lsm.wal");
let sst_dir = data_dir.join("sst");
fs::create_dir_all(&sst_dir)?;
let metrics = Arc::new(LsmMetrics::default());
let checkpoint_path = data_dir.join("checkpoint.meta");
let (wal_writer, recovered, next_ts, recovery, last_checkpoint_ms) = if wal_path.exists() {
let wal_version = detect_wal_format_version(&wal_path, &config.wal)?;
if wal_version < WAL_FORMAT_VERSION {
Self::migrate_legacy_wal(&wal_path, &checkpoint_path, &config.wal)?;
}
let start = Instant::now();
let checkpoint = load_checkpoint_meta(&checkpoint_path)?;
let checkpoint_lsn = checkpoint.as_ref().map(|meta| meta.checkpoint_lsn);
let last_checkpoint_ms = checkpoint.as_ref().map(|meta| meta.created_at).unwrap_or(0);
let mut reader = WalReader::open(&wal_path, config.wal.clone())?;
let replay = reader.replay()?;
let mut mem = MemTable::new();
let entries: Vec<_> = if let Some(start_lsn) = checkpoint_lsn {
replay
.entries
.into_iter()
.filter(|entry| entry.lsn > start_lsn)
.collect()
} else {
replay.entries
};
let mut last_lsn = apply_wal_replay(&mut mem, &entries);
if let Some(start_lsn) = checkpoint_lsn {
last_lsn = last_lsn.max(start_lsn);
}
let next = last_lsn.saturating_add(1).max(1);
for warning in &replay.warnings {
warn!(warning = %warning, "WAL recovery warning");
}
if let Some(reason) = &replay.stop_reason {
warn!(stop_reason = %reason, "WAL recovery stopped early");
}
let recovery = RecoveryResult {
entries_recovered: entries.len(),
last_lsn,
warnings: replay.warnings,
stop_reason: replay.stop_reason,
checkpoint_lsn,
};
let duration_ms = start.elapsed().as_millis() as u64;
info!(
entries_recovered = recovery.entries_recovered,
checkpoint_lsn = ?recovery.checkpoint_lsn,
duration_ms,
"WAL recovery completed"
);
(
WalWriter::open(&wal_path, config.wal.clone())?,
mem,
next,
recovery,
last_checkpoint_ms,
)
} else {
(
WalWriter::create(&wal_path, config.wal.clone(), 1, 1)?,
MemTable::new(),
1,
RecoveryResult {
entries_recovered: 0,
last_lsn: 0,
warnings: Vec::new(),
stop_reason: None,
checkpoint_lsn: None,
},
0,
)
};
let wal_used_bytes = wal_writer.used_bytes();
let next_sstable_id = next_sstable_id_from_dir(&sst_dir)?;
let levels = load_sstable_levels(&sst_dir, config.compaction.max_levels)?;
let store = Self {
wal: RwLock::new(wal_writer),
active_memtable: RwLock::new(recovered),
immutable_memtables: RwLock::new(VecDeque::new()),
levels: RwLock::new(levels),
buffer_pool: BufferPool::new(config.buffer_pool),
metrics: Arc::clone(&metrics),
ts_oracle: TimestampOracle::new(next_ts),
txn_manager: LsmTxnManager::default(),
commit_lock: Mutex::new(()),
next_sstable_id: AtomicU64::new(next_sstable_id),
wal_used_bytes: AtomicU64::new(wal_used_bytes),
last_checkpoint_ms: AtomicU64::new(last_checkpoint_ms),
data_dir,
sst_dir,
wal_path,
config,
};
store.refresh_memtable_size_metrics();
Ok((store, recovery))
}
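    /// Rewrites a legacy-format WAL into the current format: entries are
    /// replayed into a temporary file which then replaces the original via
    /// rename, the old file is kept as a `.wal.bak` backup, and a zero-LSN
    /// checkpoint meta is written so subsequent opens replay the migrated log
    /// in full.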
fn migrate_legacy_wal(
wal_path: &Path,
checkpoint_path: &Path,
config: &WalConfig,
) -> Result<()> {
let mut reader = WalReader::open_allow_legacy(wal_path, config.clone())?;
let replay = reader.replay()?;
for warning in &replay.warnings {
warn!(warning = %warning, "Legacy WAL replay warning during migration");
}
if let Some(reason) = &replay.stop_reason {
warn!(
stop_reason = %reason,
"Legacy WAL replay stopped early during migration"
);
}
let entries = replay.entries;
let first_lsn = entries.first().map(|entry| entry.lsn).unwrap_or(1);
let temp_path = wal_path.with_extension("wal.migrate");
let backup_path = wal_path.with_extension("wal.bak");
if temp_path.exists() {
fs::remove_file(&temp_path)?;
}
let mut writer = WalWriter::create(&temp_path, config.clone(), 1, first_lsn)?;
for entry in &entries {
writer.append(entry)?;
}
writer.force_sync()?;
drop(writer);
if backup_path.exists() {
fs::remove_file(&backup_path)?;
}
fs::rename(wal_path, &backup_path)?;
fs::rename(&temp_path, wal_path)?;
let created_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let meta = CheckpointMeta::new(0, created_at);
save_checkpoint_meta(checkpoint_path, &meta)?;
info!(
entries_migrated = entries.len(),
backup = ?backup_path,
"Legacy WAL migration completed"
);
Ok(())
}
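    /// Swaps the active memtable for an empty one and enqueues the frozen
    /// snapshot behind any earlier immutable memtables. If the queue grows
    /// past `max_immutable_count`, the backlog is persisted to level-0
    /// SSTables.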
    pub fn flush(&self) -> Result<()> {
        let old = {
            let mut guard = self
                .active_memtable
                .write()
                .expect("lsm active_memtable lock poisoned");
            std::mem::take(&mut *guard)
        };
        let imm = Arc::new(old.freeze());
        let over_limit = {
            let mut queue = self
                .immutable_memtables
                .write()
                .expect("lsm immutable_memtables lock poisoned");
            queue.push_back(imm);
            queue.len() > self.config.memtable.max_immutable_count
        };
        // Persist rather than drop when the queue overflows: popping
        // memtables here would lose committed writes permanently once the
        // WAL is truncated at the next checkpoint.
        if over_limit {
            self.persist_immutable_memtables()?;
        }
        self.metrics.inc_memtable_flush_count();
        self.refresh_memtable_size_metrics();
        Ok(())
    }
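    /// Runs a full checkpoint under the commit lock: flushes the active
    /// memtable, persists all immutable memtables as level-0 SSTables,
    /// records checkpoint metadata, and truncates the WAL up to its current
    /// end offset.
    ///
    /// Illustrative sketch (error handling elided):
    ///
    /// ```ignore
    /// if store.should_checkpoint() {
    ///     let result = store.checkpoint()?;
    ///     println!("reclaimed {} WAL bytes", result.wal_bytes_reclaimed);
    /// }
    /// ```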
pub fn checkpoint(&self) -> Result<CheckpointResult> {
let _guard = self.commit_lock.lock().expect("lsm commit_lock poisoned");
let start = Instant::now();
self.flush()?;
self.persist_immutable_memtables()?;
let checkpoint_lsn = self.ts_oracle.current_timestamp();
let created_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let meta = CheckpointMeta::new(checkpoint_lsn, created_at);
let checkpoint_path = self.data_dir.join("checkpoint.meta");
save_checkpoint_meta(&checkpoint_path, &meta)?;
let mut wal = self.wal.write().expect("lsm wal lock poisoned");
let wal_bytes_reclaimed = wal.used_bytes();
let end_offset = wal.end_offset();
wal.advance_start(end_offset)?;
self.wal_used_bytes
.store(wal.used_bytes(), Ordering::Relaxed);
self.last_checkpoint_ms.store(created_at, Ordering::Relaxed);
Ok(CheckpointResult {
checkpoint_lsn,
wal_bytes_reclaimed,
duration_ms: start.elapsed().as_millis() as u64,
})
}
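    /// True when auto-checkpointing is enabled, the WAL has outgrown
    /// `wal_size_threshold`, and at least `min_interval_ms` has passed since
    /// the last checkpoint.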
pub fn should_checkpoint(&self) -> bool {
if !self.config.checkpoint.auto_checkpoint {
return false;
}
let wal_used = self.wal_used_bytes.load(Ordering::Relaxed);
if wal_used <= self.config.checkpoint.wal_size_threshold {
return false;
}
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let last = self.last_checkpoint_ms.load(Ordering::Relaxed);
now_ms.saturating_sub(last) >= self.config.checkpoint.min_interval_ms
}
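    /// Placeholder: leveled compaction is not wired up here yet, so this is
    /// currently a no-op.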
pub fn compact(&self) -> Result<()> {
Ok(())
}
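    /// Builds a point-in-time metrics snapshot. The buffer-pool hit rate is
    /// `hits / (hits + misses)`, reported as 1.0 when there have been no
    /// accesses yet.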
pub fn metrics(&self) -> LsmMetricsSnapshot {
let counters = self.metrics.counters_snapshot();
let sstable_count_per_level = self
.levels
.read()
.expect("lsm levels lock poisoned")
.iter()
.map(|l| l.len())
.collect::<Vec<_>>();
let bp_stats = self.buffer_pool.stats();
let bp_total = bp_stats.hits + bp_stats.misses;
let hit_rate = if bp_total == 0 {
1.0
} else {
(bp_stats.hits as f64) / (bp_total as f64)
};
LsmMetricsSnapshot {
wal_write_bytes: counters.wal_write_bytes,
wal_sync_duration_ms: counters.wal_sync_duration_ms,
memtable_size_bytes: counters.memtable_size_bytes,
memtable_flush_count: counters.memtable_flush_count,
sstable_read_bytes: counters.sstable_read_bytes,
sstable_count_per_level,
buffer_pool_hit_rate: hit_rate,
buffer_pool_size_bytes: self.buffer_pool.current_size_bytes() as u64,
compaction_bytes_written: counters.compaction_bytes_written,
compaction_duration_ms: counters.compaction_duration_ms,
}
}
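    /// On-disk size of the WAL file only; SSTable bytes are not included.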
pub fn disk_usage(&self) -> u64 {
fs::metadata(&self.wal_path).map(|m| m.len()).unwrap_or(0)
}
fn sstable_path_for(&self, file_id: u64) -> PathBuf {
self.sst_dir.join(format!("{file_id}.sst"))
}
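    /// Drains the immutable queue, writing each non-empty memtable out as a
    /// level-0 SSTable and registering its key range in the level metadata.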
fn persist_immutable_memtables(&self) -> Result<()> {
let immutables = {
let mut guard = self
.immutable_memtables
.write()
.expect("lsm immutable_memtables lock poisoned");
std::mem::take(&mut *guard)
};
if immutables.is_empty() {
return Ok(());
}
let mut levels = self.levels.write().expect("lsm levels lock poisoned");
for mem in immutables {
let entries = mem.scan_prefix(b"", u64::MAX);
if entries.is_empty() {
continue;
}
let file_id = self.next_sstable_id.fetch_add(1, Ordering::Relaxed);
let path = self.sstable_path_for(file_id);
let mut writer = SSTableWriter::create(&path, self.config.sstable)?;
let mut first_key: Option<Key> = None;
let mut last_key: Option<Key> = None;
for (key, entry) in entries {
if first_key.is_none() {
first_key = Some(key.clone());
}
last_key = Some(key.clone());
writer.append(SSTableEntry {
key,
value: entry.value,
timestamp: entry.timestamp,
sequence: entry.sequence,
})?;
}
writer.finish()?;
let size_bytes = fs::metadata(&path)?.len();
let key_range = KeyRange {
first_key: first_key.unwrap(),
last_key: last_key.unwrap(),
};
let meta = SSTableMeta {
id: file_id,
level: 0,
size_bytes,
key_range,
};
levels[0].push(meta);
}
self.refresh_memtable_size_metrics();
Ok(())
}
fn refresh_memtable_size_metrics(&self) {
let active_bytes = self
.active_memtable
.read()
.expect("lsm active_memtable lock poisoned")
.memory_usage_bytes() as u64;
let imm_bytes = self
.immutable_memtables
.read()
.expect("lsm immutable_memtables lock poisoned")
.iter()
.map(|t| t.memory_usage_bytes() as u64)
.sum::<u64>();
self.metrics
.set_memtable_size_bytes(active_bytes.saturating_add(imm_bytes));
}
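    /// MVCC point lookup at `read_timestamp`: checks the active memtable,
    /// then immutable memtables newest-first, and finally every SSTable,
    /// keeping the version with the highest `(timestamp, sequence)` pair.
    /// Memtable hits return immediately because flush drains whole memtables,
    /// so anything still in memory is newer than any persisted version of the
    /// same key.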
    fn get_visible_at(&self, key: &[u8], read_timestamp: u64) -> Option<MemTableEntry> {
if let Some(e) = self
.active_memtable
.read()
.expect("lsm active_memtable lock poisoned")
.get(key, read_timestamp)
{
return Some(e);
}
let imm = self
.immutable_memtables
.read()
.expect("lsm immutable_memtables lock poisoned");
for t in imm.iter().rev() {
if let Some(e) = t.get(key, read_timestamp) {
return Some(e);
}
}
let levels = self.levels.read().expect("lsm levels lock poisoned");
        let mut best: Option<MemTableEntry> = None;
for level in levels.iter() {
for meta in level.iter() {
let path = self.sstable_path_for(meta.id);
let Ok(mut reader) = SSTableReader::open(&path) else {
continue;
};
let Ok(found) = reader.get_with_buffer_pool(
&self.buffer_pool,
&self.metrics,
meta.id,
key,
read_timestamp,
) else {
continue;
};
let Some(found) = found else {
continue;
};
                let candidate = MemTableEntry {
value: found.value,
timestamp: found.timestamp,
sequence: found.sequence,
};
let better = match &best {
None => true,
Some(cur) => {
(candidate.timestamp > cur.timestamp)
|| (candidate.timestamp == cur.timestamp
&& candidate.sequence > cur.sequence)
}
};
if better {
best = Some(candidate);
}
}
}
best
}
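    /// Newest commit timestamp recorded for `key` (0 if the key has never
    /// been written); used for optimistic conflict validation at commit.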
    fn latest_timestamp(&self, key: &[u8]) -> u64 {
        self.get_visible_at(key, u64::MAX)
            .map_or(0, |e| e.timestamp)
    }
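    /// Merges prefix scans across the active memtable, immutable memtables,
    /// and all SSTables; when the same key appears in several sources, the
    /// entry with the higher `(timestamp, sequence)` pair wins.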
    fn scan_prefix_visible(
        &self,
        prefix: &[u8],
        read_timestamp: u64,
    ) -> BTreeMap<Key, MemTableEntry> {
        let mut out: BTreeMap<Key, MemTableEntry> = BTreeMap::new();
let active = self
.active_memtable
.read()
.expect("lsm active_memtable lock poisoned");
for (k, e) in active.scan_prefix(prefix, read_timestamp) {
out.insert(k, e);
}
let imm = self
.immutable_memtables
.read()
.expect("lsm immutable_memtables lock poisoned");
for t in imm.iter().rev() {
for (k, e) in t.scan_prefix(prefix, read_timestamp) {
match out.get(&k) {
None => {
out.insert(k, e);
}
Some(cur) => {
let better = (e.timestamp > cur.timestamp)
|| (e.timestamp == cur.timestamp && e.sequence > cur.sequence);
if better {
out.insert(k, e);
}
}
}
}
}
let levels = self.levels.read().expect("lsm levels lock poisoned");
for level in levels.iter() {
for meta in level.iter() {
let path = self.sstable_path_for(meta.id);
let Ok(mut reader) = SSTableReader::open(&path) else {
continue;
};
let Ok(entries) = reader.scan_prefix_with_buffer_pool(
&self.buffer_pool,
&self.metrics,
meta.id,
prefix,
read_timestamp,
) else {
continue;
};
for e in entries {
let k = e.key.clone();
                    let candidate = MemTableEntry {
value: e.value,
timestamp: e.timestamp,
sequence: e.sequence,
};
match out.get(&k) {
None => {
out.insert(k, candidate);
}
Some(cur) => {
let better = (candidate.timestamp > cur.timestamp)
|| (candidate.timestamp == cur.timestamp
&& candidate.sequence > cur.sequence);
if better {
out.insert(k, candidate);
}
}
}
}
}
}
out
}
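    /// Range-scan analogue of `scan_prefix_visible`, covering the `start..end`
    /// key range with the same newest-version-wins merge rule.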
    fn scan_range_visible(
        &self,
        start: &[u8],
        end: &[u8],
        read_timestamp: u64,
    ) -> BTreeMap<Key, MemTableEntry> {
        let mut out: BTreeMap<Key, MemTableEntry> = BTreeMap::new();
let active = self
.active_memtable
.read()
.expect("lsm active_memtable lock poisoned");
for (k, e) in active.scan_range(start, end, read_timestamp) {
out.insert(k, e);
}
let imm = self
.immutable_memtables
.read()
.expect("lsm immutable_memtables lock poisoned");
for t in imm.iter().rev() {
for (k, e) in t.scan_range(start, end, read_timestamp) {
match out.get(&k) {
None => {
out.insert(k, e);
}
Some(cur) => {
let better = (e.timestamp > cur.timestamp)
|| (e.timestamp == cur.timestamp && e.sequence > cur.sequence);
if better {
out.insert(k, e);
}
}
}
}
}
let levels = self.levels.read().expect("lsm levels lock poisoned");
for level in levels.iter() {
for meta in level.iter() {
let path = self.sstable_path_for(meta.id);
let Ok(mut reader) = SSTableReader::open(&path) else {
continue;
};
let Ok(entries) = reader.scan_range_with_buffer_pool(
&self.buffer_pool,
&self.metrics,
meta.id,
start,
end,
read_timestamp,
) else {
continue;
};
for e in entries {
let k = e.key.clone();
                    let candidate = MemTableEntry {
value: e.value,
timestamp: e.timestamp,
sequence: e.sequence,
};
match out.get(&k) {
None => {
out.insert(k, candidate);
}
Some(cur) => {
let better = (candidate.timestamp > cur.timestamp)
|| (candidate.timestamp == cur.timestamp
&& candidate.sequence > cur.sequence);
if better {
out.insert(k, candidate);
}
}
}
}
}
}
out
}
}
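/// Scans `dir` for `<id>.sst` files and returns the highest id plus one.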
fn next_sstable_id_from_dir(dir: &Path) -> Result<u64> {
let mut max_id = 0u64;
if dir.exists() {
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) != Some("sst") {
continue;
}
let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
continue;
};
let Ok(id) = stem.parse::<u64>() else {
continue;
};
max_id = max_id.max(id);
}
}
Ok(max_id.saturating_add(1))
}
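/// Discovers `<id>.sst` files in `dir` and registers them all at level 0,
/// skipping files that cannot be opened or that report no key range.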
fn load_sstable_levels(dir: &Path, max_levels: usize) -> Result<Vec<Vec<SSTableMeta>>> {
let mut levels = vec![Vec::new(); max_levels];
if max_levels == 0 || !dir.exists() {
return Ok(levels);
}
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) != Some("sst") {
continue;
}
let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
continue;
};
let Ok(file_id) = stem.parse::<u64>() else {
continue;
};
let reader = match SSTableReader::open(&path) {
Ok(reader) => reader,
Err(err) => {
warn!(error = %err, path = ?path, "Skipping unreadable SSTable");
continue;
}
};
let Some((first_key, last_key)) = reader.key_range() else {
continue;
};
let size_bytes = fs::metadata(&path)?.len();
let meta = SSTableMeta {
id: file_id,
level: 0,
size_bytes,
key_range: KeyRange {
first_key,
last_key,
},
};
levels[0].push(meta);
}
Ok(levels)
}
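/// Replays WAL entries into `mem`, using each entry's LSN as its timestamp,
/// and returns the highest LSN seen. Batch ops are numbered from sequence 0
/// here (the live commit path starts at 1); only the relative order within a
/// batch is significant for tie-breaking.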
fn apply_wal_replay(mem: &mut MemTable, entries: &[WalEntry]) -> u64 {
let mut last = 0u64;
for e in entries {
last = last.max(e.lsn);
match &e.payload {
WalEntryPayload::Put { key, value } => {
mem.put(key.clone(), value.clone(), e.lsn, 0);
}
WalEntryPayload::Delete { key } => {
mem.delete(key.clone(), e.lsn, 0);
}
WalEntryPayload::Batch(ops) => {
let mut seq = 0u64;
for op in ops {
                    match op.op_type {
                        WalOpType::Put => {
                            let val = op.value.clone().unwrap_or_default();
                            mem.put(op.key.clone(), val, e.lsn, seq);
                        }
                        WalOpType::Delete => {
                            mem.delete(op.key.clone(), e.lsn, seq);
                        }
                    }
seq = seq.wrapping_add(1);
}
}
}
}
last
}
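/// An optimistic (OCC) transaction over [`LsmKV`]: reads see a snapshot at
/// `start_ts`, writes are buffered in `write_set`, and commit validates every
/// key read or written against commits newer than `start_ts`.
///
/// First committer wins on conflict (a sketch; error handling elided):
///
/// ```ignore
/// let mut a = store.begin(TxnMode::ReadWrite)?;
/// let mut b = store.begin(TxnMode::ReadWrite)?;
/// a.put(b"k".to_vec(), b"1".to_vec())?;
/// a.commit_self()?;
/// b.put(b"k".to_vec(), b"2".to_vec())?;
/// assert!(b.commit_self().is_err()); // conflict: `k` changed after b began
/// ```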
#[derive(Debug)]
pub struct LsmTransaction<'a> {
start_ts: u64,
txn_id: TxnId,
mode: TxnMode,
read_set: HashSet<Vec<u8>>,
write_set: BTreeMap<Vec<u8>, Option<Vec<u8>>>,
store: &'a LsmKV,
}
impl<'a> LsmTransaction<'a> {
fn new(store: &'a LsmKV, txn_id: TxnId, mode: TxnMode, start_ts: u64) -> Self {
Self {
start_ts,
txn_id,
mode,
read_set: HashSet::new(),
write_set: BTreeMap::new(),
store,
}
}
pub(crate) fn rollback_in_place(&mut self) -> Result<()> {
self.read_set.clear();
self.write_set.clear();
Ok(())
}
fn write_iter_prefix<'b>(
&'b self,
prefix: &'b [u8],
) -> impl Iterator<Item = (&'b Key, &'b Option<Value>)> + 'b {
let prefix_vec = prefix.to_vec();
self.write_set
.range(prefix_vec..)
.take_while(move |(k, _)| k.starts_with(prefix))
}
}
impl<'a> KVTransaction<'a> for LsmTransaction<'a> {
fn id(&self) -> TxnId {
self.txn_id
}
fn mode(&self) -> TxnMode {
self.mode
}
fn get(&mut self, key: &Key) -> Result<Option<Value>> {
if let Some(v) = self.write_set.get(key) {
return Ok(v.clone());
}
self.read_set.insert(key.clone());
let entry = self.store.get_visible_at(key, self.start_ts);
Ok(entry.and_then(|e| e.value))
}
fn put(&mut self, key: Key, value: Value) -> Result<()> {
if self.mode == TxnMode::ReadOnly {
return Err(Error::TxnReadOnly);
}
self.write_set.insert(key, Some(value));
Ok(())
}
fn delete(&mut self, key: Key) -> Result<()> {
if self.mode == TxnMode::ReadOnly {
return Err(Error::TxnReadOnly);
}
self.write_set.insert(key, None);
Ok(())
}
fn scan_prefix(
&mut self,
prefix: &[u8],
) -> Result<Box<dyn Iterator<Item = (Key, Value)> + '_>> {
let mut map: BTreeMap<Key, Option<Value>> = self
.store
.scan_prefix_visible(prefix, self.start_ts)
.into_iter()
.map(|(k, e)| (k, e.value))
.collect();
self.read_set.extend(map.keys().cloned());
let overlays: Vec<(Key, Option<Value>)> = self
.write_iter_prefix(prefix)
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
for (k, v) in overlays {
self.read_set.insert(k.clone());
match v {
Some(val) => {
map.insert(k, Some(val));
}
None => {
map.remove(&k);
}
}
}
let iter = map.into_iter().filter_map(|(k, v)| v.map(|vv| (k, vv)));
Ok(Box::new(iter))
}
fn scan_range(
&mut self,
start: &[u8],
end: &[u8],
) -> Result<Box<dyn Iterator<Item = (Key, Value)> + '_>> {
let mut map: BTreeMap<Key, Option<Value>> = self
.store
.scan_range_visible(start, end, self.start_ts)
.into_iter()
.map(|(k, e)| (k, e.value))
.collect();
self.read_set.extend(map.keys().cloned());
let overlays: Vec<(Key, Option<Value>)> = self
.write_set
.range(start.to_vec()..end.to_vec())
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
for (k, v) in overlays {
self.read_set.insert(k.clone());
match v {
Some(val) => {
map.insert(k, Some(val));
}
None => {
map.remove(&k);
}
}
}
let iter = map.into_iter().filter_map(|(k, v)| v.map(|vv| (k, vv)));
Ok(Box::new(iter))
}
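    /// Optimistic commit: under the global commit lock, validates that no key
    /// in the read or write set has been committed past `start_ts`, allocates
    /// a commit timestamp, appends the whole write set to the WAL as one
    /// durable batch, and only then applies it to the active memtable
    /// (flushing if the memtable has outgrown its threshold).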
fn commit_self(mut self) -> Result<()> {
if self.mode == TxnMode::ReadOnly || self.write_set.is_empty() {
return Ok(());
}
let _commit_guard = self
.store
.commit_lock
.lock()
.expect("lsm commit_lock poisoned");
for key in self.read_set.iter() {
if self.store.latest_timestamp(key) > self.start_ts {
return Err(Error::TxnConflict);
}
}
for key in self.write_set.keys() {
if self.store.latest_timestamp(key) > self.start_ts {
return Err(Error::TxnConflict);
}
}
let commit_ts = self.store.ts_oracle.next_timestamp();
let mut ops = Vec::with_capacity(self.write_set.len());
for (k, v) in self.write_set.iter() {
let op = match v {
Some(val) => WalBatchOp {
op_type: WalOpType::Put,
key: k.clone(),
value: Some(val.clone()),
},
None => WalBatchOp {
op_type: WalOpType::Delete,
key: k.clone(),
value: None,
},
};
ops.push(op);
}
{
let entry = WalEntry::batch(commit_ts, ops);
let mut wal = self.store.wal.write().expect("lsm wal lock poisoned");
let stats = wal.append_with_stats(&entry)?;
self.store.metrics.add_wal_write_bytes(stats.bytes_written);
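            // Commits must be durable in every sync mode: if the append did
            // not itself fsync (batch or no-sync modes report a zero sync
            // duration), force a sync before acknowledging the commit.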
let sync_duration_ms = if stats.sync_duration_ms == 0
&& !matches!(self.store.config.wal.sync_mode, SyncMode::EveryWrite)
{
wal.force_sync()?
} else {
stats.sync_duration_ms
};
self.store
.metrics
.add_wal_sync_duration_ms(sync_duration_ms);
self.store
.wal_used_bytes
.store(wal.used_bytes(), Ordering::Relaxed);
}
{
let active = self
.store
.active_memtable
.read()
.expect("lsm active_memtable lock poisoned");
let mut seq = 1u64;
for (k, v) in std::mem::take(&mut self.write_set) {
match v {
Some(val) => active.put(k, val, commit_ts, seq),
None => active.delete(k, commit_ts, seq),
}
seq = seq.wrapping_add(1);
}
}
self.store.refresh_memtable_size_metrics();
if self
.store
.active_memtable
.read()
.expect("lsm active_memtable lock poisoned")
.memory_usage_bytes()
>= self.store.config.memtable.flush_threshold
{
self.store.flush()?;
}
Ok(())
}
fn rollback_self(mut self) -> Result<()> {
self.write_set.clear();
Ok(())
}
}
impl<'a> TxnManager<'a, LsmTransaction<'a>> for LsmTxnManagerRef<'a> {
fn begin(&'a self, mode: TxnMode) -> Result<LsmTransaction<'a>> {
let start_ts = self.store.ts_oracle.next_timestamp();
Ok(LsmTransaction::new(
self.store,
self.allocate_txn_id(),
mode,
start_ts,
))
}
fn commit(&'a self, txn: LsmTransaction<'a>) -> Result<()> {
txn.commit_self()
}
fn rollback(&'a self, txn: LsmTransaction<'a>) -> Result<()> {
txn.rollback_self()
}
}
impl KVStore for LsmKV {
type Transaction<'a>
= LsmTransaction<'a>
where
Self: 'a;
type Manager<'a>
= LsmTxnManagerRef<'a>
where
Self: 'a;
fn txn_manager(&self) -> Self::Manager<'_> {
LsmTxnManagerRef { store: self }
}
fn begin(&self, mode: TxnMode) -> Result<Self::Transaction<'_>> {
let manager = LsmTxnManagerRef { store: self };
let start_ts = self.ts_oracle.next_timestamp();
Ok(LsmTransaction::new(
self,
manager.allocate_txn_id(),
mode,
start_ts,
))
}
}
#[cfg(all(test, not(target_arch = "wasm32")))]
mod kv_store {
use super::*;
fn test_config() -> LsmKVConfig {
LsmKVConfig {
wal: WalConfig {
segment_size: 4096,
max_segments: 2,
sync_mode: SyncMode::NoSync,
},
..Default::default()
}
}
    fn new_test_store() -> LsmKV {
        new_test_store_with_sync(SyncMode::NoSync)
    }
fn new_test_store_with_sync(sync_mode: SyncMode) -> LsmKV {
let mut cfg = test_config();
cfg.wal.sync_mode = sync_mode;
let data_dir = tempfile::tempdir().expect("tempdir").keep();
let sst_dir = data_dir.join("sst");
fs::create_dir_all(&sst_dir).expect("create sst dir");
let wal_path = data_dir.join("lsm.wal");
let wal = WalWriter::create(&wal_path, cfg.wal.clone(), 1, 1).expect("wal create");
let levels = vec![Vec::new(); cfg.compaction.max_levels];
LsmKV {
config: cfg,
data_dir,
sst_dir,
wal_path,
wal: RwLock::new(wal),
active_memtable: RwLock::new(MemTable::new()),
immutable_memtables: RwLock::new(VecDeque::new()),
levels: RwLock::new(levels),
buffer_pool: BufferPool::new(BufferPoolConfig::default()),
metrics: Arc::new(LsmMetrics::default()),
ts_oracle: TimestampOracle::new(1),
txn_manager: LsmTxnManager::default(),
commit_lock: Mutex::new(()),
next_sstable_id: AtomicU64::new(1),
wal_used_bytes: AtomicU64::new(0),
last_checkpoint_ms: AtomicU64::new(0),
}
}
#[test]
fn commit_forces_fsync_across_sync_modes() {
let modes = [
SyncMode::EveryWrite,
SyncMode::BatchSync {
max_batch_size: 1024 * 1024,
max_wait_ms: 60_000,
},
SyncMode::NoSync,
];
for mode in modes {
let store = new_test_store_with_sync(mode);
crate::lsm::wal::reset_sync_calls();
let mut tx = store.begin(TxnMode::ReadWrite).unwrap();
tx.put(b"k".to_vec(), b"v".to_vec()).unwrap();
tx.commit_self().unwrap();
assert!(
crate::lsm::wal::sync_calls() >= 1,
"expected fsync during commit for sync mode"
);
}
}
#[test]
fn commit_makes_writes_visible() {
let store = new_test_store();
let mut tx = store.begin(TxnMode::ReadWrite).unwrap();
tx.put(b"k".to_vec(), b"v".to_vec()).unwrap();
assert_eq!(tx.get(&b"k".to_vec()).unwrap(), Some(b"v".to_vec()));
tx.commit_self().unwrap();
let mut ro = store.begin(TxnMode::ReadOnly).unwrap();
assert_eq!(ro.get(&b"k".to_vec()).unwrap(), Some(b"v".to_vec()));
}
#[test]
fn rollback_discards_writes() {
let store = new_test_store();
let mut tx = store.begin(TxnMode::ReadWrite).unwrap();
tx.put(b"k".to_vec(), b"v".to_vec()).unwrap();
tx.rollback_self().unwrap();
let mut ro = store.begin(TxnMode::ReadOnly).unwrap();
assert_eq!(ro.get(&b"k".to_vec()).unwrap(), None);
}
#[test]
fn read_only_rejects_writes() {
let store = new_test_store();
let mut tx = store.begin(TxnMode::ReadOnly).unwrap();
assert!(tx.put(b"k".to_vec(), b"v".to_vec()).is_err());
}
#[test]
fn detects_write_conflict() {
let store = new_test_store();
let mut a = store.begin(TxnMode::ReadWrite).unwrap();
let mut b = store.begin(TxnMode::ReadWrite).unwrap();
a.put(b"k".to_vec(), b"v1".to_vec()).unwrap();
a.commit_self().unwrap();
b.put(b"k".to_vec(), b"v2".to_vec()).unwrap();
assert!(b.commit_self().is_err());
}
#[test]
fn scan_populates_read_set_for_conflict_detection() {
let store = new_test_store();
let mut init = store.begin(TxnMode::ReadWrite).unwrap();
init.put(b"p:a".to_vec(), b"v1".to_vec()).unwrap();
init.commit_self().unwrap();
let mut scan_tx = store.begin(TxnMode::ReadWrite).unwrap();
let got: Vec<(Key, Value)> = scan_tx.scan_prefix(b"p:").unwrap().collect();
assert_eq!(got.len(), 1);
assert_eq!(got[0].0, b"p:a".to_vec());
assert_eq!(got[0].1, b"v1".to_vec());
let mut updater = store.begin(TxnMode::ReadWrite).unwrap();
updater.put(b"p:a".to_vec(), b"v2".to_vec()).unwrap();
updater.commit_self().unwrap();
scan_tx.put(b"q:z".to_vec(), b"ok".to_vec()).unwrap();
assert!(scan_tx.commit_self().is_err());
}
}
#[cfg(all(test, not(target_arch = "wasm32")))]
mod txn {
use super::*;
fn test_config() -> LsmKVConfig {
LsmKVConfig {
wal: WalConfig {
segment_size: 4096,
max_segments: 2,
sync_mode: SyncMode::NoSync,
},
..Default::default()
}
}
fn new_test_store() -> LsmKV {
let cfg = test_config();
let data_dir = tempfile::tempdir().expect("tempdir").keep();
let sst_dir = data_dir.join("sst");
fs::create_dir_all(&sst_dir).expect("create sst dir");
let wal_path = data_dir.join("lsm.wal");
let wal = WalWriter::create(&wal_path, cfg.wal.clone(), 1, 1).expect("wal create");
let levels = vec![Vec::new(); cfg.compaction.max_levels];
LsmKV {
config: cfg,
data_dir,
sst_dir,
wal_path,
wal: RwLock::new(wal),
active_memtable: RwLock::new(MemTable::new()),
immutable_memtables: RwLock::new(VecDeque::new()),
levels: RwLock::new(levels),
buffer_pool: BufferPool::new(BufferPoolConfig::default()),
metrics: Arc::new(LsmMetrics::default()),
ts_oracle: TimestampOracle::new(1),
txn_manager: LsmTxnManager::default(),
commit_lock: Mutex::new(()),
next_sstable_id: AtomicU64::new(1),
wal_used_bytes: AtomicU64::new(0),
last_checkpoint_ms: AtomicU64::new(0),
}
}
#[test]
fn detects_read_write_conflict_after_get() {
let store = new_test_store();
let mut init = store.begin(TxnMode::ReadWrite).unwrap();
init.put(b"k".to_vec(), b"v1".to_vec()).unwrap();
init.commit_self().unwrap();
let mut a = store.begin(TxnMode::ReadWrite).unwrap();
assert_eq!(a.get(&b"k".to_vec()).unwrap(), Some(b"v1".to_vec()));
let mut b = store.begin(TxnMode::ReadWrite).unwrap();
b.put(b"k".to_vec(), b"v2".to_vec()).unwrap();
b.commit_self().unwrap();
a.put(b"other".to_vec(), b"x".to_vec()).unwrap();
assert!(a.commit_self().is_err());
}
#[test]
fn rollback_discards_write_set() {
let store = new_test_store();
let mut tx = store.begin(TxnMode::ReadWrite).unwrap();
tx.put(b"k".to_vec(), b"v".to_vec()).unwrap();
tx.rollback_self().unwrap();
let mut ro = store.begin(TxnMode::ReadOnly).unwrap();
assert_eq!(ro.get(&b"k".to_vec()).unwrap(), None);
}
}
#[cfg(all(test, not(target_arch = "wasm32")))]
mod methods {
use super::*;
fn test_config() -> LsmKVConfig {
LsmKVConfig {
wal: WalConfig {
segment_size: 4096,
max_segments: 2,
sync_mode: SyncMode::NoSync,
},
..Default::default()
}
}
#[test]
fn open_creates_wal_and_returns_metrics() {
let dir = tempfile::tempdir().expect("tempdir");
let (store, _recovery) = LsmKV::open_with_config(dir.path(), test_config()).expect("open");
let m = store.metrics();
assert_eq!(m.wal_write_bytes, 0);
assert_eq!(m.memtable_flush_count, 0);
assert!(store.disk_usage() > 0);
}
#[test]
fn open_replays_wal_entries() {
let dir = tempfile::tempdir().expect("tempdir");
{
let (store, _recovery) =
LsmKV::open_with_config(dir.path(), test_config()).expect("open");
let mut wal = store.wal.write().unwrap();
wal.append(&crate::lsm::wal::WalEntry::put(
10,
b"k".to_vec(),
b"v".to_vec(),
))
.unwrap();
}
let (store, _recovery) =
LsmKV::open_with_config(dir.path(), test_config()).expect("reopen");
let mut tx = store.begin(TxnMode::ReadOnly).unwrap();
assert_eq!(tx.get(&b"k".to_vec()).unwrap(), Some(b"v".to_vec()));
}
#[test]
fn flush_moves_active_to_immutable() {
let dir = tempfile::tempdir().expect("tempdir");
let (store, _recovery) = LsmKV::open_with_config(dir.path(), test_config()).expect("open");
let mut tx = store.begin(TxnMode::ReadWrite).unwrap();
tx.put(b"k".to_vec(), b"v".to_vec()).unwrap();
tx.commit_self().unwrap();
store.flush().unwrap();
let m = store.metrics();
assert_eq!(m.memtable_flush_count, 1);
let mut ro = store.begin(TxnMode::ReadOnly).unwrap();
assert_eq!(ro.get(&b"k".to_vec()).unwrap(), Some(b"v".to_vec()));
}
}
#[cfg(all(test, not(target_arch = "wasm32")))]
mod write_path {
use super::*;
fn test_config() -> LsmKVConfig {
LsmKVConfig {
wal: WalConfig {
segment_size: 4096,
max_segments: 2,
sync_mode: SyncMode::NoSync,
},
memtable: MemTableConfig {
flush_threshold: 1,
..Default::default()
},
..Default::default()
}
}
#[test]
fn commit_appends_wal_and_reopen_replays() {
let dir = tempfile::tempdir().expect("tempdir");
{
let (store, _recovery) =
LsmKV::open_with_config(dir.path(), test_config()).expect("open");
let mut tx = store.begin(TxnMode::ReadWrite).unwrap();
tx.put(b"k".to_vec(), b"v".to_vec()).unwrap();
tx.commit_self().unwrap();
}
let (store, _recovery) =
LsmKV::open_with_config(dir.path(), test_config()).expect("reopen");
let mut ro = store.begin(TxnMode::ReadOnly).unwrap();
assert_eq!(ro.get(&b"k".to_vec()).unwrap(), Some(b"v".to_vec()));
}
#[test]
fn flush_trigger_moves_to_immutable() {
let dir = tempfile::tempdir().expect("tempdir");
let (store, _recovery) = LsmKV::open_with_config(dir.path(), test_config()).expect("open");
let mut tx = store.begin(TxnMode::ReadWrite).unwrap();
tx.put(b"k".to_vec(), b"v".to_vec()).unwrap();
tx.commit_self().unwrap();
let metrics = store.metrics();
assert_eq!(metrics.memtable_flush_count, 1);
let mut ro = store.begin(TxnMode::ReadOnly).unwrap();
assert_eq!(ro.get(&b"k".to_vec()).unwrap(), Some(b"v".to_vec()));
}
}
#[cfg(all(test, not(target_arch = "wasm32")))]
mod recovery_tests {
use super::*;
use crate::lsm::checkpoint::{save_checkpoint_meta, CheckpointMeta};
use std::io::{Read, Seek, SeekFrom, Write};
fn test_config() -> LsmKVConfig {
LsmKVConfig {
wal: WalConfig {
segment_size: 4096,
max_segments: 2,
sync_mode: SyncMode::NoSync,
},
..Default::default()
}
}
#[test]
fn recovery_uses_checkpoint_lsn_when_present() {
let dir = tempfile::tempdir().expect("tempdir");
let (store, _recovery) = LsmKV::open_with_config(dir.path(), test_config()).expect("open");
let mut tx = store.begin(TxnMode::ReadWrite).unwrap();
tx.put(b"before".to_vec(), b"1".to_vec()).unwrap();
tx.commit_self().unwrap();
let checkpoint_lsn = store.ts_oracle.current_timestamp();
let meta = CheckpointMeta::new(checkpoint_lsn, 0);
let checkpoint_path = dir.path().join("checkpoint.meta");
save_checkpoint_meta(&checkpoint_path, &meta).unwrap();
let mut tx = store.begin(TxnMode::ReadWrite).unwrap();
tx.put(b"after".to_vec(), b"2".to_vec()).unwrap();
tx.commit_self().unwrap();
let (store, recovery) = LsmKV::open_with_config(dir.path(), test_config()).expect("reopen");
assert!(recovery.checkpoint_lsn.is_some());
assert_eq!(recovery.entries_recovered, 1);
let mut ro = store.begin(TxnMode::ReadOnly).unwrap();
assert_eq!(ro.get(&b"after".to_vec()).unwrap(), Some(b"2".to_vec()));
}
#[test]
fn recovery_falls_back_when_checkpoint_missing() {
let dir = tempfile::tempdir().expect("tempdir");
let (store, _recovery) = LsmKV::open_with_config(dir.path(), test_config()).expect("open");
let mut tx = store.begin(TxnMode::ReadWrite).unwrap();
tx.put(b"k".to_vec(), b"v".to_vec()).unwrap();
tx.commit_self().unwrap();
let (store, recovery) = LsmKV::open_with_config(dir.path(), test_config()).expect("reopen");
assert!(recovery.checkpoint_lsn.is_none());
assert!(recovery.entries_recovered >= 1);
let mut ro = store.begin(TxnMode::ReadOnly).unwrap();
assert_eq!(ro.get(&b"k".to_vec()).unwrap(), Some(b"v".to_vec()));
}
#[test]
fn recovery_stops_on_corrupted_entry() {
let dir = tempfile::tempdir().expect("tempdir");
let wal_path = dir.path().join("lsm.wal");
let wal_cfg = WalConfig {
segment_size: 4096,
max_segments: 1,
sync_mode: SyncMode::NoSync,
};
let mut writer = WalWriter::create(&wal_path, wal_cfg.clone(), 1, 1).unwrap();
let e1 = WalEntry::put(1, b"a".to_vec(), b"1".to_vec());
let e2 = WalEntry::put(2, b"b".to_vec(), b"2".to_vec());
let _off1 = writer.append(&e1).unwrap();
let off2 = writer.append(&e2).unwrap();
let e2_bytes = e2.encode().unwrap();
let mut file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&wal_path)
.unwrap();
let corrupt_offset = off2 + (e2_bytes.len() as u64).saturating_sub(1);
file.seek(SeekFrom::Start(corrupt_offset)).unwrap();
let mut buf = [0u8; 1];
file.read_exact(&mut buf).unwrap();
buf[0] ^= 0xFF;
file.seek(SeekFrom::Start(corrupt_offset)).unwrap();
file.write_all(&buf).unwrap();
let cfg = LsmKVConfig {
wal: wal_cfg,
..Default::default()
};
let (store, recovery) = LsmKV::open_with_config(dir.path(), cfg).expect("reopen");
assert!(recovery.stop_reason.is_some());
assert_eq!(recovery.entries_recovered, 1);
let mut ro = store.begin(TxnMode::ReadOnly).unwrap();
assert_eq!(ro.get(&b"a".to_vec()).unwrap(), Some(b"1".to_vec()));
}
}
#[cfg(all(test, not(target_arch = "wasm32")))]
mod read_path {
use super::*;
use crate::compaction::leveled::KeyRange;
use crate::lsm::sstable::{SSTableEntry, SSTableWriter};
fn test_config() -> LsmKVConfig {
LsmKVConfig {
wal: WalConfig {
segment_size: 4096,
max_segments: 2,
sync_mode: SyncMode::NoSync,
},
..Default::default()
}
}
#[test]
fn reads_from_sstable_via_buffer_pool() {
let dir = tempfile::tempdir().expect("tempdir");
let (store, _recovery) = LsmKV::open_with_config(dir.path(), test_config()).expect("open");
let file_id = 1u64;
let path = store.sstable_path_for(file_id);
let mut writer = SSTableWriter::create(&path, store.config.sstable).expect("sst create");
writer
.append(SSTableEntry {
key: b"k".to_vec(),
value: Some(b"v".to_vec()),
timestamp: 0,
sequence: 1,
})
.unwrap();
writer.finish().unwrap();
let size_bytes = fs::metadata(&path).unwrap().len();
let meta = SSTableMeta {
id: file_id,
level: 0,
size_bytes,
key_range: KeyRange {
first_key: b"k".to_vec(),
last_key: b"k".to_vec(),
},
};
store.levels.write().unwrap()[0].push(meta);
let before = store.buffer_pool.stats();
let mut ro = store.begin(TxnMode::ReadOnly).unwrap();
assert_eq!(ro.get(&b"k".to_vec()).unwrap(), Some(b"v".to_vec()));
assert_eq!(ro.get(&b"k".to_vec()).unwrap(), Some(b"v".to_vec()));
let after = store.buffer_pool.stats();
assert!(after.misses > before.misses);
assert!(after.hits > before.hits);
}
}
#[cfg(all(test, not(target_arch = "wasm32")))]
mod integration;