use crate::{
Table, TableId,
config::{Config, TreeType},
version::{BlobFileList, Level, Run, Version},
};
use std::{path::PathBuf, sync::Arc};
/// Files that could not be turned into tables or blob files during repair, each paired with a
/// human-readable reason.
type UnreadableFiles = Vec<(PathBuf, String)>;

/// Summary of a [`Config::repair`] / [`Config::repair_with_salvage`] run.
#[derive(Debug)]
pub struct RepairReport {
    /// Number of tables placed into the rebuilt version (salvaged tables included).
    pub recovered: usize,

    /// Number of tables that were rebuilt by salvaging a quarantined original.
    pub salvaged: usize,

    /// Number of entries in `unreadable_files`.
    pub unreadable: usize,

    /// Files that were not recovered, each with the reason why (e.g. a load error, or the
    /// location a file with a non-id name was quarantined to).
    pub unreadable_files: Vec<(PathBuf, String)>,

    /// Short description of the strategy used to rebuild the version.
    pub method: &'static str,

    /// Caveats about state the repair could not preserve.
    pub warnings: Vec<&'static str>,
}
/// Streams the file at `path` through an XXH3 hasher in 256 KiB chunks and returns the
/// 128-bit digest of its full contents.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or a read fails.
pub(crate) fn compute_table_checksum(
    fs: &dyn crate::fs::Fs,
    path: &std::path::Path,
) -> crate::Result<u128> {
    const CHUNK_SIZE: usize = 256 * 1024;

    let mut reader = fs.open(path, &crate::fs::FsOpenOptions::new().read(true))?;
    let mut digest = xxhash_rust::xxh3::Xxh3Default::new();
    let mut scratch = vec![0u8; CHUNK_SIZE];

    loop {
        let bytes_read = reader.read(&mut scratch)?;
        // A zero-length read is EOF; an out-of-range count (never expected) also stops hashing.
        match scratch.get(..bytes_read) {
            Some(filled) if !filled.is_empty() => digest.update(filled),
            _ => break,
        }
    }

    Ok(digest.digest128())
}
/// Returns the largest `N` among entries of `folder` named `v<N>` (with `N` parsing as `u64`),
/// or `None` if no such entry exists.
///
/// # Errors
///
/// Returns an error if `folder` cannot be listed.
fn highest_existing_version_id(
    fs: &dyn crate::fs::Fs,
    folder: &std::path::Path,
) -> crate::Result<Option<u64>> {
    let entries = fs.read_dir(folder)?;

    let mut highest: Option<u64> = None;
    for entry in entries {
        let Some(digits) = entry.file_name.strip_prefix('v') else {
            continue;
        };
        let Ok(id) = digits.parse::<u64>() else {
            continue;
        };
        highest = Some(highest.map_or(id, |current| current.max(id)));
    }

    Ok(highest)
}
fn quarantine_file(
fs: &dyn crate::fs::Fs,
table_base_folder: &std::path::Path,
src: &std::path::Path,
file_name: &str,
) -> crate::Result<PathBuf> {
let quarantine_dir = table_base_folder
.parent()
.unwrap_or(table_base_folder)
.join("repair-quarantine");
fs.create_dir_all(&quarantine_dir)?;
let dest = quarantine_dir.join(file_name);
fs.rename(src, &dest)?;
Ok(dest)
}
/// Quarantines the table file at `table_path` and salvages it back into `table_path`.
///
/// Returns `Ok(Some(table))` with the reloaded salvaged table. Returns `Ok(None)` when the
/// salvager reports no output (`salvaged_path` is `None`); the original then stays quarantined.
///
/// # Errors
///
/// Returns an error if quarantining, salvaging, checksumming or loading the salvaged table
/// fails.
fn try_salvage_table(
    config: &Config,
    table_base_folder: &std::path::Path,
    fs: &Arc<dyn crate::fs::Fs>,
    table_path: &std::path::Path,
    file_name: &str,
    table_id: TableId,
) -> crate::Result<Option<Table>> {
    // Move the damaged original aside first. The salvager reads from the quarantined copy
    // and writes its output to `table_path`, which is then loaded below.
    let original_copy = quarantine_file(&**fs, table_base_folder, table_path, file_name)?;
    let outcome = crate::salvage::salvage_sst_with_comparator(
        &original_copy,
        table_path.to_path_buf(),
        fs,
        &config.comparator,
    )?;

    let Some(_) = &outcome.salvaged_path else {
        return Ok(None);
    };

    let dropped_blocks = outcome.dropped.len();
    if dropped_blocks > 0 {
        log::warn!(
            "salvaged table {table_id}: recovered {} block(s), dropped {} corrupt block(s)",
            outcome.blocks_salvaged,
            dropped_blocks,
        );
    }

    let raw_checksum = compute_table_checksum(&**fs, table_path)?;
    let salvaged_table = Table::recover(
        table_path.to_path_buf(),
        crate::Checksum::from_raw(raw_checksum),
        0,
        0,
        table_id,
        config.cache.clone(),
        None,
        Arc::clone(fs),
        false,
        false,
        config.encryption.clone(),
        #[cfg(zstd_any)]
        config.zstd_dictionary.clone(),
        config.comparator.clone(),
        #[cfg(feature = "metrics")]
        Arc::new(crate::metrics::Metrics::default()),
    )?;

    Ok(Some(salvaged_table))
}
/// Loads every blob file found in the tree's blobs folder.
///
/// Returns the blob files that loaded successfully, plus the files that did not, each with a
/// reason. A missing blobs folder yields two empty lists. Directories and macOS metadata files
/// (`.DS_Store`, `._*`) are ignored. Files whose name does not parse as a blob id are
/// quarantined and reported.
///
/// # Errors
///
/// Returns an error if the folder cannot be probed or listed, or a misnamed file cannot be
/// quarantined. Per-file load failures are reported in the second list instead.
fn recover_blob_files(
    config: &Config,
) -> crate::Result<(Vec<crate::vlog::BlobFile>, UnreadableFiles)> {
    let fs = &config.fs;
    let blobs_folder = config.path.join(crate::file::BLOBS_FOLDER);

    let mut loaded: Vec<crate::vlog::BlobFile> = Vec::new();
    let mut failures: UnreadableFiles = Vec::new();

    if !fs.exists(&blobs_folder)? {
        return Ok((loaded, failures));
    }

    let mut claimed: crate::HashSet<crate::vlog::BlobFileId> = crate::HashSet::default();

    for crate::fs::FsDirEntry {
        path,
        file_name,
        is_dir,
    } in fs.read_dir(&blobs_folder)?
    {
        let ignorable = is_dir || file_name == ".DS_Store" || file_name.starts_with("._");
        if ignorable {
            continue;
        }

        let blob_id = match file_name.parse::<crate::vlog::BlobFileId>() {
            Ok(id) => id,
            Err(_) => {
                let moved_to = quarantine_file(&**fs, &blobs_folder, &path, &file_name)?;
                failures.push((
                    path,
                    format!(
                        "file name is not a blob id; quarantined to {}",
                        moved_to.display()
                    ),
                ));
                continue;
            }
        };

        // Another file already claimed this id (e.g. "5" and "05" parse to the same value).
        if !claimed.insert(blob_id) {
            continue;
        }

        let checksum = match compute_table_checksum(&**fs, &path) {
            Ok(raw) => crate::Checksum::from_raw(raw),
            Err(err) => {
                claimed.remove(&blob_id);
                failures.push((path, err.to_string()));
                continue;
            }
        };

        match crate::vlog::recover_blob_file(&path, blob_id, checksum, 0, fs) {
            Ok(blob_file) => loaded.push(blob_file),
            Err(err) => {
                claimed.remove(&blob_id);
                failures.push((path, err.to_string()));
            }
        }
    }

    Ok((loaded, failures))
}
impl Config {
    /// Rebuilds the tree's version metadata from the files on disk, without salvaging
    /// corrupt tables.
    ///
    /// Equivalent to `self.repair_with_salvage(false)`.
    ///
    /// # Errors
    ///
    /// See [`Config::repair_with_salvage`].
    pub fn repair(&self) -> crate::Result<RepairReport> {
        self.repair_with_salvage(false)
    }

    /// Rebuilds the tree's version metadata from the files on disk and persists a new version.
    ///
    /// When `salvage` is `true`, a table that fails to load, or fails block verification, is
    /// moved to a `repair-quarantine` directory. Its readable blocks are then salvaged into a
    /// replacement table. Files that cannot be recovered are listed in the returned report
    /// rather than aborting the repair.
    ///
    /// # Errors
    ///
    /// Returns an error if the directory lock cannot be acquired, a folder cannot be listed,
    /// a misnamed file cannot be quarantined, or the new version cannot be persisted.
    pub fn repair_with_salvage(&self, salvage: bool) -> crate::Result<RepairReport> {
        repair_tree(self, salvage)
    }
}
fn repair_tree(config: &Config, salvage: bool) -> crate::Result<RepairReport> {
#[cfg(feature = "std")]
let _directory_lock =
crate::config::acquire_directory_lock(&*config.fs, &config.path, config.directory_lock)?;
let mut recovered_tables: Vec<Table> = Vec::new();
let mut salvaged = 0usize;
let mut unreadable_files: Vec<(PathBuf, String)> = Vec::new();
let mut seen_ids: crate::HashSet<TableId> = crate::HashSet::default();
for (table_base_folder, folder_fs) in config.all_tables_folders() {
if !folder_fs.exists(&table_base_folder)? {
continue;
}
for dirent in folder_fs.read_dir(&table_base_folder)? {
let crate::fs::FsDirEntry {
path: table_path,
file_name,
is_dir,
} = dirent;
if is_dir || file_name == ".DS_Store" || file_name.starts_with("._") {
continue;
}
let Ok(table_id) = file_name.parse::<TableId>() else {
let dest =
quarantine_file(&*folder_fs, &table_base_folder, &table_path, &file_name)?;
unreadable_files.push((
table_path,
format!(
"file name is not a table id; quarantined to {}",
dest.display()
),
));
continue;
};
if !seen_ids.insert(table_id) {
continue;
}
let checksum = match compute_table_checksum(&*folder_fs, &table_path) {
Ok(c) => crate::Checksum::from_raw(c),
Err(e) => {
seen_ids.remove(&table_id);
unreadable_files.push((table_path, e.to_string()));
continue;
}
};
let recovered = Table::recover(
table_path.clone(),
checksum,
0,
0,
table_id,
config.cache.clone(),
None,
folder_fs.clone(),
false,
false,
config.encryption.clone(),
#[cfg(zstd_any)]
config.zstd_dictionary.clone(),
config.comparator.clone(),
#[cfg(feature = "metrics")]
Arc::new(crate::metrics::Metrics::default()),
);
match recovered {
Ok(table)
if salvage
&& !crate::verify::verify_sst_file_with_fs(&*folder_fs, &table_path)
.is_ok() =>
{
drop(table);
match try_salvage_table(
config,
&table_base_folder,
&folder_fs,
&table_path,
&file_name,
table_id,
) {
Ok(Some(table)) => {
salvaged += 1;
recovered_tables.push(table);
}
Ok(None) => {
seen_ids.remove(&table_id);
unreadable_files.push((
table_path,
"verify found corrupt blocks; nothing salvageable".to_string(),
));
}
Err(salvage_err) => {
seen_ids.remove(&table_id);
unreadable_files.push((
table_path,
format!(
"verify found corrupt blocks; salvage failed ({salvage_err})"
),
));
}
}
}
Ok(table) => recovered_tables.push(table),
Err(e) if salvage => {
match try_salvage_table(
config,
&table_base_folder,
&folder_fs,
&table_path,
&file_name,
table_id,
) {
Ok(Some(table)) => {
salvaged += 1;
recovered_tables.push(table);
}
Ok(None) => {
seen_ids.remove(&table_id);
unreadable_files.push((
table_path,
format!(
"unrecoverable ({e}); original quarantined, nothing salvageable"
),
));
}
Err(salvage_err) => {
seen_ids.remove(&table_id);
unreadable_files.push((
table_path,
format!("recovery failed ({e}); salvage failed ({salvage_err})"),
));
}
}
}
Err(e) => {
seen_ids.remove(&table_id);
unreadable_files.push((table_path, e.to_string()));
}
}
}
}
recovered_tables.sort_by_key(|t| std::cmp::Reverse(t.get_highest_seqno()));
let l0_runs = recovered_tables
.iter()
.cloned()
.filter_map(|t| Run::new(vec![t]).map(Arc::new))
.collect::<Vec<_>>();
let recovered = l0_runs.len();
let mut levels = Vec::with_capacity(config.level_count.into());
levels.push(Level::from_runs(l0_runs));
for _ in 1..config.level_count {
levels.push(Level::empty());
}
let version_id = match highest_existing_version_id(&*config.fs, &config.path)? {
Some(max) => max.checked_add(1).ok_or(crate::Error::Unrecoverable)?,
None => 0,
};
let (tree_type, blob_file_list) = if config.kv_separation_opts.is_some() {
let (blob_files, blob_unreadable) = recover_blob_files(config)?;
unreadable_files.extend(blob_unreadable);
let map: crate::HashMap<crate::vlog::BlobFileId, crate::vlog::BlobFile> =
blob_files.into_iter().map(|bf| (bf.id(), bf)).collect();
(TreeType::Blob, BlobFileList::new(map))
} else {
(
TreeType::Standard,
BlobFileList::new(crate::HashMap::default()),
)
};
let version = Version::from_levels(
version_id,
tree_type,
levels,
blob_file_list,
crate::blob_tree::FragmentationMap::default(),
);
crate::version::persist_version(
&config.path,
&version,
config.comparator.name(),
&*config.fs,
Arc::new(config.initial_runtime_config.clone()),
config.encryption.clone(),
config.sync_mode,
)?;
for dirent in config.fs.read_dir(&config.path)? {
if dirent.is_dir || !dirent.file_name.starts_with("edits-") {
continue;
}
match config.fs.remove_file(&dirent.path) {
Ok(()) => {}
Err(e) if e.kind() == crate::io::ErrorKind::NotFound => {}
Err(e) => return Err(e.into()),
}
}
let mut warnings = vec![
"All recovered tables placed at L0; background compaction will redistribute them",
"Recent unlogged version edits (in-flight compactions, recent deletions) are lost",
];
if config.kv_separation_opts.is_some() {
warnings.push(
"Blob fragmentation stats reset to empty; blob GC will re-learn reclaimable space over time",
);
}
Ok(RepairReport {
recovered,
salvaged,
unreadable: unreadable_files.len(),
unreadable_files,
method: "all-to-L0 with sequence-number ordering",
warnings,
})
}
#[cfg(test)]
mod tests;