use tracing::{info, instrument, warn};
use uni_common::api::error::UniError;
use uni_common::core::fork::{ForkInfo, ForkStatus};
use super::registry::ForkRegistryHandle;
#[instrument(skip(registry, dataset_uri_for), level = "info")]
pub async fn recover_forks<F>(
registry: &ForkRegistryHandle,
mut dataset_uri_for: F,
) -> Result<usize, UniError>
where
F: FnMut(&str) -> String,
{
let mut reconciled = 0usize;
let snapshot = registry.snapshot().await;
let pending: Vec<ForkInfo> = snapshot
.forks
.values()
.filter(|f| f.status == ForkStatus::Pending)
.cloned()
.collect();
for info in pending {
info!(fork_name = %info.name, fork_id = %info.id, "rolling back Pending create");
rollback_branches(&info, &mut dataset_uri_for).await;
registry.rollback_create(&info.name).await?;
reconciled += 1;
}
let snapshot = registry.snapshot().await;
let tombstoned: Vec<ForkInfo> = snapshot
.forks
.values()
.filter(|f| f.status == ForkStatus::Tombstoned)
.cloned()
.collect();
for info in tombstoned {
info!(fork_name = %info.name, fork_id = %info.id, "completing tombstoned drop");
delete_all_branches(&info, &mut dataset_uri_for).await;
registry.finish_drop(&info).await?;
reconciled += 1;
}
let orphans = registry.list_tombstones().await?;
for info in orphans {
info!(
fork_name = %info.name,
fork_id = %info.id,
"sweeping orphan tombstone"
);
delete_all_branches(&info, &mut dataset_uri_for).await;
registry.finish_drop(&info).await?;
reconciled += 1;
}
Ok(reconciled)
}
async fn delete_all_branches<F>(info: &ForkInfo, dataset_uri_for: &mut F)
where
F: FnMut(&str) -> String,
{
#[cfg(feature = "lance-backend")]
for (dataset, branch) in &info.datasets {
let uri = dataset_uri_for(dataset);
if let Err(e) = crate::backend::lance_branch::delete_branch(&uri, branch).await {
warn!(
dataset = %dataset,
branch = %branch,
"delete_branch during recovery failed: {e}"
);
}
}
#[cfg(not(feature = "lance-backend"))]
{
let _ = (info, dataset_uri_for);
}
}
async fn rollback_branches<F>(info: &ForkInfo, dataset_uri_for: &mut F)
where
F: FnMut(&str) -> String,
{
if !info.datasets.is_empty() {
delete_all_branches(info, dataset_uri_for).await;
}
}
#[doc(hidden)]
pub fn join_uri_with(base_uri: String) -> impl FnMut(&str) -> String {
move |dataset: &str| {
if base_uri.ends_with('/') {
format!("{base_uri}{dataset}.lance")
} else {
format!("{base_uri}/{dataset}.lance")
}
}
}