use std::time::Duration;
use chrono::Utc;
use futures::stream::StreamExt;
use lance::dataset::cleanup::{CleanupPolicy, RemovalStats};
use lance::dataset::optimize::{
CompactionMetrics, CompactionOptions, compact_files, plan_compaction,
};
use lance::index::DatasetIndexExt;
use lance_index::optimize::OptimizeOptions;
use super::*;
/// Number of tables a maintenance pass processes at once when no override is set.
const DEFAULT_MAINT_CONCURRENCY: usize = 8;

/// Returns the per-pass table concurrency for optimize / cleanup.
///
/// Reads `OMNIGRAPH_MAINTENANCE_CONCURRENCY`. A missing, non-UTF-8,
/// unparsable, or zero value falls back to [`DEFAULT_MAINT_CONCURRENCY`].
fn maint_concurrency() -> usize {
    match std::env::var("OMNIGRAPH_MAINTENANCE_CONCURRENCY") {
        Ok(raw) => match raw.parse::<usize>() {
            // Zero would stall `buffer_unordered`, so it is treated as unset.
            Ok(requested) if requested > 0 => requested,
            _ => DEFAULT_MAINT_CONCURRENCY,
        },
        Err(_) => DEFAULT_MAINT_CONCURRENCY,
    }
}
/// Gate for compacting tables that carry blob columns.
///
/// While this is `false`, `optimize_one_table` skips such tables and reports
/// `SkipReason::BlobColumnsUnsupportedByLance` instead of compacting them. Its
/// skip warning cites a blob-v2 AllBinary decode bug in Lance.
const LANCE_SUPPORTS_BLOB_COMPACTION: bool = false;
/// Retention policy for [`cleanup_all_tables`].
///
/// At least one field must be set. `cleanup_all_tables` rejects a value where
/// both are `None`.
#[derive(Debug, Clone, Default)]
pub struct CleanupPolicyOptions {
    /// Version-count retention. Converted per table into Lance's
    /// `before_version` as `HEAD - keep_versions`.
    pub keep_versions: Option<u32>,
    /// Age-based retention. Converted into Lance's `before_timestamp` as
    /// `Utc::now() - older_than`.
    pub older_than: Option<Duration>,
}
/// Why `optimize_all_tables` left a table untouched.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum SkipReason {
    /// The table has blob columns that the bundled Lance cannot compact.
    BlobColumnsUnsupportedByLance,
    /// Lance HEAD is ahead of the manifest-pinned version. `omnigraph repair`
    /// has to reconcile the drift first.
    DriftNeedsRepair,
}

impl SkipReason {
    /// Stable, machine-readable snake_case identifier for this reason.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::BlobColumnsUnsupportedByLance => "blob_columns_unsupported_by_lance",
            Self::DriftNeedsRepair => "drift_needs_repair",
        }
    }
}

impl std::fmt::Display for SkipReason {
    /// Writes a short human-readable explanation.
    ///
    /// Uses `write_str`, so width/fill format flags are ignored.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match *self {
            Self::BlobColumnsUnsupportedByLance => "blob columns — Lance compaction unsupported",
            Self::DriftNeedsRepair => "manifest/head drift — run omnigraph repair",
        })
    }
}
/// Per-table outcome of [`optimize_all_tables`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct TableOptimizeStats {
    /// Catalog table key: `node:<Type>` or `edge:<Type>`.
    pub table_key: String,
    /// Fragments removed by Lance compaction (0 when compaction did not run).
    pub fragments_removed: usize,
    /// Fragments written by Lance compaction (0 when compaction did not run).
    pub fragments_added: usize,
    /// `true` when the table's Lance version advanced and that version was
    /// committed to the graph manifest.
    pub committed: bool,
    /// Why the table was left untouched, if it was skipped.
    pub skipped: Option<SkipReason>,
    /// Manifest-pinned table version. Only populated for drift skips.
    pub manifest_version: Option<u64>,
    /// Observed Lance HEAD version. Only populated for drift skips.
    pub lance_head_version: Option<u64>,
    /// Entries returned by the catalog index-build step for this table.
    pub pending_indexes: Vec<super::PendingIndex>,
}
impl TableOptimizeStats {
    /// Result for a table that went through the optimize pipeline.
    ///
    /// Fragment counts come from `metrics`. `committed` reports whether a new
    /// table version was published.
    fn compacted(table_key: String, metrics: &CompactionMetrics, committed: bool) -> Self {
        Self {
            fragments_removed: metrics.fragments_removed,
            fragments_added: metrics.fragments_added,
            committed,
            ..Self::untouched(table_key)
        }
    }

    /// Result for a table skipped for `reason` without any storage changes.
    fn skipped(table_key: String, reason: SkipReason) -> Self {
        Self {
            skipped: Some(reason),
            ..Self::untouched(table_key)
        }
    }

    /// Drift skip that also records both observed versions for diagnostics.
    fn skipped_for_drift(
        table_key: String,
        manifest_version: u64,
        lance_head_version: u64,
    ) -> Self {
        Self {
            manifest_version: Some(manifest_version),
            lance_head_version: Some(lance_head_version),
            ..Self::skipped(table_key, SkipReason::DriftNeedsRepair)
        }
    }

    /// Baseline value: nothing compacted, nothing committed, no skip reason,
    /// no version diagnostics and no pending indexes.
    fn untouched(table_key: String) -> Self {
        Self {
            table_key,
            fragments_removed: 0,
            fragments_added: 0,
            committed: false,
            skipped: None,
            manifest_version: None,
            lance_head_version: None,
            pending_indexes: Vec::new(),
        }
    }
}
/// Per-table outcome of [`cleanup_all_tables`].
///
/// A GC failure on one table is recorded in `error` and does not abort the
/// pass for the other tables.
#[derive(Debug, Clone)]
pub struct TableCleanupStats {
    /// Catalog table key: `node:<Type>` or `edge:<Type>`.
    pub table_key: String,
    /// Bytes reclaimed, as reported by Lance (0 on error).
    pub bytes_removed: u64,
    /// Old versions removed, as reported by Lance (0 on error).
    pub old_versions_removed: u64,
    /// Stringified error when GC for this table failed; `None` on success.
    pub error: Option<String>,
}
/// Compacts and re-indexes every table on the `main` branch.
///
/// Preconditions are checked up front:
/// - the schema state must be valid;
/// - no schema apply may be running;
/// - no recovery sidecars may be pending (reopen the graph to run the
///   recovery sweep first).
///
/// Tables are then processed concurrently, at most `maint_concurrency()` at
/// a time. If any table committed a new version, the runtime cache is
/// invalidated. The graph index is also invalidated when at least one
/// committed table is an edge table.
///
/// # Errors
/// Precondition failures are returned immediately. Otherwise every table is
/// attempted, and the first per-table error (in completion order) is returned
/// after caches are invalidated for whatever did commit.
pub async fn optimize_all_tables(db: &Omnigraph) -> Result<Vec<TableOptimizeStats>> {
    db.ensure_schema_state_valid().await?;
    db.ensure_schema_apply_idle("optimize").await?;

    // Leftover sidecars mean an earlier operation has not been recovered yet.
    let pending_sidecars =
        crate::db::manifest::list_sidecars(db.root_uri(), db.storage_adapter()).await?;
    if !pending_sidecars.is_empty() {
        return Err(OmniError::manifest_conflict(
            "optimize requires a clean recovery state; reopen the graph to run the \
             recovery sweep before optimizing",
        ));
    }

    let snapshot = db.fresh_snapshot_for_branch(None).await?;
    // (table_key, absolute dataset path, has blob columns) for each catalog
    // table that has an entry on `main`.
    let table_tasks: Vec<(String, String, bool)> = {
        let catalog = db.catalog();
        let tasks = all_table_keys(&catalog)
            .into_iter()
            .filter_map(|table_key| {
                let entry = snapshot.entry(&table_key)?;
                let full_path = format!("{}/{}", db.root_uri, entry.table_path);
                Some((table_key, full_path))
            })
            .map(|(table_key, full_path)| -> Result<(String, String, bool)> {
                let has_blob = !blob_properties_for_table_key(&catalog, &table_key)?.is_empty();
                Ok((table_key, full_path, has_blob))
            })
            .collect::<Result<Vec<_>>>()?;
        tasks
    };
    if table_tasks.is_empty() {
        return Ok(Vec::new());
    }

    let concurrency = maint_concurrency().min(table_tasks.len()).max(1);
    let outcomes: Vec<Result<TableOptimizeStats>> = futures::stream::iter(table_tasks)
        .map(move |(table_key, full_path, has_blob)| async move {
            optimize_one_table(db, table_key, full_path, has_blob).await
        })
        .buffer_unordered(concurrency)
        .collect()
        .await;

    // Only successful results that actually committed affect cached state.
    let mut any_committed = false;
    let mut edge_committed = false;
    for stat in outcomes.iter().flatten() {
        if stat.committed {
            any_committed = true;
            edge_committed |= stat.table_key.starts_with("edge:");
        }
    }
    if any_committed {
        db.runtime_cache.invalidate_all().await;
        if edge_committed {
            db.invalidate_graph_index().await;
        }
    }
    outcomes.into_iter().collect()
}
/// Compacts and re-indexes a single `main` table and publishes the result to
/// the graph manifest.
///
/// Steps:
/// 1. Skip tables with blob columns while `LANCE_SUPPORTS_BLOB_COMPACTION` is false.
/// 2. Hold the table's write-queue slot for the rest of the function.
/// 3. Compare Lance HEAD with the manifest-pinned version:
///    - behind the manifest is an error;
///    - ahead of it is reported as drift and skipped.
/// 4. With no compaction, reindex or index-creation work, return an
///    uncommitted zero-fragment result without writing anything.
/// 5. Write an `Optimize` recovery sidecar.
/// 6. Compact, optimize indices and build catalog indices.
/// 7. If the Lance version moved, commit it to the manifest with an
///    expected-version check.
/// 8. Delete the sidecar (best-effort).
///
/// # Errors
/// Returns storage/Lance errors, a missing manifest entry, Lance HEAD behind
/// the manifest, and manifest-commit failures. Any error after the sidecar is
/// written returns before the sidecar is deleted. The sidecar then remains for
/// the next open's recovery sweep.
async fn optimize_one_table(
    db: &Omnigraph,
    table_key: String,
    full_path: String,
    has_blob: bool,
) -> Result<TableOptimizeStats> {
    if has_blob && !LANCE_SUPPORTS_BLOB_COMPACTION {
        tracing::warn!(
            target: "omnigraph::optimize",
            table = %table_key,
            "skipping compaction: table has blob columns the current Lance \
             cannot rewrite (blob-v2 AllBinary decode bug); other tables \
             unaffected — rerun after the Lance fix",
        );
        return Ok(TableOptimizeStats::skipped(
            table_key,
            SkipReason::BlobColumnsUnsupportedByLance,
        ));
    }
    // The guard lives until the function returns, covering every step below.
    let _guard = db
        .write_queue()
        .acquire_many(&[(table_key.clone(), None)])
        .await;
    let handle = db
        .storage()
        .open_dataset_head_for_write(&table_key, &full_path, None)
        .await?;
    let mut ds = handle.into_dataset();
    // The manifest pin is read after taking the guard, so it reflects commits
    // that finished before this table was locked.
    let expected_version = db
        .fresh_snapshot_for_branch(None)
        .await?
        .entry(&table_key)
        .map(|e| e.table_version)
        .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
    let lance_head_version = ds.version().version;
    if lance_head_version < expected_version {
        return Err(OmniError::manifest_internal(format!(
            "table '{}' Lance HEAD version {} is behind manifest version {}",
            table_key, lance_head_version, expected_version
        )));
    }
    if lance_head_version > expected_version {
        // Drift is never auto-published here; it is left for `omnigraph repair`.
        tracing::warn!(
            target: "omnigraph::optimize",
            table = %table_key,
            manifest_version = expected_version,
            lance_head_version,
            "skipping compaction: Lance HEAD is ahead of the manifest; run `omnigraph repair` \
             to classify and publish covered maintenance drift explicitly",
        );
        return Ok(TableOptimizeStats::skipped_for_drift(
            table_key,
            expected_version,
            lance_head_version,
        ));
    }
    // Decide whether any work is needed before writing a sidecar.
    let options = CompactionOptions::default();
    let plan = plan_compaction(&ds, &options)
        .await
        .map_err(|e| OmniError::Lance(e.to_string()))?;
    let will_compact = plan.num_tasks() > 0;
    let needs_reindex = TableStore::has_unindexed_fragments(&ds).await?;
    let needs_index_create = if let Some(type_name) = table_key.strip_prefix("node:") {
        super::table_ops::needs_index_work_node(db, type_name, &table_key, &full_path, None).await?
    } else {
        super::table_ops::needs_index_work_edge(db, &table_key, &full_path, None).await?
    };
    if !will_compact && !needs_reindex && !needs_index_create {
        return Ok(TableOptimizeStats::compacted(
            table_key,
            &CompactionMetrics::default(),
            false,
        ));
    }
    // Record intent before any Lance write so a crash can be recovered.
    // NOTE(review): `post_commit_pin` assumes exactly one Lance commit, but
    // compaction, `optimize_indices` and index builds may each commit. Confirm
    // the recovery sweep tolerates a final version above `expected_version + 1`.
    let sidecar = crate::db::manifest::new_sidecar(
        crate::db::manifest::SidecarKind::Optimize,
        None,
        None,
        vec![crate::db::manifest::SidecarTablePin {
            table_key: table_key.clone(),
            table_path: full_path.clone(),
            expected_version,
            post_commit_pin: expected_version + 1,
            table_branch: None,
        }],
    );
    // `handle` now names the sidecar; the dataset handle was consumed above.
    let handle =
        crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?;
    let version_before = ds.version().version;
    let metrics: CompactionMetrics = if will_compact {
        compact_files(&mut ds, options, None)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?
    } else {
        CompactionMetrics::default()
    };
    ds.optimize_indices(&OptimizeOptions::default())
        .await
        .map_err(|e| OmniError::Lance(format!("optimize_indices on {}: {}", table_key, e)))?;
    let catalog = db.catalog();
    let mut snapshot = crate::storage_layer::SnapshotHandle::new(ds);
    let pending_indexes: Vec<super::PendingIndex> =
        super::table_ops::build_indices_on_dataset_for_catalog(
            db,
            &catalog,
            &table_key,
            &mut snapshot,
        )
        .await?;
    // Any Lance commit above (compaction, index optimize or index build)
    // advances the version.
    let version_after = snapshot.dataset().version().version;
    let committed = version_after != version_before;
    // Failpoint between the Lance writes and the manifest commit. An injected
    // error returns early and leaves the sidecar in place.
    crate::failpoints::maybe_fail("optimize.post_phase_b_pre_manifest_commit")?;
    if committed {
        // Publish the new Lance version. The commit is guarded by the version
        // this pass started from.
        let state = db.storage().table_state(&full_path, &snapshot).await?;
        let update = crate::db::SubTableUpdate {
            table_key: table_key.clone(),
            table_version: state.version,
            table_branch: None,
            row_count: state.row_count,
            version_metadata: state.version_metadata,
        };
        let mut expected = std::collections::HashMap::new();
        expected.insert(table_key.clone(), expected_version);
        db.coordinator
            .write()
            .await
            .commit_updates_with_actor_with_expected(&[update], &expected, None)
            .await?;
    }
    // Best-effort: a leftover sidecar is resolved by the next open's recovery
    // sweep, and `optimize_all_tables` refuses to run until then.
    if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
        tracing::warn!(
            error = %err,
            operation_id = handle.operation_id.as_str(),
            "optimize recovery sidecar cleanup failed; next open's recovery sweep will resolve it"
        );
    }
    let mut stat = TableOptimizeStats::compacted(table_key, &metrics, committed);
    stat.pending_indexes = pending_indexes;
    Ok(stat)
}
/// Garbage-collects old Lance versions on every `main` table.
///
/// First runs [`reconcile_orphaned_branches`] to reclaim orphaned branch
/// forks. Its per-fork results are only logged. Tables are then GC'd
/// concurrently, bounded by `maint_concurrency()`. A failure on one table is
/// stored in that table's `TableCleanupStats::error` and does not abort the
/// others. Results come back in completion order, not table-key order.
///
/// # Errors
/// Returns an error in these cases:
/// - neither `keep_versions` nor `older_than` is set;
/// - the schema state is invalid, or a schema apply is in progress;
/// - the reconcile pass fails outright;
/// - the `main` branch target cannot be resolved.
pub async fn cleanup_all_tables(
    db: &mut Omnigraph,
    options: CleanupPolicyOptions,
) -> Result<Vec<TableCleanupStats>> {
    // Reject an empty policy before doing any work.
    if options.keep_versions.is_none() && options.older_than.is_none() {
        return Err(OmniError::manifest(
            "cleanup requires at least one of keep_versions or older_than",
        ));
    }
    db.ensure_schema_state_valid().await?;
    db.ensure_schema_apply_idle("cleanup").await?;
    // Reclaim orphaned branch forks first. Only a hard failure aborts cleanup;
    // per-fork outcomes are logged below.
    let reconciled = reconcile_orphaned_branches(db).await?;
    if !reconciled.reclaimed.is_empty() {
        tracing::info!(
            count = reconciled.reclaimed.len(),
            reclaimed = ?reconciled.reclaimed,
            "cleanup reconciled orphaned branch forks"
        );
    }
    if !reconciled.failures.is_empty() {
        tracing::warn!(
            count = reconciled.failures.len(),
            failures = ?reconciled.failures,
            "cleanup could not reconcile some orphaned forks; will retry next cleanup"
        );
    }
    // Cutoff instant handed to Lance as `before_timestamp`.
    let before_timestamp = options.older_than.map(|d| Utc::now() - d);
    let keep_versions = options.keep_versions;
    let resolved = db.resolved_branch_target(None).await?;
    let snapshot = resolved.snapshot;
    // (table_key, absolute dataset path) for every catalog table present on `main`.
    let table_tasks: Vec<_> = all_table_keys(&db.catalog())
        .into_iter()
        .filter_map(|table_key| {
            let entry = snapshot.entry(&table_key)?;
            let full_path = format!("{}/{}", db.root_uri, entry.table_path);
            Some((table_key, full_path))
        })
        .collect();
    if table_tasks.is_empty() {
        return Ok(Vec::new());
    }
    let concurrency = maint_concurrency().min(table_tasks.len()).max(1);
    let storage = db.storage();
    // NOTE(review): unlike optimize, GC does not take the write queue. This
    // relies on Lance version cleanup being safe alongside concurrent writers;
    // confirm.
    let results: Vec<TableCleanupStats> = futures::stream::iter(table_tasks.into_iter())
        .map(|(table_key, full_path)| async move {
            // The fallible steps run in an inner async block so any error is
            // captured per table instead of aborting the whole pass.
            let outcome: Result<RemovalStats> = async {
                crate::failpoints::maybe_fail("cleanup.table_gc")?;
                let handle = storage
                    .open_dataset_head_for_write(&table_key, &full_path, None)
                    .await?;
                let ds = handle.into_dataset();
                // `before_version = HEAD - keep_versions`, omitted when that is 0.
                // NOTE(review): if Lance's `before_version` is exclusive, this keeps
                // versions HEAD-keep..=HEAD, i.e. keep + 1 versions. Confirm that
                // matches the intended meaning of `keep_versions`.
                let before_version = keep_versions
                    .map(|n| ds.version().version.saturating_sub(n as u64))
                    .filter(|v| *v > 0);
                // Conservative flags: keep unverified files and referenced branches.
                let policy = CleanupPolicy {
                    before_timestamp,
                    before_version,
                    delete_unverified: false,
                    error_if_tagged_old_versions: false,
                    clean_referenced_branches: false,
                    delete_rate_limit: None,
                };
                lance::dataset::cleanup::cleanup_old_versions(&ds, policy)
                    .await
                    .map_err(|e| OmniError::Lance(e.to_string()))
            }
            .await;
            match outcome {
                Ok(removed) => TableCleanupStats {
                    table_key,
                    bytes_removed: removed.bytes_removed,
                    old_versions_removed: removed.old_versions,
                    error: None,
                },
                Err(err) => {
                    tracing::warn!(
                        target: "omnigraph::cleanup",
                        table = %table_key,
                        error = %err,
                        "version GC failed for table; other tables unaffected",
                    );
                    TableCleanupStats {
                        table_key,
                        bytes_removed: 0,
                        old_versions_removed: 0,
                        error: Some(err.to_string()),
                    }
                }
            }
        })
        .buffer_unordered(concurrency)
        .collect()
        .await;
    Ok(results)
}
/// Result of [`reconcile_orphaned_branches`].
///
/// Commit-graph entries use the pseudo table key `"_graph_commits"`.
#[derive(Debug, Clone, Default)]
pub struct BranchReconcileStats {
    /// `(table_key, branch)` for every fork that was deleted.
    pub reclaimed: Vec<(String, String)>,
    /// `(table_key, message)` for every step that failed or was inconclusive.
    /// These are retried on the next cleanup.
    pub failures: Vec<(String, String)>,
}
/// Reclaims Lance branch forks, and commit-graph branches, that no longer
/// belong to a live graph branch.
///
/// For every table on `main`, each listed Lance branch other than `main` and
/// internal system branches becomes a candidate orphan when either:
/// - it is not among the coordinator's live branches; or
/// - it is live, but that branch's snapshot has no entry for this table, or
///   its entry does not point at a fork of the same name.
///
/// Candidates are processed longest-name-first. Each one is re-classified via
/// `classify_fork_ref` while holding the (table, branch) write-queue slot. It
/// is force-deleted only when the fresh check says `Orphan`.
///
/// These failures are collected in `BranchReconcileStats::failures` instead
/// of aborting:
/// - branch listing;
/// - snapshot resolution;
/// - inconclusive classification;
/// - the delete itself.
///
/// A live branch whose snapshot cannot be resolved is recorded once and then
/// skipped for all remaining tables.
///
/// # Errors
/// Fails only if listing live branches or resolving the `main` snapshot fails.
pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconcileStats> {
    use std::collections::{HashMap, HashSet};
    // Graph branches the coordinator currently knows about.
    let live_branches: HashSet<String> = db
        .coordinator
        .read()
        .await
        .all_branches()
        .await?
        .into_iter()
        .collect();
    let resolved = db.resolved_branch_target(None).await?;
    let snapshot = resolved.snapshot;
    // (table_key, absolute dataset path) for every catalog table present on `main`.
    let table_targets: Vec<(String, String)> = all_table_keys(&db.catalog())
        .into_iter()
        .filter_map(|table_key| {
            let entry = snapshot.entry(&table_key)?;
            let full_path = format!("{}/{}", db.root_uri, entry.table_path);
            Some((table_key, full_path))
        })
        .collect();
    let mut stats = BranchReconcileStats::default();
    // Per-branch snapshot cache shared across tables, plus a negative cache so
    // an unresolvable live branch is reported only once.
    let mut branch_snapshots: HashMap<String, crate::db::Snapshot> = HashMap::new();
    let mut failed_branch_snapshots: HashSet<String> = HashSet::new();
    let storage = db.storage();
    for (table_key, full_path) in table_targets {
        let listed = match storage.list_branches(&full_path).await {
            Ok(listed) => listed,
            Err(err) => {
                tracing::warn!(
                    target: "omnigraph::cleanup",
                    table = %table_key,
                    error = %err,
                    "listing branches failed during reconcile; skipping table",
                );
                stats.failures.push((table_key.clone(), err.to_string()));
                continue;
            }
        };
        // Phase 1: cheap classification of candidate orphans for this table.
        let mut orphans: Vec<String> = Vec::new();
        for branch in listed {
            if branch == "main" || crate::db::is_internal_system_branch(&branch) {
                continue;
            }
            let is_orphan = if !live_branches.contains(&branch) {
                // Fork for a branch the coordinator no longer lists.
                true
            } else {
                // Resolution already failed for this live branch. Its failure
                // was recorded once, so skip it quietly.
                if failed_branch_snapshots.contains(&branch) {
                    continue;
                }
                if !branch_snapshots.contains_key(&branch) {
                    let branch_snapshot =
                        match crate::failpoints::maybe_fail("cleanup.resolve_branch_snapshot") {
                            Ok(()) => db.snapshot_for_branch(Some(&branch)).await,
                            Err(injected) => Err(injected),
                        };
                    match branch_snapshot {
                        Ok(snap) => {
                            branch_snapshots.insert(branch.clone(), snap);
                        }
                        Err(err) => {
                            tracing::warn!(
                                target: "omnigraph::cleanup",
                                table = %table_key,
                                branch = %branch,
                                error = %err,
                                "resolving branch snapshot failed during reconcile; skipping",
                            );
                            stats.failures.push((table_key.clone(), err.to_string()));
                            failed_branch_snapshots.insert(branch.clone());
                            continue;
                        }
                    }
                }
                // Orphan unless the live branch's snapshot points this table at
                // a fork with exactly this name. A missing entry also means orphan.
                branch_snapshots[&branch]
                    .entry(&table_key)
                    .map(|e| e.table_branch.as_deref() != Some(branch.as_str()))
                    .unwrap_or(true)
            };
            if is_orphan {
                orphans.push(branch);
            }
        }
        // Longest names first, ties broken lexicographically.
        // NOTE(review): presumably so derived/nested branch names are removed
        // before their prefixes; confirm.
        orphans.sort_by(|a, b| b.len().cmp(&a.len()).then_with(|| a.cmp(b)));
        // Phase 2: re-check each candidate under its write-queue slot before deleting.
        for branch in orphans {
            // Held for both the fresh re-check and the delete.
            let _guard = db
                .write_queue()
                .acquire(&(table_key.clone(), Some(branch.clone())))
                .await;
            match super::table_ops::classify_fork_ref(db, &table_key, &branch).await {
                super::table_ops::ForkRefStatus::Orphan => {}
                super::table_ops::ForkRefStatus::Legitimate => continue,
                super::table_ops::ForkRefStatus::Indeterminate => {
                    tracing::warn!(
                        target: "omnigraph::cleanup",
                        table = %table_key,
                        branch = %branch,
                        "fresh re-check inconclusive during reconcile; skipping to avoid \
                         destroying a possibly-live fork (will retry next cleanup)",
                    );
                    stats.failures.push((
                        table_key.clone(),
                        format!("indeterminate fork status for {branch}"),
                    ));
                    continue;
                }
            }
            let outcome = match crate::failpoints::maybe_fail("cleanup.reconcile_fork") {
                Ok(()) => storage.force_delete_branch(&full_path, &branch).await,
                Err(injected) => Err(injected),
            };
            match outcome {
                Ok(()) => stats.reclaimed.push((table_key.clone(), branch)),
                Err(err) => {
                    tracing::warn!(
                        target: "omnigraph::cleanup",
                        table = %table_key,
                        branch = %branch,
                        error = %err,
                        "reclaiming orphaned fork failed; will retry next cleanup",
                    );
                    stats.failures.push((table_key.clone(), err.to_string()));
                }
            }
        }
    }
    // Reconcile the commit graph last. A failure there is recorded, not returned.
    if let Err(err) = reconcile_commit_graph_orphans(db, &live_branches, &mut stats).await {
        tracing::warn!(
            target: "omnigraph::cleanup",
            error = %err,
            "commit-graph orphan reconcile failed; will retry next cleanup",
        );
        stats
            .failures
            .push(("_graph_commits".to_string(), err.to_string()));
    }
    Ok(stats)
}
/// Deletes commit-graph branches that do not correspond to a live graph branch.
///
/// Applies the same protections as the per-table pass in
/// `reconcile_orphaned_branches`: `main` and internal system branches are never
/// treated as orphans, even when they are absent from `keep`. Remaining
/// candidates are deleted in `orphan_branches` order. Individual delete
/// failures are logged and recorded in `stats.failures` under the pseudo table
/// key `"_graph_commits"`.
///
/// Does nothing and returns `Ok(())` when the commit-graph dataset does not exist.
///
/// # Errors
/// Fails if checking for, opening, or listing the commit graph fails.
async fn reconcile_commit_graph_orphans(
    db: &Omnigraph,
    keep: &std::collections::HashSet<String>,
    stats: &mut BranchReconcileStats,
) -> Result<()> {
    let commits_uri = crate::db::commit_graph::graph_commits_uri(db.root_uri());
    if !db.storage_adapter().exists(&commits_uri).await? {
        return Ok(());
    }
    let mut commit_graph = crate::db::commit_graph::CommitGraph::open(db.root_uri()).await?;
    // Without this filter, an internal system branch (or `main`) missing from
    // the coordinator's live list would be force-deleted here. The per-table
    // pass explicitly never reclaims those.
    let candidates: Vec<String> = commit_graph
        .list_branches()
        .await?
        .into_iter()
        .filter(|branch| branch != "main" && !crate::db::is_internal_system_branch(branch))
        .collect();
    for branch in orphan_branches(candidates, keep) {
        match commit_graph.force_delete_branch(&branch).await {
            Ok(()) => stats.reclaimed.push(("_graph_commits".to_string(), branch)),
            Err(err) => {
                tracing::warn!(
                    target: "omnigraph::cleanup",
                    branch = %branch,
                    error = %err,
                    "reclaiming orphaned commit-graph branch failed; will retry next cleanup",
                );
                stats
                    .failures
                    .push(("_graph_commits".to_string(), err.to_string()));
            }
        }
    }
    Ok(())
}
/// Returns the entries of `present` that are not in `keep`.
///
/// The result is ordered longest-name-first, with ties broken
/// lexicographically.
fn orphan_branches(present: Vec<String>, keep: &std::collections::HashSet<String>) -> Vec<String> {
    let mut orphans = present;
    orphans.retain(|name| !keep.contains(name));
    // Comparing (b.len(), a) with (a.len(), b) sorts by length descending,
    // then by name ascending.
    orphans.sort_by(|a, b| (b.len(), a).cmp(&(a.len(), b)));
    orphans
}
/// Returns the table key of every node and edge type in `catalog`.
///
/// Keys have the form `node:<Type>` / `edge:<Type>`. They are sorted, so the
/// result does not depend on the catalog maps' iteration order.
pub(super) fn all_table_keys(catalog: &omnigraph_compiler::catalog::Catalog) -> Vec<String> {
    let node_keys = catalog.node_types.keys().map(|name| format!("node:{name}"));
    let edge_keys = catalog.edge_types.keys().map(|name| format!("edge:{name}"));
    let mut keys: Vec<String> = node_keys.chain(edge_keys).collect();
    // Equal strings are indistinguishable, so an unstable sort gives the same
    // order as a stable one.
    keys.sort_unstable();
    keys
}
// Compiled only for test builds with the `failpoints` feature enabled.
#[cfg(all(test, feature = "failpoints"))]
mod tests {
    use super::*;
    use crate::failpoints::ScopedFailPoint;
    use crate::loader::{LoadMode, load_jsonl};

    /// Builds `<root>/nodes/<hash>` for a node type. `<hash>` is the 64-bit
    /// FNV-1a hash of `type_name`, printed as 16 hex digits.
    /// NOTE(review): presumably mirrors the storage layer's node-table
    /// directory naming; keep in sync if that scheme changes.
    fn node_table_uri(root: &str, type_name: &str) -> String {
        // FNV-1a 64-bit offset basis.
        let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
        for &b in type_name.as_bytes() {
            hash ^= b as u64;
            // FNV-1a 64-bit prime.
            hash = hash.wrapping_mul(0x100_0000_01b3);
        }
        format!("{}/nodes/{hash:016x}", root.trim_end_matches('/'))
    }

    /// A live branch whose snapshot cannot be resolved must be reported once,
    /// not once per table that has a fork for it, and must not be reclaimed.
    #[tokio::test]
    async fn reconcile_caches_live_branch_snapshot_resolution_failure() {
        let _scenario = fail::FailScenario::setup();
        let dir = tempfile::tempdir().unwrap();
        let uri = dir.path().to_str().unwrap();
        let schema = "node Person { name: String @key }\nnode Company { name: String @key }\n";
        let mut db = Omnigraph::init(uri, schema).await.unwrap();
        load_jsonl(
            &mut db,
            "{\"type\":\"Person\",\"data\":{\"name\":\"Alice\"}}\n\
             {\"type\":\"Company\",\"data\":{\"name\":\"Acme\"}}",
            LoadMode::Merge,
        )
        .await
        .unwrap();
        db.branch_create("feature").await.unwrap();
        // Give both node tables a Lance fork named after the live branch, so
        // each table lists a `feature` fork that needs the live branch's snapshot.
        for type_name in ["Person", "Company"] {
            let table_uri = node_table_uri(uri, type_name);
            let mut ds = lance::Dataset::open(&table_uri).await.unwrap();
            let base = ds.version().version;
            ds.create_branch("feature", base, None).await.unwrap();
        }
        // Make every branch-snapshot resolution fail.
        let _fp = ScopedFailPoint::new("cleanup.resolve_branch_snapshot", "return");
        let stats = reconcile_orphaned_branches(&db).await.unwrap();
        assert_eq!(
            stats.failures.len(),
            1,
            "one live-branch snapshot resolution failure should be reported once, \
             not once per table: {:?}",
            stats.failures
        );
        assert!(
            stats.failures[0]
                .1
                .contains("cleanup.resolve_branch_snapshot"),
            "the recorded failure should be the branch-snapshot resolution failure: {:?}",
            stats.failures
        );
        assert!(
            stats.reclaimed.is_empty(),
            "unreadable live-branch refs must be left for the next cleanup run"
        );
    }
}