use crate::{
Table, TableId,
config::{Config, TreeType},
version::{BlobFileList, Level, Run, Version},
};
use std::{path::PathBuf, sync::Arc};
/// Summary of a completed tree repair.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RepairReport {
    /// Number of tables that were recovered and placed into level 0 (one run each).
    pub recovered: usize,

    /// Number of entries in [`RepairReport::unreadable_files`].
    pub unreadable: usize,

    /// Files that could not be used, each paired with a human-readable reason
    /// (I/O or table-open failure, or a file name that is not a table id).
    pub unreadable_files: Vec<(PathBuf, String)>,

    /// Short description of the repair strategy that was applied.
    pub method: &'static str,

    /// Caveats the caller should be aware of after the repair.
    pub warnings: Vec<&'static str>,
}
/// Streams the file at `path` through XXH3-128 and returns the 128-bit digest.
///
/// The file is read in 256 KiB chunks, so large tables are hashed without being
/// loaded into memory in one piece.
///
/// # Errors
///
/// Propagates any error from opening or reading the file.
fn compute_table_checksum(fs: &dyn crate::fs::Fs, path: &std::path::Path) -> crate::Result<u128> {
    const CHUNK_SIZE: usize = 256 * 1024;

    let mut reader = fs.open(path, &crate::fs::FsOpenOptions::new().read(true))?;
    let mut digest = xxhash_rust::xxh3::Xxh3Default::new();
    let mut scratch = vec![0u8; CHUNK_SIZE];

    loop {
        let filled = reader.read(&mut scratch)?;
        // An empty slice means EOF. `None` can only occur if a reader reports more
        // bytes than the buffer holds; in that case stop rather than index out of
        // bounds.
        match scratch.get(..filled) {
            Some(bytes) if !bytes.is_empty() => digest.update(bytes),
            _ => break,
        }
    }

    Ok(digest.digest128())
}
/// Returns the largest `N` among the entries of `folder` named `v<N>`, where `<N>`
/// parses as a `u64`. Returns `None` if no entry matches that pattern.
///
/// # Errors
///
/// Propagates any error from listing `folder`.
fn highest_existing_version_id(
    fs: &dyn crate::fs::Fs,
    folder: &std::path::Path,
) -> crate::Result<Option<u64>> {
    let entries = fs.read_dir(folder)?;

    let mut highest: Option<u64> = None;
    for entry in entries {
        let Some(digits) = entry.file_name.strip_prefix('v') else {
            continue;
        };
        if let Ok(id) = digits.parse::<u64>() {
            highest = Some(highest.map_or(id, |current| current.max(id)));
        }
    }

    Ok(highest)
}
/// Moves `src` into a `repair-quarantine` directory and returns its new location.
///
/// The quarantine directory lives in the parent of `table_base_folder`, or inside
/// `table_base_folder` itself if it has no parent. It is created if missing.
///
/// If an entry named `file_name` already exists there (for example from an earlier
/// repair run, or from another tables folder with the same parent), a numeric
/// suffix (`<name>.1`, `<name>.2`, ...) is appended. This ensures a previously
/// quarantined file is never overwritten.
///
/// # Errors
///
/// Returns an error if the quarantine directory cannot be created, an existence
/// check fails, or the rename fails.
fn quarantine_file(
    fs: &dyn crate::fs::Fs,
    table_base_folder: &std::path::Path,
    src: &std::path::Path,
    file_name: &str,
) -> crate::Result<PathBuf> {
    let quarantine_dir = table_base_folder
        .parent()
        .unwrap_or(table_base_folder)
        .join("repair-quarantine");
    fs.create_dir_all(&quarantine_dir)?;

    // A rename typically replaces an existing destination (e.g. `std::fs::rename`
    // on Unix). That would destroy data quarantined earlier, so pick a free name
    // first.
    let mut dest = quarantine_dir.join(file_name);
    let mut suffix: u64 = 0;
    while fs.exists(&dest)? {
        suffix = suffix.saturating_add(1);
        dest = quarantine_dir.join(format!("{file_name}.{suffix}"));
    }

    fs.rename(src, &dest)?;
    Ok(dest)
}
impl Config {
    /// Repairs the tree at this configuration's path by rebuilding its version from
    /// the table files found on disk.
    ///
    /// Every readable table is placed into level 0 and a fresh version is persisted.
    /// Afterwards, `edits-*` files in the tree folder are deleted. Files in the tables
    /// folders whose names are not table ids are moved into a `repair-quarantine`
    /// directory. See [`RepairReport`] for what is reported back.
    ///
    /// # Errors
    ///
    /// Fails with `FeatureUnsupported` for KV-separated trees. Also fails on I/O
    /// errors while listing folders, persisting the new version, or removing edit
    /// logs.
    pub fn repair(&self) -> crate::Result<RepairReport> {
        let report = repair_tree(self)?;
        Ok(report)
    }
}
/// Rebuilds a tree's version from whatever table files are present on disk.
///
/// 1. Scan every tables folder and recover each readable table
///    (see [`scan_tables_folders`]).
/// 2. Put each recovered table into its own L0 run, newest (highest seqno) first.
/// 3. Persist that layout as a new version. Its id is one past the highest `v<N>`
///    entry in the tree folder, or `0` if there is none.
/// 4. Delete all `edits-*` files from the tree folder.
///
/// # Errors
///
/// Returns `FeatureUnsupported` for KV-separated trees. Also propagates I/O errors
/// from listing folders, persisting the version, or deleting edit logs. Individual
/// tables that cannot be read do not fail the repair; they are listed in the report.
fn repair_tree(config: &Config) -> crate::Result<RepairReport> {
    if config.kv_separation_opts.is_some() {
        return Err(crate::Error::FeatureUnsupported(
            "repair of KV-separated (blob) trees",
        ));
    }

    let TableScan {
        mut tables,
        unreadable_files,
    } = scan_tables_folders(config)?;

    // Newest data first. Ties on the highest seqno are broken by table id, so the
    // resulting L0 order no longer depends on directory iteration order. Ids are
    // unique here, which makes the key a total order and an unstable sort
    // deterministic.
    tables.sort_unstable_by_key(|(id, table)| {
        (
            std::cmp::Reverse(table.get_highest_seqno()),
            std::cmp::Reverse(*id),
        )
    });

    // Each table becomes its own single-table run. `recovered` counts the runs
    // that were actually built.
    let l0_runs = tables
        .into_iter()
        .filter_map(|(_, table)| Run::new(vec![table]).map(Arc::new))
        .collect::<Vec<_>>();
    let recovered = l0_runs.len();

    let mut levels = Vec::with_capacity(config.level_count.into());
    levels.push(Level::from_runs(l0_runs));
    for _ in 1..config.level_count {
        levels.push(Level::empty());
    }

    // Never reuse an existing version id: take one past the highest `v<N>` present.
    let version_id = highest_existing_version_id(&*config.fs, &config.path)?
        .map_or(0, |max| max.saturating_add(1));

    let version = Version::from_levels(
        version_id,
        TreeType::Standard,
        levels,
        BlobFileList::new(crate::HashMap::default()),
        crate::blob_tree::FragmentationMap::default(),
    );

    crate::version::persist_version(
        &config.path,
        &version,
        config.comparator.name(),
        &*config.fs,
        Arc::new(config.initial_runtime_config.clone()),
        config.encryption.clone(),
        config.sync_mode,
    )?;

    // Edit logs are removed only after the new version has been persisted, so a
    // failure while persisting leaves them untouched.
    remove_edit_logs(config)?;

    Ok(RepairReport {
        recovered,
        unreadable: unreadable_files.len(),
        unreadable_files,
        method: "all-to-L0 with sequence-number ordering",
        warnings: vec![
            "All recovered tables placed at L0; background compaction will redistribute them",
            "Recent unlogged version edits (in-flight compactions, recent deletions) are lost",
        ],
    })
}

/// Outcome of scanning the tables folders during repair.
struct TableScan {
    /// Recovered tables, each paired with the id parsed from its file name.
    tables: Vec<(TableId, Table)>,
    /// Files that could not be used, with a human-readable reason.
    unreadable_files: Vec<(PathBuf, String)>,
}

/// Scans every tables folder of `config` and tries to open each table file in it.
///
/// - Directories and `.DS_Store` / `._*` files are ignored.
/// - Files whose name does not parse as a [`TableId`] are moved to a quarantine
///   directory and reported as unreadable.
/// - Files that cannot be checksummed or opened as a table are reported as
///   unreadable and left in place.
/// - Once a table id has been recovered, further files with the same id are
///   skipped and not reported. If an earlier copy failed, a later copy is still
///   tried.
///
/// # Errors
///
/// Only failures to check or list a tables folder abort the scan. Per-file failures
/// are recorded in the result instead.
fn scan_tables_folders(config: &Config) -> crate::Result<TableScan> {
    let mut tables: Vec<(TableId, Table)> = Vec::new();
    let mut unreadable_files: Vec<(PathBuf, String)> = Vec::new();
    // Ids recovered so far. An id is released again if its table fails to open, so
    // that another copy of it can still be tried.
    let mut seen_ids: crate::HashSet<TableId> = crate::HashSet::default();

    // One metrics sink shared by every table opened during repair, instead of a
    // fresh allocation per table.
    #[cfg(feature = "metrics")]
    let metrics = Arc::new(crate::metrics::Metrics::default());

    for (table_base_folder, folder_fs) in config.all_tables_folders() {
        if !folder_fs.exists(&table_base_folder)? {
            continue;
        }

        for dirent in folder_fs.read_dir(&table_base_folder)? {
            let crate::fs::FsDirEntry {
                path: table_path,
                file_name,
                is_dir,
            } = dirent;

            // Skip directories and macOS metadata files.
            if is_dir || file_name == ".DS_Store" || file_name.starts_with("._") {
                continue;
            }

            let Ok(table_id) = file_name.parse::<TableId>() else {
                let reason =
                    match quarantine_file(&*folder_fs, &table_base_folder, &table_path, &file_name)
                    {
                        Ok(dest) => {
                            format!(
                                "file name is not a table id; quarantined to {}",
                                dest.display()
                            )
                        }
                        Err(e) => format!("file name is not a table id; quarantine failed: {e}"),
                    };
                unreadable_files.push((table_path, reason));
                continue;
            };

            if !seen_ids.insert(table_id) {
                continue;
            }

            let checksum = match compute_table_checksum(&*folder_fs, &table_path) {
                Ok(c) => crate::Checksum::from_raw(c),
                Err(e) => {
                    seen_ids.remove(&table_id);
                    unreadable_files.push((table_path, e.to_string()));
                    continue;
                }
            };

            let recovered = Table::recover(
                table_path.clone(),
                checksum,
                0,
                0,
                table_id,
                config.cache.clone(),
                None,
                folder_fs.clone(),
                false,
                false,
                config.encryption.clone(),
                #[cfg(zstd_any)]
                config.zstd_dictionary.clone(),
                config.comparator.clone(),
                #[cfg(feature = "metrics")]
                Arc::clone(&metrics),
            );

            match recovered {
                Ok(table) => tables.push((table_id, table)),
                Err(e) => {
                    seen_ids.remove(&table_id);
                    unreadable_files.push((table_path, e.to_string()));
                }
            }
        }
    }

    Ok(TableScan {
        tables,
        unreadable_files,
    })
}

/// Deletes every non-directory entry in the tree folder whose name starts with
/// `edits-`.
///
/// An entry that has already disappeared (`NotFound`) is not treated as an error.
fn remove_edit_logs(config: &Config) -> crate::Result<()> {
    for dirent in config.fs.read_dir(&config.path)? {
        if dirent.is_dir || !dirent.file_name.starts_with("edits-") {
            continue;
        }
        match config.fs.remove_file(&dirent.path) {
            Ok(()) => {}
            Err(e) if e.kind() == crate::io::ErrorKind::NotFound => {}
            Err(e) => return Err(e.into()),
        }
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::{compute_table_checksum, highest_existing_version_id};
    use crate::fs::StdFs;
    use test_log::test;

    /// Writes a deterministic `len`-byte payload to a temp file and checks that the
    /// streaming checksum equals the one-shot XXH3-128 digest of the same bytes.
    fn assert_streamed_digest_matches_oneshot(len: usize) -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let path = dir.path().join("000007");
        let payload: Vec<u8> = (0..len).map(|i| (i % 251) as u8).collect();
        std::fs::write(&path, &payload)?;

        let got = compute_table_checksum(&StdFs, &path)?;
        let expected = xxhash_rust::xxh3::xxh3_128(&payload);
        assert_eq!(
            got, expected,
            "streamed digest must equal the one-shot xxh3-128 digest (len = {len})",
        );
        Ok(())
    }

    #[test]
    fn compute_table_checksum_matches_oneshot_xxh3() -> crate::Result<()> {
        // Spans several read chunks and ends with a partial one.
        assert_streamed_digest_matches_oneshot(600_000)
    }

    #[test]
    fn compute_table_checksum_of_empty_file() -> crate::Result<()> {
        // The very first read hits EOF.
        assert_streamed_digest_matches_oneshot(0)
    }

    #[test]
    fn compute_table_checksum_on_exact_chunk_boundary() -> crate::Result<()> {
        // A multiple of the 256 KiB read buffer: the last data read fills the buffer
        // completely and is followed by an EOF read.
        assert_streamed_digest_matches_oneshot(2 * 256 * 1024)
    }

    #[test]
    fn highest_existing_version_id_picks_the_max_and_ignores_non_versions() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        for name in ["v2", "v10", "v3", "current", "vNaN", "notaversion"] {
            std::fs::write(dir.path().join(name), b"x")?;
        }
        assert_eq!(highest_existing_version_id(&StdFs, dir.path())?, Some(10));
        Ok(())
    }

    #[test]
    fn highest_existing_version_id_none_when_no_versions_present() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        std::fs::write(dir.path().join("current"), b"x")?;
        assert_eq!(highest_existing_version_id(&StdFs, dir.path())?, None);
        Ok(())
    }
}