use crate::db::{partition_for_key, DBError, DBOptions, Result};
use crate::memtable::{Entry, Memtable};
use crate::merge_operator::MergeOperator;
use crate::wal::{Record, RecoveryMode, WALReader};
use bytes::Bytes;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tracing::{debug, error, warn};
pub(crate) fn recover_partitioned(
wal_path: &Path,
memtables: &[Memtable],
merge_operator: Option<&Arc<dyn MergeOperator>>,
recovery_mode: RecoveryMode,
) -> Result<()> {
let mut reader =
WALReader::open(wal_path).map_err(|e| DBError::Io(std::io::Error::other(e)))?;
let mut records_recovered = 0u64;
loop {
match reader.read_next() {
Ok(Some(record)) => {
match record {
Record::Put { key, value, seq } => {
let partition = partition_for_key(&key);
memtables[partition].put(key, value, seq);
}
Record::Delete { key, seq } => {
let partition = partition_for_key(&key);
memtables[partition].delete(key, seq);
}
Record::Batch {
operations,
base_seq,
} => {
let mut current_seq = base_seq;
for op in operations {
match op {
crate::wal::BatchOp::Put { key, value } => {
let partition = partition_for_key(&key);
memtables[partition].put(key, value, current_seq);
}
crate::wal::BatchOp::Delete { key } => {
let partition = partition_for_key(&key);
memtables[partition].delete(key, current_seq);
}
crate::wal::BatchOp::Merge { key, operand } => {
let partition = partition_for_key(&key);
apply_merge(
&memtables[partition],
key,
operand,
merge_operator,
current_seq,
);
}
}
current_seq += 1;
}
}
Record::Merge { key, operand, seq } => {
let partition = partition_for_key(&key);
apply_merge(&memtables[partition], key, operand, merge_operator, seq);
}
}
records_recovered += 1;
}
Ok(None) => {
break;
}
Err(e) => {
match recovery_mode {
RecoveryMode::Strict => {
error!(
error = %e,
records_recovered = records_recovered,
"WAL corruption detected in strict mode"
);
return Err(DBError::WalCorruption(format!(
"WAL corruption after {records_recovered} records: {e}"
)));
}
RecoveryMode::BestEffort => {
warn!(
error = %e,
records_recovered = records_recovered,
"WAL recovery stopped due to corrupt/truncated record (best-effort mode)"
);
break;
}
}
}
}
}
Ok(())
}
/// Folds a merge `operand` for `key` into whatever `mt` currently holds for that key and
/// stores the result with sequence number `seq`.
///
/// * Existing value: if a merge operator is configured and its `full_merge` succeeds,
///   the entry collapses to a plain value. Otherwise the value becomes the merge base.
/// * Existing merge entry: the operand is combined with the last pending operand via
///   `partial_merge` when that succeeds. Otherwise it is appended.
/// * Existing tombstone: the key is known to be deleted, so the operand is resolved
///   against an absent base with `full_merge(key, None, ..)` when possible. Otherwise it
///   is recorded as a merge with no base.
/// * No entry: recorded as a merge with no base.
fn apply_merge(
    mt: &Memtable,
    key: Bytes,
    operand: Bytes,
    merge_operator: Option<&Arc<dyn MergeOperator>>,
    seq: u64,
) {
    let new_entry = match mt.get_entry(&key) {
        Some(Entry::Value(v)) => {
            let operands = [operand.as_ref()];
            match merge_operator.and_then(|op| op.full_merge(&key, Some(&v), &operands)) {
                Some(merged) => Entry::Value(Bytes::from(merged)),
                // No operator, or it declined: keep the value as the base for later resolution.
                None => Entry::Merge {
                    base: Some(v),
                    operands: vec![operand],
                },
            }
        }
        Some(Entry::Merge { base, mut operands }) => {
            // Try to combine with the most recent pending operand so the list stays short.
            let combined = merge_operator.and_then(|op| {
                operands
                    .last()
                    .and_then(|last| op.partial_merge(&key, last, &operand))
            });
            match combined {
                Some(merged) => {
                    operands.pop();
                    operands.push(Bytes::from(merged));
                }
                None => operands.push(operand),
            }
            Entry::Merge { base, operands }
        }
        Some(Entry::Tombstone) => {
            // The key was deleted, so the merge must start from "no value". `Merge { base: None }`
            // is also what a key absent from this memtable gets, so on its own it cannot
            // record that the key was deleted. Resolve eagerly whenever the operator allows.
            // NOTE(review): confirm how readers/compaction treat a base-less merge entry.
            let operands = [operand.as_ref()];
            match merge_operator.and_then(|op| op.full_merge(&key, None, &operands)) {
                Some(merged) => Entry::Value(Bytes::from(merged)),
                None => Entry::Merge {
                    base: None,
                    operands: vec![operand],
                },
            }
        }
        None => Entry::Merge {
            base: None,
            operands: vec![operand],
        },
    };
    mt.put_entry(key, new_entry, seq);
}
/// Deletes queued SSTable files whose grace period has elapsed.
///
/// Queue entries that were enqueued at least five seconds ago are taken out of
/// `pending_deletions` and their files are removed. Younger entries remain queued in their
/// original order. The mutex is released before any filesystem work. A failed deletion
/// is logged and the path is not re-queued.
///
/// # Panics
///
/// Panics if the `pending_deletions` mutex is poisoned.
pub(crate) fn cleanup_old_deletions(
    pending_deletions: &Arc<Mutex<Vec<(PathBuf, std::time::Instant)>>>,
) {
    use std::time::{Duration, Instant};

    const DELETION_DELAY: Duration = Duration::from_secs(5);

    // Split the queue while holding the lock; the guard is dropped at the end of this block.
    let expired: Vec<PathBuf> = {
        let mut queue = pending_deletions
            .lock()
            .expect("pending_deletions lock poisoned");
        let now = Instant::now();
        let (due, waiting): (Vec<_>, Vec<_>) = queue
            .drain(..)
            .partition(|(_, queued_at)| now.duration_since(*queued_at) >= DELETION_DELAY);
        *queue = waiting;
        due.into_iter().map(|(path, _)| path).collect()
    };

    for path in expired {
        match std::fs::remove_file(&path) {
            Ok(()) => {
                debug!(path = ?path, "Deleted old SSTable after safe delay");
            }
            Err(e) => {
                warn!(path = ?path, error = %e, "Failed to delete old SSTable");
            }
        }
    }
}
/// Fails with `DBError::DiskSpaceFull` when the disk holding `options.data_dir` has less
/// available space than `options.min_disk_space_bytes`.
///
/// Returns `Ok(())` without checking when no minimum is configured. It also returns
/// `Ok(())` when no mounted disk's mount point is a prefix of `data_dir`.
///
/// The disk chosen is the one with the deepest (most specific) matching mount point. This
/// stops a data volume mounted below `/` from being shadowed by the root filesystem,
/// whose mount point prefixes every absolute path. A relative `data_dir` is first resolved
/// against the current working directory. Otherwise it could never match an absolute
/// mount point and the check would silently be skipped.
///
/// # Errors
///
/// `DBError::DiskSpaceFull { available, required }` when the selected disk is too full.
// NOTE(review): no caller is visible in this file; `dead_code` is allowed to keep builds
// warning-free — confirm whether this check is meant to be wired into the write path.
#[allow(dead_code)]
pub(crate) fn check_disk_space(options: &DBOptions) -> Result<()> {
    let Some(min_space) = options.min_disk_space_bytes else {
        return Ok(());
    };

    use sysinfo::{DiskExt, System, SystemExt};
    let mut sys = System::new();
    sys.refresh_disks_list();

    let data_dir = if options.data_dir.is_relative() {
        std::env::current_dir()
            .map(|cwd| cwd.join(&options.data_dir))
            .unwrap_or_else(|_| options.data_dir.to_path_buf())
    } else {
        options.data_dir.to_path_buf()
    };

    // Several mount points can prefix the path ("/" always does for absolute paths);
    // the one with the most components is the filesystem that actually holds it.
    let disk = sys
        .disks()
        .iter()
        .filter(|d| data_dir.starts_with(d.mount_point()))
        .max_by_key(|d| d.mount_point().components().count());

    if let Some(disk) = disk {
        let available = disk.available_space();
        if available < min_space {
            return Err(DBError::DiskSpaceFull {
                available,
                required: min_space,
            });
        }
    }
    Ok(())
}