use std::collections::HashSet;
use std::fs::create_dir_all;
#[cfg(not(target_os = "windows"))]
use std::fs::File;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use crate::batch::Batch;
use crate::bplustree::tree::DiskBPlusTree;
use crate::checkpoint::{CheckpointMetadata, DatabaseCheckpoint};
use crate::commit::{CommitEnv, CommitPipeline};
use crate::compaction::compactor::{CompactionOptions, Compactor};
use crate::compaction::CompactionStrategy;
use crate::error::{BackgroundErrorHandler, BackgroundErrorReason, Result};
use crate::levels::{write_manifest_to_disk, LevelManifest, ManifestChangeSet};
use crate::lockfile::LockFile;
use crate::memtable::{ImmutableEntry, ImmutableMemtables, MemTable};
use crate::snapshot::SnapshotTracker;
use crate::sstable::table::Table;
use crate::stall::{StallCounts, StallThresholds, WriteStallCountProvider};
use crate::task::TaskManager;
use crate::transaction::{Mode, Transaction, TransactionOptions};
use crate::vlog::{VLog, ValueLocation, ValuePointer};
use crate::wal::recovery::{repair_corrupted_wal_segment, replay_wal};
use crate::wal::{self, cleanup_old_segments, Wal, WalManager};
use crate::{
BytewiseComparator,
Comparator,
Error,
FilterPolicy,
LSMIterator,
Options,
TimestampComparator,
VLogChecksumLevel,
Value,
WalRecoveryMode,
};
/// Operations the background task manager needs from the storage core.
///
/// Implementors must be shareable across threads (`Send + Sync`) because the task
/// manager drives them from background tasks.
pub trait CompactionOperations: Send + Sync {
    /// Handle used to record and inspect background failures.
    fn error_handler(&self) -> Arc<BackgroundErrorHandler>;
    /// Whether any rotated (immutable) memtable is still waiting to be flushed.
    fn has_pending_immutables(&self) -> bool;
    /// Flushes pending in-memory data to on-disk tables.
    fn compact_memtable(&self) -> Result<()>;
    /// Runs one compaction pass chosen by `strategy`.
    fn compact(&self, strategy: Arc<dyn CompactionStrategy>) -> Result<()>;
}
/// Shared, thread-safe state of an open LSM tree.
///
/// Owned behind an `Arc` by the public handle, the commit environment, and the background
/// task manager.
pub(crate) struct CoreInner {
    /// Memtable currently receiving writes; swapped for a fresh one on rotation/flush.
    pub(crate) active_memtable: Arc<RwLock<Arc<MemTable>>>,
    /// Rotated memtables waiting to be flushed, registered under the table id they will
    /// be flushed as (see `rotate_memtable` / `flush_immutable_to_sst`).
    pub(crate) immutable_memtables: Arc<RwLock<ImmutableMemtables>>,
    /// Most recently flushed memtable, retained so `check_keys_conflict` can still see
    /// writes that already left the immutable list.
    pub(crate) flushed_history: Arc<RwLock<Option<Arc<MemTable>>>>,
    /// Level/SST metadata plus the persisted WAL log number and last sequence number.
    pub level_manifest: Arc<RwLock<LevelManifest>>,
    /// Configuration the tree was opened with.
    pub opts: Arc<Options>,
    /// Snapshot bookkeeping; presumably consulted by readers/compaction — TODO confirm.
    pub(crate) snapshot_tracker: SnapshotTracker,
    /// Value log, present only when `opts.enable_vlog` is set.
    pub(crate) vlog: Option<Arc<VLog>>,
    /// Write-ahead log manager.
    pub(crate) wal: WalManager,
    /// On-disk B+tree index ordered by `TimestampComparator`, present only when
    /// `opts.enable_versioned_index` is set.
    pub(crate) versioned_index: Option<Arc<parking_lot::RwLock<DiskBPlusTree>>>,
    /// Directory lock acquired in `new` and released by `Core::close`.
    pub(crate) lockfile: Mutex<LockFile>,
    /// Records background failures (e.g. manifest writes) so foreground commits can
    /// observe them.
    pub(crate) error_handler: Arc<BackgroundErrorHandler>,
    /// Sequence number visible to readers; shared with the commit pipeline.
    pub(crate) visible_seq_num: Arc<AtomicU64>,
}
impl CoreInner {
    /// Opens the in-process state of an LSM tree rooted at `opts.path`.
    ///
    /// It proceeds in this order:
    /// 1. Acquires the directory lock file and loads the level manifest.
    /// 2. Opens the WAL, starting at the manifest's log number.
    /// 3. Creates an empty active memtable bound to the active WAL segment.
    /// 4. Depending on `opts`, opens the on-disk versioned index (`enable_versioned_index`)
    ///    and the value log (`enable_vlog`).
    ///
    /// # Errors
    /// Propagates failures from lock acquisition, manifest loading, WAL opening, and
    /// opening the versioned index or value log.
    pub(crate) fn new(opts: Arc<Options>) -> Result<Self> {
        let mut lockfile = LockFile::new(&opts.path);
        lockfile.acquire()?;
        let immutable_memtables = Arc::new(RwLock::new(ImmutableMemtables::default()));
        let manifest = LevelManifest::new(Arc::clone(&opts))?;
        let manifest_log_number = manifest.get_log_number();
        let wal_path = opts.wal_dir();
        // Segments below the manifest log number are already covered by flushed SSTs.
        let wal_instance =
            Wal::open_with_min_log_number(&wal_path, manifest_log_number, wal::Options::default())?;
        let visible_seq_num = Arc::new(AtomicU64::new(0));
        let initial_memtable = Arc::new(MemTable::new(opts.max_memtable_size, 0));
        initial_memtable.set_wal_number(wal_instance.get_active_log_number());
        let active_memtable = Arc::new(RwLock::new(initial_memtable));
        let level_manifest = Arc::new(RwLock::new(manifest));
        let versioned_index = if opts.enable_versioned_index {
            let versioned_index_dir = opts.versioned_index_dir();
            let versioned_index_path = versioned_index_dir.join("index.bpt");
            let comparator =
                Arc::new(TimestampComparator::new(Arc::new(BytewiseComparator::default())));
            let tree = DiskBPlusTree::disk(&versioned_index_path, comparator)?;
            Some(Arc::new(parking_lot::RwLock::new(tree)))
        } else {
            None
        };
        let vlog = if opts.enable_vlog {
            Some(Arc::new(VLog::new(Arc::clone(&opts))?))
        } else {
            None
        };
        Ok(Self {
            opts,
            active_memtable,
            immutable_memtables,
            level_manifest,
            snapshot_tracker: SnapshotTracker::new(),
            vlog,
            wal: WalManager::new(wal_instance),
            versioned_index,
            lockfile: Mutex::new(lockfile),
            error_handler: Arc::new(BackgroundErrorHandler::new()),
            visible_seq_num,
            flushed_history: Arc::new(RwLock::new(None)),
        })
    }

    /// Number of rotated memtables waiting to be flushed (0 if the lock is poisoned).
    pub(crate) fn immutable_count(&self) -> usize {
        self.immutable_memtables.read().map(|imm| imm.iter().count()).unwrap_or(0)
    }

    /// Number of SSTs in the first level (0 if the lock is poisoned or no levels exist).
    pub(crate) fn l0_file_count(&self) -> usize {
        self.level_manifest
            .read()
            .map(|m| m.levels.get_levels().first().map(|l| l.tables.len()).unwrap_or(0))
            .unwrap_or(0)
    }

    /// Checks `keys` for write-write conflicts against a transaction started at `start_seq`.
    ///
    /// For each key, only the newest in-memory version is examined. The search order is
    /// the active memtable, then the immutables newest-first, then the last flushed
    /// memtable.
    ///
    /// # Errors
    /// - `Error::TransactionRetry` if `start_seq` is older than the earliest sequence still
    ///   tracked in memory, because conflicts can no longer be detected reliably.
    /// - `Error::TransactionWriteConflict` if a found version has `seq_num > start_seq`.
    /// - Lock poisoning errors.
    pub(crate) fn check_keys_conflict<'a, I>(&self, keys: I, start_seq: u64) -> Result<()>
    where
        I: Iterator<Item = &'a [u8]>,
    {
        let memtable = self.active_memtable.read()?;
        let immutables = self.immutable_memtables.read()?;
        let history = self.flushed_history.read()?;
        // The oldest memtable still in memory bounds how far back conflicts are visible.
        let earliest_seq = if let Some(ref h) = *history {
            h.earliest_seq()
        } else if let Some(oldest) = immutables.first() {
            oldest.memtable.earliest_seq()
        } else {
            memtable.earliest_seq()
        };
        if start_seq < earliest_seq {
            return Err(Error::TransactionRetry);
        }
        for key in keys {
            if let Some((ikey, _)) = memtable.get(key, None) {
                if ikey.seq_num() > start_seq {
                    return Err(Error::TransactionWriteConflict);
                }
                continue;
            }
            let mut found_in_immutable = false;
            for entry in immutables.iter().rev() {
                if let Some((ikey, _)) = entry.memtable.get(key, None) {
                    if ikey.seq_num() > start_seq {
                        return Err(Error::TransactionWriteConflict);
                    }
                    found_in_immutable = true;
                    break;
                }
            }
            if found_in_immutable {
                continue;
            }
            if let Some(ref flushed) = *history {
                if let Some((ikey, _)) = flushed.get(key, None) {
                    if ikey.seq_num() > start_seq {
                        return Err(Error::TransactionWriteConflict);
                    }
                }
            }
        }
        Ok(())
    }

    /// Writes `memtable` to a level-0 SST named `table_id` and commits it to the manifest.
    ///
    /// It proceeds in this order:
    /// 1. Mirrors the flushed entries into the versioned index, if enabled, and syncs it.
    /// 2. Persists a changeset that adds the table and advances the manifest log number to
    ///    `wal_number + 1`.
    /// 3. Drops the immutable entry registered under `table_id` and keeps `memtable` as
    ///    the flushed history.
    /// 4. Runs VLog/index cleanup.
    ///
    /// Locks are taken in the order manifest → immutables → flushed history.
    ///
    /// # Errors
    /// Flush, versioned-index, and manifest errors. A failed manifest write reverts the
    /// in-memory changeset and is also recorded on the background error handler.
    fn flush_immutable_to_sst(
        &self,
        memtable: Arc<MemTable>,
        table_id: u64,
        wal_number: u64,
    ) -> Result<Arc<Table>> {
        let collect_bptree = self.versioned_index.is_some();
        let (table, bptree_entries) = memtable
            .flush(
                table_id,
                Arc::clone(&self.opts),
                self.vlog.as_ref(),
                self.opts.vlog_value_threshold,
                collect_bptree,
            )
            .map_err(|e| {
                Error::Other(format!(
                    "Failed to flush memtable to SST table_id={}: {}",
                    table_id, e
                ))
            })?;
        log::debug!("Created SST table_id={}, file_size={}", table.id, table.file_size);
        if let Some(ref versioned_index) = self.versioned_index {
            let entry_count = bptree_entries.len();
            let mut vi_guard = versioned_index.write();
            // The entries are not needed afterwards, so move them instead of cloning.
            for (encoded_key, encoded_value) in bptree_entries {
                vi_guard.insert(encoded_key, encoded_value)?;
            }
            vi_guard.sync()?;
            log::debug!(
                "Versioned index updated: {} entries written for table_id={}",
                entry_count,
                table_id
            );
        }
        let mut changeset = ManifestChangeSet::default();
        changeset.new_tables.push((0, Arc::clone(&table)));
        // Everything up to and including `wal_number` is now durable in SSTs.
        changeset.log_number = Some(wal_number + 1);
        log::debug!(
            "Changeset prepared: table_id={}, log_number={} (WAL #{:020} flushed)",
            table_id,
            wal_number + 1,
            wal_number
        );
        let mut manifest = self.level_manifest.write()?;
        let mut memtable_lock = self.immutable_memtables.write()?;
        let mut history_lock = self.flushed_history.write()?;
        let rollback = manifest.apply_changeset(&changeset)?;
        if let Err(e) = write_manifest_to_disk(&manifest) {
            manifest.revert_changeset(rollback);
            let error = Error::Other(format!(
                "Failed to atomically update manifest: table_id={}, log_number={}: {}",
                table_id,
                wal_number + 1,
                e
            ));
            self.error_handler.set_error(error.clone(), BackgroundErrorReason::ManifestWrite);
            return Err(error);
        }
        memtable_lock.remove(table_id);
        *history_lock = Some(memtable);
        log::info!(
            "Manifest updated atomically: table_id={}, log_number={}, last_sequence={}",
            table_id,
            wal_number + 1,
            manifest.get_last_sequence()
        );
        let min_oldest_vlog = manifest.min_oldest_vlog_file_id();
        cleanup_vlog_and_index(&self.vlog, &self.versioned_index, min_oldest_vlog, "flush");
        Ok(table)
    }

    /// Moves a non-empty active memtable onto the immutable list and starts a fresh one.
    ///
    /// It proceeds in this order:
    /// 1. Rotates the WAL.
    /// 2. Replaces the active memtable with an empty one whose earliest sequence is the
    ///    current visible sequence, bound to the new WAL segment.
    /// 3. Registers the old memtable under the manifest's next table id and the old WAL
    ///    number.
    ///
    /// The rotation is a no-op for an empty memtable. The active-memtable write lock is
    /// held throughout.
    pub(crate) fn rotate_memtable(&self) -> Result<()> {
        let mut active_memtable = self.active_memtable.write()?;
        if active_memtable.is_empty() {
            return Ok(());
        }
        log::debug!("rotate_memtable: rotating memtable size={}", active_memtable.size());
        let (flushed_wal_number, new_wal_number) = {
            let mut wal_guard = self.wal.write();
            let old_log_number = wal_guard.get_active_log_number();
            wal_guard.rotate().map_err(|e| {
                Error::Other(format!("Failed to rotate WAL before memtable rotation: {}", e))
            })?;
            let new_log_number = wal_guard.get_active_log_number();
            drop(wal_guard);
            log::debug!(
                "WAL rotated during memtable rotation: {} -> {}",
                old_log_number,
                new_log_number
            );
            (old_log_number, new_log_number)
        };
        let earliest_seq = self.visible_seq_num.load(Ordering::Acquire);
        let flushed_memtable = std::mem::replace(
            &mut *active_memtable,
            Arc::new(MemTable::new(self.opts.max_memtable_size, earliest_seq)),
        );
        active_memtable.set_wal_number(new_wal_number);
        let table_id = self.level_manifest.read()?.next_table_id();
        let mut immutable_memtables = self.immutable_memtables.write()?;
        immutable_memtables.add(table_id, flushed_wal_number, Arc::clone(&flushed_memtable));
        drop(active_memtable);
        drop(immutable_memtables);
        log::debug!(
            "rotate_memtable: completed rotation, table_id={}, wal_number={}",
            table_id,
            flushed_wal_number
        );
        Ok(())
    }

    /// Removes WAL segments older than `min_wal_to_keep` from `wal_dir`.
    ///
    /// This is best-effort: failures are logged as warnings and otherwise ignored.
    fn cleanup_wal_segments_best_effort(wal_dir: std::path::PathBuf, min_wal_to_keep: u64) {
        match cleanup_old_segments(&wal_dir, min_wal_to_keep) {
            Ok(count) if count > 0 => {
                log::info!(
                    "Cleaned up {} old WAL segments (min_wal_to_keep={})",
                    count,
                    min_wal_to_keep
                );
            }
            Ok(_) => {}
            Err(e) => {
                log::warn!("Failed to clean up old WAL segments: {}", e);
            }
        }
    }

    /// Flushes the oldest immutable memtable, if any, and schedules WAL cleanup.
    ///
    /// Empty immutables are dropped without producing a table.
    ///
    /// # Returns
    /// The new table, or `None` when nothing was flushed.
    fn flush_oldest_immutable_to_sst(&self) -> Result<Option<Arc<Table>>> {
        let entry = {
            let guard = self.immutable_memtables.read()?;
            guard.first().cloned()
        };
        let entry = match entry {
            Some(e) => e,
            None => {
                log::debug!("flush_oldest_immutable_to_sst: no immutables to flush");
                return Ok(None);
            }
        };
        if entry.memtable.is_empty() {
            let mut guard = self.immutable_memtables.write()?;
            guard.remove(entry.table_id);
            log::debug!(
                "flush_oldest_immutable_to_sst: skipped empty memtable table_id={}",
                entry.table_id
            );
            return Ok(None);
        }
        log::debug!(
            "flush_oldest_immutable_to_sst: flushing table_id={}, wal_number={}",
            entry.table_id,
            entry.wal_number
        );
        let table = self.flush_immutable_to_sst(
            Arc::clone(&entry.memtable),
            entry.table_id,
            entry.wal_number,
        )?;
        let wal_dir = self.wal.read().get_dir_path().to_path_buf();
        let min_wal_to_keep = entry.wal_number + 1;
        // `tokio::spawn` panics outside a Tokio runtime (e.g. when this is reached from a
        // plain thread via `flush_all_immutables_sync`). Offload to the runtime when one is
        // available; otherwise clean up inline.
        match tokio::runtime::Handle::try_current() {
            Ok(handle) => {
                handle.spawn(async move {
                    Self::cleanup_wal_segments_best_effort(wal_dir, min_wal_to_keep);
                });
            }
            Err(_) => Self::cleanup_wal_segments_best_effort(wal_dir, min_wal_to_keep),
        }
        log::debug!(
            "flush_oldest_immutable_to_sst: flushed table_id={}, file_size={}",
            table.id,
            table.file_size
        );
        Ok(Some(table))
    }

    /// Repeatedly flushes the oldest immutable memtable.
    ///
    /// Stops at the first call that produces no table, which happens when the list is
    /// empty or the oldest entry is empty.
    pub(crate) fn flush_all_immutables_sync(&self) -> Result<()> {
        let mut count = 0;
        while self.flush_oldest_immutable_to_sst()?.is_some() {
            count += 1;
        }
        if count > 0 {
            log::debug!("flush_all_immutables_sync: flushed {} immutable memtables", count);
        }
        Ok(())
    }

    /// Flushes the active memtable directly to an SST without rotating the WAL.
    ///
    /// The new active memtable stays bound to the current WAL segment. The manifest log
    /// number is advanced past `flushed_wal_number`, or past the flushed memtable's own
    /// WAL number when `None`.
    ///
    /// NOTE(review): since the WAL is not rotated, writes made after this call land in a
    /// segment the manifest already marks as flushed. The only visible caller is shutdown,
    /// after the commit pipeline has been stopped.
    fn flush_memtable_and_update_manifest(
        &self,
        flushed_wal_number: Option<u64>,
    ) -> Result<Option<Arc<Table>>> {
        let mut active_memtable = self.active_memtable.write()?;
        if active_memtable.is_empty() {
            return Ok(None);
        }
        let table_id = self.level_manifest.read()?.next_table_id();
        let mut immutable_memtables = self.immutable_memtables.write()?;
        let current_wal_number = self.wal.read().get_active_log_number();
        let earliest_seq = self.visible_seq_num.load(Ordering::Acquire);
        let flushed_memtable = std::mem::replace(
            &mut *active_memtable,
            Arc::new(MemTable::new(self.opts.max_memtable_size, earliest_seq)),
        );
        active_memtable.set_wal_number(current_wal_number);
        let memtable_wal_number = flushed_memtable.get_wal_number();
        immutable_memtables.add(table_id, memtable_wal_number, Arc::clone(&flushed_memtable));
        drop(active_memtable);
        drop(immutable_memtables);
        let wal_that_was_flushed = flushed_wal_number.unwrap_or(memtable_wal_number);
        let table = self.flush_immutable_to_sst(
            Arc::clone(&flushed_memtable),
            table_id,
            wal_that_was_flushed,
        )?;
        Ok(Some(table))
    }

    /// Flushes every immutable memtable (oldest first) and then the active memtable.
    ///
    /// When only immutables were flushed, the manifest log number is advanced past the
    /// active WAL segment.
    fn flush_all_memtables_for_shutdown(&self) -> Result<()> {
        log::info!("Flushing all memtables for shutdown...");
        let immutables_to_flush: Vec<ImmutableEntry> = {
            let immutable_guard = self.immutable_memtables.read()?;
            immutable_guard.iter().cloned().collect()
        };
        let immutable_count = immutables_to_flush.len();
        if immutable_count > 0 {
            log::info!("Flushing {} immutable memtable(s) first (older data)", immutable_count);
        }
        let mut flushed_count = 0;
        for entry in immutables_to_flush {
            if entry.memtable.is_empty() {
                let mut immutable_guard = self.immutable_memtables.write()?;
                immutable_guard.remove(entry.table_id);
                log::debug!("Skipped empty immutable memtable: table_id={}", entry.table_id);
                continue;
            }
            self.flush_immutable_to_sst(
                Arc::clone(&entry.memtable),
                entry.table_id,
                entry.wal_number,
            )?;
            flushed_count += 1;
            log::debug!(
                "Flushed immutable memtable {}/{}: table_id={}, wal_number={}",
                flushed_count,
                immutable_count,
                entry.table_id,
                entry.wal_number
            );
        }
        if flushed_count > 0 {
            log::info!("Flushed {} immutable memtable(s) successfully", flushed_count);
        }
        let active_memtable = self.active_memtable.read()?;
        let active_size = active_memtable.size();
        let active_is_empty = active_memtable.is_empty();
        drop(active_memtable);
        if !active_is_empty {
            log::info!("Flushing active memtable last (newest data): size={}", active_size);
            match self.flush_memtable_and_update_manifest(None)? {
                Some(table) => {
                    log::info!(
                        "Active memtable flushed: table_id={}, file_size={}",
                        table.id,
                        table.file_size
                    );
                }
                None => {
                    log::debug!("Active memtable was empty, skipped flush");
                }
            }
        } else {
            log::debug!("Active memtable is empty, skipping flush");
            if flushed_count > 0 {
                // Nothing is buffered in the active WAL segment, so it can be skipped too.
                let current_wal = self.wal.read().get_active_log_number();
                let changeset = ManifestChangeSet {
                    log_number: Some(current_wal + 1),
                    ..Default::default()
                };
                let mut manifest = self.level_manifest.write()?;
                let rollback = manifest.apply_changeset(&changeset)?;
                if let Err(e) = write_manifest_to_disk(&manifest) {
                    manifest.revert_changeset(rollback);
                    let error = Error::Other(format!(
                        "Failed to update manifest log_number after immutable flush: {}",
                        e
                    ));
                    self.error_handler
                        .set_error(error.clone(), BackgroundErrorReason::ManifestWrite);
                    return Err(error);
                }
                log::debug!(
                    "Updated manifest log_number to {} after immutable flushes",
                    current_wal + 1
                );
            }
        }
        log::info!("All memtables flushed successfully for shutdown");
        Ok(())
    }

    /// Parses an SST file name of the exact form `{table_id:020}.sst`.
    ///
    /// Returns `None` for anything else, including names whose 20-digit stem overflows
    /// `u64`.
    fn parse_sst_table_id(file_name: &str) -> Option<u64> {
        let stem = file_name.strip_suffix(".sst")?;
        if stem.len() != 20 || !stem.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        stem.parse().ok()
    }

    /// Deletes SST files in the SSTable directory whose table id is not in the manifest.
    ///
    /// Removal failures are logged and skipped.
    fn cleanup_orphaned_sst_files(&self) -> Result<()> {
        let sstable_dir = self.opts.sstable_dir();
        if !sstable_dir.exists() {
            return Ok(());
        }
        let manifest = self.level_manifest.read()?;
        let live_tables = manifest.get_all_tables();
        let live_table_ids: HashSet<u64> = live_tables.keys().copied().collect();
        drop(manifest);
        let entries = std::fs::read_dir(&sstable_dir)?;
        let mut removed_count = 0;
        for entry in entries {
            let entry = entry?;
            let filename = entry.file_name();
            let filename_str = filename.to_string_lossy();
            // Strict parsing: byte-slicing `[..20]` can panic on a non-char boundary after
            // lossy conversion, and `u64::from_str` would accept a leading `+`.
            let table_id = match Self::parse_sst_table_id(&filename_str) {
                Some(id) => id,
                None => continue,
            };
            if live_table_ids.contains(&table_id) {
                continue;
            }
            let path = entry.path();
            match std::fs::remove_file(&path) {
                Ok(_) => {
                    removed_count += 1;
                    log::info!("Removed orphaned SST file: table_id={}", table_id);
                }
                Err(e) => {
                    log::warn!("Failed to remove orphaned SST table_id={}: {}", table_id, e);
                }
            }
        }
        if removed_count > 0 {
            log::info!("Cleaned up {} orphaned SST files", removed_count);
        } else {
            log::debug!("No orphaned SST files found");
        }
        Ok(())
    }

    /// Removes VLog files (and matching versioned-index entries) below the oldest VLog
    /// file any live SST still references.
    ///
    /// This is a no-op without a VLog or when no SST references one (`min_oldest_vlog == 0`).
    fn cleanup_orphaned_vlog_files(&self) -> Result<()> {
        if self.vlog.is_none() {
            return Ok(());
        }
        let manifest = self.level_manifest.read()?;
        let min_oldest_vlog = manifest.min_oldest_vlog_file_id();
        if min_oldest_vlog == 0 {
            log::debug!("No SSTs with VLog references found, skipping VLog orphan cleanup");
            return Ok(());
        }
        log::info!("Cleaning up orphaned VLog files below min_oldest_vlog={}", min_oldest_vlog);
        cleanup_vlog_and_index(&self.vlog, &self.versioned_index, min_oldest_vlog, "startup");
        Ok(())
    }

    /// Decodes a stored value location and resolves it to the actual value.
    ///
    /// The value is either inline or read from the VLog.
    pub(crate) fn resolve_value(&self, value: &[u8]) -> Result<Value> {
        let location = ValueLocation::decode(value)?;
        location.resolve_value(self.vlog.as_ref())
    }
}
impl CompactionOperations for CoreInner {
    /// Reports whether rotated memtables are waiting to be flushed.
    ///
    /// A poisoned lock is reported as "nothing pending".
    fn has_pending_immutables(&self) -> bool {
        match self.immutable_memtables.read() {
            Ok(immutables) => !immutables.is_empty(),
            Err(_) => false,
        }
    }

    /// Shares the core's background error handler.
    fn error_handler(&self) -> Arc<BackgroundErrorHandler> {
        let handler = &self.error_handler;
        Arc::clone(handler)
    }

    /// Flushes at most one immutable memtable (the oldest).
    fn compact_memtable(&self) -> Result<()> {
        self.flush_oldest_immutable_to_sst()?;
        Ok(())
    }

    /// Runs one compaction pass using options derived from this core.
    fn compact(&self, strategy: Arc<dyn CompactionStrategy>) -> Result<()> {
        Compactor::new(CompactionOptions::from(self), strategy).compact()?;
        Ok(())
    }
}
impl WriteStallCountProvider for CoreInner {
    /// Snapshot of the backlog counters the write-stall controller compares against its
    /// thresholds.
    fn get_stall_counts(&self) -> StallCounts {
        let immutable_memtables = self.immutable_count();
        let l0_files = self.l0_file_count();
        StallCounts {
            immutable_memtables,
            l0_files,
        }
    }
}
/// Commit-pipeline environment that writes batches to the WAL and applies them to the
/// active memtable of `core`.
struct LsmCommitEnv {
    /// Shared storage core.
    core: Arc<CoreInner>,
    /// Woken after a memtable rotation so pending immutables get flushed.
    task_manager: Option<Arc<TaskManager>>,
}
impl LsmCommitEnv {
pub(crate) fn new(core: Arc<CoreInner>, task_manager: Arc<TaskManager>) -> Result<Self> {
Ok(Self {
core,
task_manager: Some(task_manager),
})
}
}
impl CommitEnv for LsmCommitEnv {
fn write(&self, batch: &Batch, seq_num: u64, sync: bool) -> Result<Batch> {
let mut processed_batch = Batch::new(seq_num);
for (_, entry, _current_seq_num, timestamp) in batch.entries_with_seq_nums()? {
let encoded_value = match &entry.value {
Some(value) => {
let value_location = ValueLocation::with_inline_value(value.clone());
Some(value_location.encode())
}
None => None,
};
processed_batch.add_record(entry.kind, entry.key.clone(), encoded_value, timestamp)?;
}
let enc_bytes = processed_batch.encode()?;
let mut wal_guard = self.core.wal.write();
wal_guard.append(&enc_bytes)?;
if sync {
wal_guard.sync()?;
}
drop(wal_guard);
Ok(processed_batch)
}
fn apply(&self, batch: &Batch) -> Result<()> {
let result = {
let active_memtable = self.core.active_memtable.read()?;
active_memtable.add(batch)
};
match result {
Ok(()) => Ok(()),
Err(Error::ArenaFull) => {
log::debug!("apply: arena full, rotating memtable");
self.core.rotate_memtable()?;
if let Some(ref task_manager) = self.task_manager {
task_manager.wake_up_memtable();
}
let active_memtable = self.core.active_memtable.read()?;
active_memtable.add(batch)
}
Err(e) => Err(e),
}
}
fn check_background_error(&self) -> Result<()> {
self.core.error_handler.check_error()
}
}
/// Open LSM tree: shared storage state plus the commit pipeline and background machinery.
///
/// Dereferences to [`CoreInner`].
pub(crate) struct Core {
    /// Shared storage state.
    pub(crate) inner: Arc<CoreInner>,
    /// Sequences commits: WAL write followed by memtable apply.
    pub(crate) commit_pipeline: Arc<CommitPipeline>,
    /// Background flush/compaction driver; taken (set to `None`) by `close`.
    pub(crate) task_manager: Mutex<Option<Arc<TaskManager>>>,
    /// Throttles writers when immutables or L0 files pile up.
    pub(crate) write_stall: Arc<crate::stall::WriteStallController>,
}
/// Lets `Core` be used wherever a `&CoreInner` is expected.
impl std::ops::Deref for Core {
    type Target = CoreInner;

    fn deref(&self) -> &CoreInner {
        &*self.inner
    }
}
impl Core {
    /// Replays WAL segments in `wal_path`, starting at `min_wal_number`, into memtables.
    ///
    /// On [`Error::WalCorruption`], `recovery_mode` decides the outcome:
    /// - `AbsoluteConsistency` returns the corruption error unchanged.
    /// - `TolerateCorruptedWithRepair` repairs the segment and retries replay once.
    ///
    /// Every replayed memtable except the last is handed to `flush_memtable` with its WAL
    /// number (empty ones are skipped). The last memtable is returned so the caller can
    /// install it as the active memtable.
    ///
    /// # Returns
    /// `(sequence number reported by replay, last memtable)`, or `(None, None)` when
    /// replay produced no memtables.
    ///
    /// # Errors
    /// Replay errors, unrepairable corruption (messages prefixed with `context`), and
    /// errors returned by `flush_memtable`.
    pub(crate) fn replay_wal_with_repair<F>(
        wal_path: &Path,
        min_wal_number: u64,
        context: &str,
        recovery_mode: WalRecoveryMode,
        arena_size: usize,
        mut flush_memtable: F,
    ) -> Result<(Option<u64>, Option<Arc<MemTable>>)>
    where
        F: FnMut(Arc<MemTable>, u64) -> Result<()>,
    {
        let (wal_seq_num_opt, memtables) = match replay_wal(wal_path, min_wal_number, arena_size) {
            Ok(result) => result,
            Err(Error::WalCorruption {
                segment_id,
                offset,
                message,
            }) => match recovery_mode {
                WalRecoveryMode::AbsoluteConsistency => {
                    log::error!(
                        "WAL corruption detected in segment {} at offset {}: {}. \
                         AbsoluteConsistency mode: failing immediately without repair.",
                        segment_id,
                        offset,
                        message
                    );
                    return Err(Error::WalCorruption {
                        segment_id,
                        offset,
                        message,
                    });
                }
                WalRecoveryMode::TolerateCorruptedWithRepair => {
                    log::warn!(
                        "Detected WAL corruption in segment {} at offset {}: {}. Attempting repair...",
                        segment_id,
                        offset,
                        message
                    );
                    if let Err(repair_err) = repair_corrupted_wal_segment(wal_path, segment_id) {
                        log::error!("Failed to repair WAL segment: {repair_err}");
                        return Err(Error::Other(format!(
                            "{context} failed: WAL segment {segment_id} is corrupted and could not be repaired. {repair_err}"
                        )));
                    }
                    // Only one repair attempt: a second corruption is reported, not repaired.
                    match replay_wal(wal_path, min_wal_number, arena_size) {
                        Ok(result) => result,
                        Err(Error::WalCorruption {
                            segment_id: seg_id,
                            offset: off,
                            message,
                        }) => {
                            return Err(Error::Other(format!(
                                "{context} failed: WAL segment {seg_id} still corrupted at offset {off} after repair: {message}"
                            )));
                        }
                        Err(retry_err) => {
                            return Err(Error::Other(format!(
                                "{context} failed: WAL replay failed after repair. {retry_err}"
                            )));
                        }
                    }
                }
            },
            Err(e) => return Err(e),
        };
        if memtables.is_empty() {
            return Ok((None, None));
        }
        let memtable_count = memtables.len();
        if memtable_count > 1 {
            log::info!("Recovery: flushing {} intermediate memtables to SST", memtable_count - 1);
            for (memtable, wal_number) in memtables.iter().take(memtable_count - 1) {
                if !memtable.is_empty() {
                    flush_memtable(Arc::clone(memtable), *wal_number)?;
                }
            }
        }
        let (last_memtable, last_wal_number) = match memtables.into_iter().last() {
            Some(last) => last,
            // Unreachable after the emptiness check above; kept panic-free.
            None => return Ok((None, None)),
        };
        // Counted only for the recovery log line below.
        let entry_count = {
            let mut iter = last_memtable.iter();
            let mut count = 0;
            if iter.seek_first().unwrap_or(false) {
                count += 1;
                while iter.next().unwrap_or(false) {
                    count += 1;
                }
            }
            count
        };
        log::info!(
            "Recovery: setting last memtable (wal={}) as active with {} entries",
            last_wal_number,
            entry_count
        );
        Ok((wal_seq_num_opt, Some(last_memtable)))
    }

    /// Opens the tree: builds the core, stall controller, task manager and commit pipeline,
    /// then recovers from the WAL.
    ///
    /// Recovery flushes intermediate memtables to SSTs and installs the last replayed
    /// memtable as active. It then seeds the commit sequence number from
    /// max(manifest, WAL), removes orphaned SST/VLog files, and wakes level compaction.
    pub(crate) fn new(opts: Arc<Options>) -> Result<Self> {
        log::info!("=== Starting LSM tree initialization ===");
        log::info!("Database path: {:?}", opts.path);
        let inner = Arc::new(CoreInner::new(Arc::clone(&opts))?);
        let thresholds = StallThresholds {
            memtable_limit: opts.memtable_stall_threshold,
            l0_file_limit: opts.l0_stall_threshold,
        };
        let write_stall = Arc::new(crate::stall::WriteStallController::new(
            Arc::clone(&inner) as Arc<dyn WriteStallCountProvider>,
            thresholds,
        ));
        let task_manager = Arc::new(TaskManager::new(
            Arc::clone(&inner) as Arc<dyn CompactionOperations>,
            Arc::clone(&opts),
            Arc::clone(&write_stall),
        ));
        let commit_env =
            Arc::new(LsmCommitEnv::new(Arc::clone(&inner), Arc::clone(&task_manager))?);
        let commit_pipeline = CommitPipeline::new(
            commit_env,
            Arc::clone(&inner.visible_seq_num),
            Arc::clone(&write_stall),
        );
        let wal_path = opts.wal_dir();
        let min_wal_number = inner.level_manifest.read()?.get_log_number();
        let manifest_last_seq = inner.level_manifest.read()?.get_last_sequence();
        log::info!(
            "Manifest state: log_number={}, last_sequence={}",
            min_wal_number,
            manifest_last_seq
        );
        let (wal_seq_num_opt, recovered_memtable) = Self::replay_wal_with_repair(
            &wal_path,
            min_wal_number,
            "Database startup",
            opts.wal_recovery_mode,
            opts.max_memtable_size,
            |memtable, wal_number| {
                let table_id = inner.level_manifest.read()?.next_table_id();
                inner.flush_immutable_to_sst(Arc::clone(&memtable), table_id, wal_number)?;
                log::info!(
                    "Recovery: flushed memtable to SST table_id={}, wal_number={}",
                    table_id,
                    wal_number
                );
                Ok(())
            },
        )?;
        if let Some(memtable) = recovered_memtable {
            let mut active_memtable = inner.active_memtable.write()?;
            *active_memtable = memtable;
        }
        {
            let active_memtable = inner.active_memtable.read()?;
            let current_wal_number = inner.wal.read().get_active_log_number();
            active_memtable.set_wal_number(current_wal_number);
        }
        // Re-read: recovery flushes may have advanced the manifest's last sequence.
        let manifest_last_seq = inner.level_manifest.read()?.get_last_sequence();
        let max_seq_num = match wal_seq_num_opt {
            Some(wal_seq) => {
                let effective = std::cmp::max(manifest_last_seq, wal_seq);
                log::debug!(
                    "WAL replayed: manifest_last_seq={}, wal_seq={}, using max={}",
                    manifest_last_seq,
                    wal_seq,
                    effective
                );
                effective
            }
            None => {
                log::debug!("WAL skipped or empty, using manifest_last_seq={}", manifest_last_seq);
                manifest_last_seq
            }
        };
        commit_pipeline.set_seq_num(max_seq_num);
        inner.cleanup_orphaned_sst_files()?;
        inner.cleanup_orphaned_vlog_files()?;
        task_manager.wake_up_level();
        let core = Self {
            inner: Arc::clone(&inner),
            commit_pipeline: Arc::clone(&commit_pipeline),
            task_manager: Mutex::new(Some(task_manager)),
            write_stall,
        };
        log::info!("=== LSM tree initialization complete ===");
        Ok(core)
    }

    /// Submits `batch` to the commit pipeline, optionally syncing the WAL.
    pub(crate) async fn commit(&self, batch: Batch, sync: bool) -> Result<()> {
        self.commit_pipeline.commit(batch, sync).await
    }

    /// Highest sequence number currently visible to readers.
    pub(crate) fn seq_num(&self) -> u64 {
        self.commit_pipeline.get_visible_seq_num()
    }

    /// Syncs (`sync == true`) or merely flushes the WAL.
    pub(crate) fn flush_wal(&self, sync: bool) -> Result<()> {
        if sync {
            self.wal.sync()?;
        } else {
            self.wal.flush()?;
        }
        Ok(())
    }

    /// Shuts the tree down.
    ///
    /// It proceeds in this order:
    /// 1. Stops the commit pipeline, the stall controller and background tasks.
    /// 2. Optionally flushes all memtables.
    /// 3. Closes the VLog and versioned index.
    /// 4. Closes the WAL, removes obsolete WAL segments, and fsyncs the directories.
    /// 5. Releases the lock file.
    ///
    /// Memtables are flushed BEFORE the VLog and versioned index are closed, because a
    /// flush writes large values to the VLog, inserts into the versioned index, and runs
    /// VLog/index cleanup.
    pub async fn close(&self) -> Result<()> {
        log::info!("Shutting down LSM tree...");
        self.commit_pipeline.shutdown();
        log::debug!("Commit pipeline shutdown complete");
        self.write_stall.signal_shutdown();
        log::debug!("Write stall shutdown signal sent");
        // A poisoned mutex must not abort shutdown; the `Option` is still valid to take.
        let task_manager = self
            .task_manager
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take();
        if let Some(task_manager) = task_manager {
            log::debug!("Stopping background task manager...");
            task_manager.stop().await;
            log::debug!("Background task manager stopped");
        }
        if self.inner.opts.flush_on_close {
            log::info!("Flushing all memtables on shutdown (flush_on_close=true)");
            self.inner.flush_all_memtables_for_shutdown().map_err(|e| {
                Error::Other(format!("Failed to flush memtables during shutdown: {}", e))
            })?;
            log::info!("All memtables flushed successfully on shutdown");
        }
        if let Some(ref vlog) = self.inner.vlog {
            log::debug!("Closing VLog...");
            vlog.close()?;
            log::debug!("VLog closed");
        }
        if let Some(ref versioned_index) = self.inner.versioned_index {
            log::debug!("Closing versioned index...");
            versioned_index.read().close()?;
            log::debug!("Versioned index closed");
        }
        let wal_log_number = self.inner.wal.read().get_active_log_number();
        log::info!("Closing WAL: active_log_number={}", wal_log_number);
        let mut wal_guard = self.inner.wal.write();
        wal_guard.close().map_err(|e| Error::Other(format!("Failed to close WAL: {}", e)))?;
        log::debug!("WAL #{:020} closed and synced", wal_log_number);
        drop(wal_guard);
        let wal_dir = self.inner.wal.read().get_dir_path().to_path_buf();
        let min_wal_to_keep = self.inner.level_manifest.read()?.get_log_number();
        log::debug!("Cleaning up obsolete WAL files (min_wal_to_keep={})", min_wal_to_keep);
        match cleanup_old_segments(&wal_dir, min_wal_to_keep) {
            Ok(count) if count > 0 => {
                log::info!("Cleaned up {} obsolete WAL files during shutdown", count);
            }
            Ok(_) => {
                log::debug!("No obsolete WAL files to clean up");
            }
            Err(e) => {
                log::warn!("Failed to clean up WAL files during shutdown: {}", e);
            }
        }
        log::debug!("Syncing directory structure...");
        sync_directory_structure(&self.inner.opts).map_err(|e| {
            Error::Other(format!("Failed to sync directories during shutdown: {}", e))
        })?;
        log::debug!("Directory sync complete");
        let mut lockfile = self.inner.lockfile.lock()?;
        lockfile.release()?;
        let final_manifest = self.inner.level_manifest.read()?;
        log::info!(
            "=== LSM tree shutdown complete === log_number={}, last_sequence={}",
            final_manifest.get_log_number(),
            final_manifest.get_last_sequence()
        );
        Ok(())
    }
}
/// Public handle to an open LSM tree.
///
/// Cloning is cheap: every clone shares the same underlying `Core`.
#[derive(Clone)]
pub struct Tree {
    /// Shared core; transactions also hold a reference to it.
    pub(crate) core: Arc<Core>,
}
impl Tree {
    /// Validates `opts`, creates the directory layout, opens the core, and fsyncs the
    /// directories so they are durable.
    pub(crate) fn new(opts: Arc<Options>) -> Result<Self> {
        opts.validate()?;
        Self::create_directory_structure(&opts)?;
        let core = Core::new(Arc::clone(&opts))?;
        sync_directory_structure(&opts)?;
        Ok(Self {
            core: Arc::new(core),
        })
    }

    /// Creates the database directory and the subdirectories required by `opts`.
    fn create_directory_structure(opts: &Options) -> Result<()> {
        create_dir_all(&opts.path)?;
        create_dir_all(opts.sstable_dir())?;
        create_dir_all(opts.wal_dir())?;
        create_dir_all(opts.manifest_dir())?;
        if opts.enable_vlog {
            create_dir_all(opts.vlog_dir())?;
        }
        // `CoreInner::new` opens `index.bpt` in this directory whenever
        // `enable_versioned_index` is set, so create it under that flag too.
        if opts.enable_versioning || opts.enable_versioned_index {
            create_dir_all(opts.versioned_index_dir())?;
        }
        Ok(())
    }

    /// Starts a transaction with default options.
    pub fn begin(&self) -> Result<Transaction> {
        self.begin_with_opts(TransactionOptions::new())
    }

    /// Starts a transaction in the given `mode`.
    pub fn begin_with_mode(&self, mode: Mode) -> Result<Transaction> {
        self.begin_with_opts(TransactionOptions::new_with_mode(mode))
    }

    /// Starts a transaction with explicit options.
    pub fn begin_with_opts(&self, opts: TransactionOptions) -> Result<Transaction> {
        let txn = Transaction::new(Arc::clone(&self.core), opts)?;
        Ok(txn)
    }

    /// Runs `f` inside a read-only transaction.
    pub fn view(&self, f: impl FnOnce(&mut Transaction) -> Result<()>) -> Result<()> {
        let mut txn = self.begin_with_mode(Mode::ReadOnly)?;
        f(&mut txn)?;
        Ok(())
    }

    /// Writes a checkpoint of the database into `checkpoint_dir`.
    pub fn create_checkpoint<P: AsRef<Path>>(
        &self,
        checkpoint_dir: P,
    ) -> Result<CheckpointMetadata> {
        let checkpoint = DatabaseCheckpoint::new(Arc::clone(&self.core.inner));
        checkpoint.create_checkpoint(checkpoint_dir)
    }

    /// Restores the database from `checkpoint_dir` and rebuilds all in-memory state.
    ///
    /// In order, this:
    /// 1. Reloads the manifest.
    /// 2. Installs an empty active memtable and clears the immutable list and flushed
    ///    history.
    /// 3. Reopens the WAL and replays it.
    /// 4. Reseeds the commit sequence number.
    ///
    /// NOTE(review): this does not visibly quiesce the commit pipeline or background
    /// tasks; presumably callers must ensure no concurrent activity — confirm.
    pub fn restore_from_checkpoint<P: AsRef<Path>>(
        &self,
        checkpoint_dir: P,
    ) -> Result<CheckpointMetadata> {
        let checkpoint = DatabaseCheckpoint::new(Arc::clone(&self.core.inner));
        let metadata = checkpoint.restore_from_checkpoint(checkpoint_dir)?;
        let new_levels = LevelManifest::new(Arc::clone(&self.core.inner.opts))?;
        {
            let mut levels_guard = self.core.inner.level_manifest.write()?;
            *levels_guard = new_levels;
        }
        {
            let earliest_seq = self.core.inner.visible_seq_num.load(Ordering::Acquire);
            let mut active_memtable = self.core.inner.active_memtable.write()?;
            *active_memtable =
                Arc::new(MemTable::new(self.core.inner.opts.max_memtable_size, earliest_seq));
        }
        {
            let mut immutable_memtables = self.core.inner.immutable_memtables.write()?;
            *immutable_memtables = ImmutableMemtables::default();
        }
        {
            // The pre-restore flushed memtable no longer describes this database; leaving it
            // would make `check_keys_conflict` compare against stale data.
            let mut history = self.core.inner.flushed_history.write()?;
            *history = None;
        }
        // Use the configured WAL directory, as `CoreInner::new` and `Core::new` do.
        let wal_path = self.core.inner.opts.wal_dir();
        let manifest_log_number = self.core.inner.level_manifest.read()?.get_log_number();
        {
            let mut wal_guard = self.core.inner.wal.write();
            let new_wal = Wal::open_with_min_log_number(
                &wal_path,
                manifest_log_number,
                wal::Options::default(),
            )?;
            *wal_guard = new_wal;
        }
        let (wal_seq_num_opt, recovered_memtable) = Core::replay_wal_with_repair(
            &wal_path,
            manifest_log_number,
            "Database restore",
            self.core.inner.opts.wal_recovery_mode,
            self.core.inner.opts.max_memtable_size,
            |memtable, wal_number| {
                let table_id = self.core.inner.level_manifest.read()?.next_table_id();
                self.core.inner.flush_immutable_to_sst(
                    Arc::clone(&memtable),
                    table_id,
                    wal_number,
                )?;
                log::info!(
                    "Restore: flushed memtable to SST table_id={}, wal_number={}",
                    table_id,
                    wal_number
                );
                Ok(())
            },
        )?;
        if let Some(memtable) = recovered_memtable {
            let mut active_memtable = self.core.inner.active_memtable.write()?;
            *active_memtable = memtable;
        }
        {
            let active_memtable = self.core.inner.active_memtable.read()?;
            let current_wal_number = self.core.inner.wal.read().get_active_log_number();
            active_memtable.set_wal_number(current_wal_number);
        }
        let manifest_last_seq = self.core.inner.level_manifest.read()?.get_last_sequence();
        let max_seq_num = match wal_seq_num_opt {
            Some(wal_seq) => std::cmp::max(manifest_last_seq, wal_seq),
            None => manifest_last_seq,
        };
        self.core.commit_pipeline.set_seq_num(max_seq_num);
        Ok(metadata)
    }

    /// Closes the tree; see `Core::close`.
    pub async fn close(&self) -> Result<()> {
        self.core.close().await
    }

    /// Test helper: rotates a non-empty active memtable, flushes every immutable, and
    /// signals the stall controller.
    #[cfg(test)]
    pub(crate) fn flush(&self) -> Result<()> {
        {
            let active = self.core.inner.active_memtable.read()?;
            if !active.is_empty() {
                // Release the read lock first: `rotate_memtable` takes the write lock.
                drop(active);
                self.core.inner.rotate_memtable()?;
            }
        }
        self.core.inner.flush_all_immutables_sync()?;
        self.core.write_stall.signal_work_done();
        Ok(())
    }

    /// Test helper: runs one compaction with `strategy` and signals the stall controller.
    #[cfg(test)]
    pub(crate) fn compact(&self, strategy: Arc<dyn CompactionStrategy>) -> Result<()> {
        self.core.inner.compact(strategy)?;
        self.core.write_stall.signal_work_done();
        Ok(())
    }

    /// Syncs (`sync == true`) or merely flushes the WAL.
    pub fn flush_wal(&self, sync: bool) -> Result<()> {
        self.core.flush_wal(sync)
    }
}
impl Drop for Tree {
    /// Schedules an asynchronous `close` on the current Tokio runtime once the last owner
    /// of the shared `Core` goes away.
    fn drop(&mut self) {
        #[cfg(not(target_arch = "wasm32"))]
        {
            // `Tree` is `Clone` and all clones share one `Core`. Closing on every drop would
            // shut the store down underneath the remaining handles, so only the last owner
            // closes. NOTE(review): transactions also hold `Arc<Core>`; if one outlives
            // the last `Tree`, no automatic close is scheduled here.
            if Arc::strong_count(&self.core) > 1 {
                return;
            }
            if let Ok(handle) = tokio::runtime::Handle::try_current() {
                let core = Arc::clone(&self.core);
                handle.spawn(async move {
                    if let Err(err) = core.close().await {
                        log::error!("Error closing store: {}", err);
                    }
                });
            } else {
                log::warn!("No runtime available for closing the store correctly");
            }
        }
    }
}
/// Fluent builder that accumulates [`Options`] and opens a [`Tree`].
pub struct TreeBuilder {
    /// Options accumulated so far; passed to `Tree::new` by `build`.
    opts: Options,
}
impl TreeBuilder {
pub fn new() -> Self {
Self {
opts: Options::default(),
}
}
pub fn with_options(opts: Options) -> Self {
Self {
opts,
}
}
pub fn with_path(mut self, path: std::path::PathBuf) -> Self {
self.opts = self.opts.with_path(path);
self
}
pub fn with_block_size(mut self, size: usize) -> Self {
self.opts = self.opts.with_block_size(size);
self
}
pub fn with_block_restart_interval(mut self, interval: usize) -> Self {
self.opts = self.opts.with_block_restart_interval(interval);
self
}
pub fn with_filter_policy(mut self, policy: Option<Arc<dyn FilterPolicy>>) -> Self {
self.opts = self.opts.with_filter_policy(policy);
self
}
pub fn with_comparator(mut self, comparator: Arc<dyn Comparator>) -> Self {
self.opts = self.opts.with_comparator(comparator);
self
}
pub fn without_compression(mut self) -> Self {
self.opts = self.opts.without_compression();
self
}
pub fn with_level_count(mut self, count: u8) -> Self {
self.opts = self.opts.with_level_count(count);
self
}
pub fn with_max_memtable_size(mut self, size: usize) -> Self {
self.opts = self.opts.with_max_memtable_size(size);
self
}
pub fn with_block_cache_capacity(mut self, capacity_bytes: u64) -> Self {
self.opts = self.opts.with_block_cache_capacity(capacity_bytes);
self
}
pub fn with_index_partition_size(mut self, size: usize) -> Self {
self.opts = self.opts.with_index_partition_size(size);
self
}
pub fn with_vlog_max_file_size(mut self, size: u64) -> Self {
self.opts = self.opts.with_vlog_max_file_size(size);
self
}
pub fn with_vlog_checksum_verification(mut self, level: VLogChecksumLevel) -> Self {
self.opts = self.opts.with_vlog_checksum_verification(level);
self
}
pub fn with_enable_vlog(mut self, enable: bool) -> Self {
self.opts = self.opts.with_enable_vlog(enable);
self
}
pub fn with_vlog_value_threshold(mut self, value: usize) -> Self {
self.opts = self.opts.with_vlog_value_threshold(value);
self
}
pub fn with_versioning(mut self, enable: bool, retention_ns: u64) -> Self {
self.opts = self.opts.with_versioning(enable, retention_ns);
self
}
pub fn with_versioned_index(mut self, enable: bool) -> Self {
self.opts = self.opts.with_versioned_index(enable);
self
}
pub fn with_flush_on_close(mut self, value: bool) -> Self {
self.opts = self.opts.with_flush_on_close(value);
self
}
pub fn with_memtable_stall_threshold(mut self, value: usize) -> Self {
self.opts = self.opts.with_memtable_stall_threshold(value);
self
}
pub fn with_l0_stall_threshold(mut self, value: usize) -> Self {
self.opts = self.opts.with_l0_stall_threshold(value);
self
}
pub fn build(self) -> Result<Tree> {
Tree::new(Arc::new(self.opts))
}
/// Like `build`, but also returns the shared options handle the tree was opened with.
///
/// The returned `Arc<Options>` is a clone of the same `Arc` passed to `Tree::new`.
///
/// # Errors
/// Returns whatever error `Tree::new` reports.
pub fn build_with_options(self) -> Result<(Tree, Arc<Options>)> {
    let shared_opts = Arc::new(self.opts);
    Tree::new(Arc::clone(&shared_opts)).map(|tree| (tree, shared_opts))
}
}
impl Default for TreeBuilder {
fn default() -> Self {
Self::new()
}
}
/// Opens `path` and calls `File::sync_all` on it so that directory-entry changes reach disk.
///
/// - A path that does not exist (`ErrorKind::NotFound`) is treated as "nothing to sync" and
///   yields `Ok(())`.
/// - Any other failure to open or sync is returned to the caller. Examples are permission
///   denied, or a path component that is not a directory. Such failures are not silently
///   ignored, because a skipped fsync would quietly weaken durability.
/// - On Windows this is a no-op that always returns `Ok(())`.
///
/// In debug builds, asserts that the opened path is a directory.
///
/// # Errors
/// Any I/O error from `File::open` (other than `NotFound`) or from `File::sync_all`.
pub(crate) fn fsync_directory<P: AsRef<Path>>(path: P) -> std::io::Result<()> {
    let path = path.as_ref();
    #[cfg(not(target_os = "windows"))]
    {
        // Open directly instead of checking `exists()` first: `exists()` maps every metadata
        // error to `false`, which would silently skip the sync. It is also a check-then-act race.
        let file = match File::open(path) {
            Ok(file) => file,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
            Err(e) => return Err(e),
        };
        // Keep the debug check side-effect free so debug and release builds return the same
        // errors; a metadata failure here is left for `sync_all` to surface.
        debug_assert!(
            file.metadata().map(|m| m.is_dir()).unwrap_or(true),
            "fsync_directory called on a non-directory path: {}",
            path.display()
        );
        file.sync_all()?;
    }
    #[cfg(target_os = "windows")]
    {
        // Directory fsync is not performed on Windows; reference `path` to avoid an
        // unused-variable warning.
        let _ = path;
    }
    Ok(())
}
fn sync_directory_structure(opts: &Options) -> Result<()> {
fsync_directory(opts.sstable_dir()).map_err(|e| {
Error::Other(format!(
"Failed to sync SSTable directory '{}': {}",
opts.sstable_dir().display(),
e
))
})?;
fsync_directory(opts.wal_dir()).map_err(|e| {
Error::Other(format!("Failed to sync WAL directory '{}': {}", opts.wal_dir().display(), e))
})?;
fsync_directory(opts.manifest_dir()).map_err(|e| {
Error::Other(format!(
"Failed to sync manifest directory '{}': {}",
opts.manifest_dir().display(),
e
))
})?;
if opts.enable_vlog {
fsync_directory(opts.vlog_dir()).map_err(|e| {
Error::Other(format!(
"Failed to sync VLog directory '{}': {}",
opts.vlog_dir().display(),
e
))
})?;
}
if opts.enable_versioning {
fsync_directory(opts.versioned_index_dir()).map_err(|e| {
Error::Other(format!(
"Failed to sync versioned index directory '{}': {}",
opts.versioned_index_dir().display(),
e
))
})?;
}
fsync_directory(&opts.path).map_err(|e| {
Error::Other(format!("Failed to sync base directory '{}': {}", opts.path.display(), e))
})?;
Ok(())
}
/// Deletes versioned-index entries that point at a VLog file with id below `min_valid_file_id`.
///
/// The cleanup runs in two phases:
/// 1. Under a read lock, every entry is scanned. An entry is stale when its value decodes
///    as a `ValueLocation` that is a value pointer, and that pointer decodes as a
///    `ValuePointer` whose `file_id` is below `min_valid_file_id`. The keys of stale
///    entries are collected.
/// 2. The collected keys are deleted in chunks of 100, taking the write lock once per chunk.
///
/// Entries that fail to decode, or that are not value pointers, are left in place.
///
/// Returns the number of keys whose `delete` reported an existing entry. Returns `Ok(0)`
/// when no index is configured or nothing is stale.
///
/// # Errors
/// Propagates errors from starting or advancing the range scan and from `delete`.
pub(crate) fn cleanup_stale_versioned_index(
    versioned_index: &Option<Arc<parking_lot::RwLock<DiskBPlusTree>>>,
    min_valid_file_id: u32,
) -> Result<usize> {
    let index = match versioned_index.as_ref() {
        Some(index) => index,
        None => return Ok(0),
    };

    // Phase 1: collect stale keys while holding only the read lock.
    let stale_keys: Vec<Vec<u8>> = {
        let reader = index.read();
        let start: &[u8] = &[];
        let mut found = Vec::new();
        for item in reader.range(start..)? {
            let (key, value) = item?;
            let points_to_retired_file = match ValueLocation::decode(&value) {
                Ok(loc) if loc.is_value_pointer() => matches!(
                    ValuePointer::decode(&loc.value),
                    Ok(ptr) if ptr.file_id < min_valid_file_id
                ),
                _ => false,
            };
            if points_to_retired_file {
                found.push(key.to_vec());
            }
        }
        found
    };

    if stale_keys.is_empty() {
        return Ok(0);
    }

    log::debug!(
        "Cleaning up {} stale versioned_index entries for VLog files < {}",
        stale_keys.len(),
        min_valid_file_id
    );

    // Phase 2: delete in chunks. The write lock is re-acquired per chunk rather than held
    // for the whole pass (presumably so other lock users can interleave; confirm intent).
    const BATCH_SIZE: usize = 100;
    let mut removed: usize = 0;
    for chunk in stale_keys.chunks(BATCH_SIZE) {
        let mut writer = index.write();
        for key in chunk {
            if writer.delete(key)?.is_some() {
                removed += 1;
            }
        }
    }
    Ok(removed)
}
/// Best-effort cleanup of index entries and VLog files older than `min_oldest_vlog`.
///
/// The steps run in this order:
/// 1. `cleanup_stale_versioned_index` runs first.
/// 2. If a VLog is configured, `VLog::cleanup_obsolete_files` runs next.
///
/// A zero threshold skips both steps. Failures are not returned; each is logged at warn
/// level, with `context` naming the operation that triggered the cleanup.
pub(crate) fn cleanup_vlog_and_index(
    vlog: &Option<Arc<VLog>>,
    versioned_index: &Option<Arc<parking_lot::RwLock<DiskBPlusTree>>>,
    min_oldest_vlog: u32,
    context: &str,
) {
    if min_oldest_vlog != 0 {
        // Index entries are pruned before VLog files are removed; the order is kept as-is.
        if let Some(err) = cleanup_stale_versioned_index(versioned_index, min_oldest_vlog).err() {
            log::warn!("Failed to cleanup stale versioned_index entries during {}: {}", context, err);
        }
        if let Some(log_files) = vlog.as_ref() {
            if let Some(err) = log_files.cleanup_obsolete_files(min_oldest_vlog).err() {
                log::warn!("Failed to cleanup obsolete vlog files during {}: {}", context, err);
            }
        }
    }
}