use rocksdb::{DBWithThreadMode, SingleThreaded};
use serde::Serialize;
use std::path::PathBuf;
use crate::core::db;
use crate::core::db::dir_hashes::dir_hashes_db::{
dir_hash_db_path_from_commit_id, with_entry_evicted,
};
use crate::core::db::key_val::str_val_db;
use crate::error::OxenError;
use crate::model::merkle_tree::node::{EMerkleTreeNode, MerkleTreeNode};
use crate::model::{Commit, LocalRepository, MerkleHash};
use crate::repositories;
use crate::util;
/// Summary returned by `rebuild_dir_hash_db` after a rebuild has completed.
#[derive(Debug, Clone, Serialize)]
pub struct RebuildDirHashesStats {
/// Id of the commit whose `dir_hashes` database was rebuilt.
pub commit_id: String,
/// Number of directory entries written to the rebuilt database. Entries that were skipped
/// (for example because their path is not valid UTF-8) are not counted.
pub dirs_written: usize,
}
pub fn rebuild_dir_hash_db(
repo: &LocalRepository,
commit: &Commit,
) -> Result<RebuildDirHashesStats, OxenError> {
log::info!(
"rebuild_dir_hash_db: repo={:?} commit={}",
repo.path,
commit.id
);
let root = repositories::tree::get_root_with_children(repo, commit)?
.ok_or_else(|| OxenError::RevisionNotFound(commit.id.clone().into()))?;
let pairs = collect_dir_hashes(&root);
let db_path = dir_hash_db_path_from_commit_id(repo, &commit.id);
let new_path = db_path.with_file_name(format!("dir_hashes.new.{}", commit.id));
let old_path = db_path.with_file_name(format!("dir_hashes.old.{}", commit.id));
for path in [&new_path, &old_path] {
if path.exists() {
util::fs::remove_dir_all(path)?;
}
}
let mut successful_writes: usize = 0;
{
let opts = db::key_val::opts::default();
let new_db: DBWithThreadMode<SingleThreaded> =
DBWithThreadMode::open(&opts, dunce::simplified(&new_path))?;
for (path, hash) in &pairs {
let Some(path_str) = path.to_str() else {
log::error!("Skipping non-UTF-8 path during rebuild: {path:?}");
continue;
};
str_val_db::put(&new_db, path_str, &hash.to_string())?;
successful_writes += 1;
}
}
let swap_db_path = db_path.clone();
with_entry_evicted(repo, &commit.id, move || {
let had_existing = swap_db_path.exists();
if had_existing {
util::fs::rename(&swap_db_path, &old_path)?;
}
util::fs::rename(&new_path, &swap_db_path)?;
if had_existing
&& old_path.exists()
&& let Err(err) = util::fs::remove_dir_all(&old_path)
{
log::warn!(
"rebuild_dir_hash_db: could not remove previous dir_hashes at \
{old_path:?}: {err}"
);
}
Ok(())
})?;
Ok(RebuildDirHashesStats {
commit_id: commit.id.clone(),
dirs_written: successful_writes,
})
}
/// Collects `(path, hash)` for every directory node reachable from `root`.
///
/// The tree is walked iteratively with an explicit stack, starting at `root` with an empty
/// path. A directory child extends its parent's path with the directory's name; any other kind
/// of child inherits the parent's path unchanged, so directories nested below non-directory
/// nodes still end up with the path of their nearest directory ancestor plus their own name.
fn collect_dir_hashes(root: &MerkleTreeNode) -> Vec<(PathBuf, MerkleHash)> {
    let mut dir_hashes = Vec::new();
    let mut pending: Vec<(&MerkleTreeNode, PathBuf)> = vec![(root, PathBuf::new())];

    while let Some((current, current_path)) = pending.pop() {
        // Queue the children first; they only need to borrow the current path.
        pending.extend(current.children.iter().map(|child| {
            let child_path = match &child.node {
                EMerkleTreeNode::Directory(dir) => current_path.join(dir.name()),
                _ => current_path.clone(),
            };
            (child, child_path)
        }));

        // The path is no longer needed for children, so it can be moved into the output.
        if let EMerkleTreeNode::Directory(_) = &current.node {
            dir_hashes.push((current_path, current.hash));
        }
    }

    dir_hashes
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    use crate::error::OxenError;
    use crate::repositories;
    use crate::storage::version_store::LocalFilePath;
    use crate::test;
    use crate::util;

    /// With the flag set to `true` (the dry-run mode this test is named for), a corrupted
    /// version file must be reported but neither counted as cleaned nor removed.
    #[tokio::test]
    async fn test_fsck_dry_run_detects_corrupted_version() -> Result<(), OxenError> {
        test::run_empty_local_repo_test_async(|repo| async move {
            let file_path = repo.path.join("hello.txt");
            test::write_txt_file_to_path(&file_path, "hello world")?;
            repositories::add(&repo, &file_path).await?;
            repositories::commit(&repo, "Adding hello.txt")?;

            let version_store = repo.version_store();
            let versions = version_store.list_versions().await?;
            assert!(!versions.is_empty());
            let hash = &versions[0];

            // Overwrite the stored version file in place so its content no longer matches.
            let version_path = version_store.get_version_path(hash).await?;
            let LocalFilePath::Stable(ref path) = version_path else {
                panic!("Expected LocalVersionStore (Stable path), got a Temp path. This test only works with local storage.");
            };
            std::fs::write(path, b"corrupted data")?;

            let result = version_store.clean_corrupted_versions(true).await?;
            assert!(result.corrupted > 0);
            assert_eq!(result.cleaned, 0);
            assert!(version_store.version_exists(hash).await?);
            Ok(())
        })
        .await
    }

    /// With the flag set to `false`, a corrupted version file must be reported, counted as
    /// cleaned, and no longer exist in the version store afterwards.
    #[tokio::test]
    async fn test_fsck_clean_removes_corrupted_version() -> Result<(), OxenError> {
        test::run_empty_local_repo_test_async(|repo| async move {
            let file_path = repo.path.join("hello.txt");
            test::write_txt_file_to_path(&file_path, "hello world")?;
            repositories::add(&repo, &file_path).await?;
            repositories::commit(&repo, "Adding hello.txt")?;

            let version_store = repo.version_store();
            let versions = version_store.list_versions().await?;
            assert!(!versions.is_empty());
            let hash = &versions[0];

            // Overwrite the stored version file in place so its content no longer matches.
            let version_path = version_store.get_version_path(hash).await?;
            let LocalFilePath::Stable(ref path) = version_path else {
                panic!("Expected LocalVersionStore (Stable path), got a Temp path. This test only works with local storage.");
            };
            std::fs::write(path, b"corrupted data")?;

            let result = version_store.clean_corrupted_versions(false).await?;
            assert!(result.corrupted > 0);
            assert!(result.cleaned > 0);
            assert!(!version_store.version_exists(hash).await?);
            Ok(())
        })
        .await
    }

    /// An untouched repository must be scanned without reporting any corruption.
    #[tokio::test]
    async fn test_fsck_no_corruption_on_clean_repo() -> Result<(), OxenError> {
        test::run_empty_local_repo_test_async(|repo| async move {
            let file_path = repo.path.join("hello.txt");
            test::write_txt_file_to_path(&file_path, "hello world")?;
            repositories::add(&repo, &file_path).await?;
            repositories::commit(&repo, "Adding hello.txt")?;

            let version_store = repo.version_store();
            let result = version_store.clean_corrupted_versions(true).await?;
            assert_eq!(result.corrupted, 0);
            assert!(result.scanned > 0);
            Ok(())
        })
        .await
    }

    /// Deleting a commit's `dir_hashes` database breaks nested directory lookups;
    /// `rebuild_dir_hash_db` must bring them back.
    #[tokio::test]
    async fn test_rebuild_dir_hash_db_restores_missing_entry() -> Result<(), OxenError> {
        test::run_empty_local_repo_test_async(|repo| async move {
            // Seed a three-level directory chain containing a single file.
            let parent_dir = repo.path.join("features").join("fbimg");
            let child_dir = parent_dir.join("dinov3_vits16");
            util::fs::create_dir_all(&child_dir)?;
            let file = child_dir.join("note.txt");
            test::write_txt_file_to_path(&file, "hello")?;
            repositories::add(&repo, &repo.path).await?;
            let commit = repositories::commit(&repo, "seed nested dirs")?;

            // Sanity check: the lookup works before anything is broken.
            let child_rel = PathBuf::from("features/fbimg/dinov3_vits16");
            let ok = repositories::tree::get_dir_with_children(&repo, &commit, &child_rel, None)?;
            assert!(
                ok.is_some(),
                "expected dir_with_children to find {child_rel:?} on a healthy repo"
            );

            // Drop the cached handle first, then delete the database directory from disk.
            let db_path =
                crate::core::db::dir_hashes::dir_hashes_db::dir_hash_db_path_from_commit_id(
                    &repo, &commit.id,
                );
            crate::core::db::dir_hashes::dir_hashes_db::remove_from_cache_with_children(&db_path)?;
            util::fs::remove_dir_all(&db_path)?;

            let broken =
                repositories::tree::get_dir_with_children(&repo, &commit, &child_rel, None);
            assert!(
                matches!(broken, Err(OxenError::PathDoesNotExist(_))),
                "expected PathDoesNotExist after dir_hash_db was removed, got {broken:?}"
            );

            let stats = rebuild_dir_hash_db(&repo, &commit)?;
            assert_eq!(stats.commit_id, commit.id);
            // Four directories are expected (root, features, fbimg, dinov3_vits16), so the
            // threshold must be 4 to match the message and catch a single missing entry.
            assert!(
                stats.dirs_written >= 4,
                "expected at least root + features + fbimg + dinov3_vits16 entries, got {}",
                stats.dirs_written
            );

            let repaired =
                repositories::tree::get_dir_with_children(&repo, &commit, &child_rel, None)?;
            assert!(
                repaired.is_some(),
                "expected dir_with_children to find {child_rel:?} after rebuild"
            );
            Ok(())
        })
        .await
    }
}