use std::time::Duration;
use chrono::Utc;
use futures::stream::StreamExt;
use lance::dataset::cleanup::{CleanupPolicy, RemovalStats};
use lance::dataset::optimize::{CompactionMetrics, CompactionOptions, compact_files};
use super::*;
/// Number of tables processed concurrently when the environment supplies no usable override.
const DEFAULT_MAINT_CONCURRENCY: usize = 8;

/// Resolves how many tables the maintenance routines may work on at once.
///
/// Reads `OMNIGRAPH_MAINTENANCE_CONCURRENCY`; the value is used only when it
/// parses as a `usize` strictly greater than zero. A missing, non-UTF-8,
/// unparsable, or zero value falls back to [`DEFAULT_MAINT_CONCURRENCY`].
fn maint_concurrency() -> usize {
    match std::env::var("OMNIGRAPH_MAINTENANCE_CONCURRENCY") {
        Ok(raw) => match raw.parse::<usize>() {
            // Zero would mean "no work in flight", so it is treated like an invalid value.
            Ok(parsed) if parsed > 0 => parsed,
            _ => DEFAULT_MAINT_CONCURRENCY,
        },
        Err(_) => DEFAULT_MAINT_CONCURRENCY,
    }
}
// Gate consulted by `optimize_all_tables`: while this is `false`, any table that
// has blob properties is reported as skipped instead of being compacted.
// NOTE(review): the skip log message attributes this to a Lance "blob-v2 AllBinary
// decode bug" — flip to `true` once the Lance version in use can rewrite blob columns.
const LANCE_SUPPORTS_BLOB_COMPACTION: bool = false;
/// Retention settings accepted by `cleanup_all_tables`.
///
/// At least one field must be `Some`; `cleanup_all_tables` returns an error
/// when both are `None`.
#[derive(Debug, Clone, Default)]
pub struct CleanupPolicyOptions {
/// Version-count bound. For each table, `cleanup_all_tables` passes
/// `head_version - keep_versions` (saturating, omitted when the result is 0)
/// to Lance as `before_version`.
pub keep_versions: Option<u32>,
/// Age bound. `cleanup_all_tables` passes `now - older_than` to Lance as
/// `before_timestamp`.
pub older_than: Option<Duration>,
}
/// Why a table was left untouched by `optimize_all_tables`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum SkipReason {
    /// The table has blob columns and blob compaction is currently gated off.
    BlobColumnsUnsupportedByLance,
}

impl SkipReason {
    /// Stable, machine-friendly identifier for this reason (snake_case, no spaces).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::BlobColumnsUnsupportedByLance => "blob_columns_unsupported_by_lance",
        }
    }
}

/// Human-readable rendering; distinct from the identifier returned by `as_str`.
impl std::fmt::Display for SkipReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match *self {
            Self::BlobColumnsUnsupportedByLance => "blob columns — Lance compaction unsupported",
        })
    }
}
/// Per-table outcome reported by `optimize_all_tables`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct TableOptimizeStats {
/// Catalog key of the table, in the `node:<name>` / `edge:<name>` form produced by `all_table_keys`.
pub table_key: String,
/// Copied from Lance's `CompactionMetrics::fragments_removed`; 0 when the table was skipped.
pub fragments_removed: usize,
/// Copied from Lance's `CompactionMetrics::fragments_added`; 0 when the table was skipped.
pub fragments_added: usize,
/// `true` when the dataset version differed after compaction from the version before it.
pub committed: bool,
/// `Some(reason)` when compaction was not attempted for this table, `None` otherwise.
pub skipped: Option<SkipReason>,
}
impl TableOptimizeStats {
    /// Builds the record for a table on which compaction ran.
    ///
    /// Fragment counts are taken from `metrics`; `committed` is supplied by the
    /// caller. `skipped` is always `None`.
    fn compacted(table_key: String, metrics: &CompactionMetrics, committed: bool) -> Self {
        let removed = metrics.fragments_removed;
        let added = metrics.fragments_added;
        Self {
            table_key,
            fragments_removed: removed,
            fragments_added: added,
            committed,
            skipped: None,
        }
    }

    /// Builds the record for a table that was not compacted: zero counts,
    /// `committed == false`, and the given `reason`.
    fn skipped(table_key: String, reason: SkipReason) -> Self {
        let skipped = Some(reason);
        Self {
            table_key,
            fragments_removed: 0,
            fragments_added: 0,
            committed: false,
            skipped,
        }
    }
}
/// Per-table outcome reported by `cleanup_all_tables`.
#[derive(Debug, Clone)]
pub struct TableCleanupStats {
/// Catalog key of the table, in the `node:<name>` / `edge:<name>` form produced by `all_table_keys`.
pub table_key: String,
/// Copied from Lance's `RemovalStats::bytes_removed`; 0 when cleanup failed for this table.
pub bytes_removed: u64,
/// Copied from Lance's `RemovalStats::old_versions`; 0 when cleanup failed for this table.
pub old_versions_removed: u64,
/// Stringified error when cleanup failed for this table, `None` on success.
pub error: Option<String>,
}
/// Compacts every catalog table that is present in the default branch snapshot.
///
/// Tables are processed concurrently, bounded by `maint_concurrency()`. A table
/// that has blob properties is reported as skipped (with a warning) while
/// `LANCE_SUPPORTS_BLOB_COMPACTION` is `false`.
///
/// # Returns
/// One [`TableOptimizeStats`] per table, sorted by `table_key` so the output is
/// stable across runs regardless of which table finishes first.
///
/// # Errors
/// Fails if the schema-state checks fail, if the branch target cannot be
/// resolved, if blob properties cannot be determined for a table, or if opening
/// or compacting any table fails. All tables are driven to completion before
/// the first error is returned.
pub async fn optimize_all_tables(db: &Omnigraph) -> Result<Vec<TableOptimizeStats>> {
    db.ensure_schema_state_valid().await?;
    db.ensure_schema_apply_idle("optimize").await?;
    let resolved = db.resolved_branch_target(None).await?;
    let snapshot = resolved.snapshot;

    // The catalog handle lives only inside this block, so it is gone before the
    // concurrent work below starts.
    let table_tasks: Vec<(String, String, bool)> = {
        let catalog = db.catalog();
        let mut tasks = Vec::new();
        for table_key in all_table_keys(&catalog) {
            // Tables in the catalog but absent from the snapshot have nothing to compact.
            let Some(entry) = snapshot.entry(&table_key) else {
                continue;
            };
            let full_path = format!("{}/{}", db.root_uri, entry.table_path);
            let has_blob = !blob_properties_for_table_key(&catalog, &table_key)?.is_empty();
            tasks.push((table_key, full_path, has_blob));
        }
        tasks
    };
    if table_tasks.is_empty() {
        return Ok(Vec::new());
    }

    // Never ask for more slots than there are tables, and never fewer than one.
    let concurrency = maint_concurrency().min(table_tasks.len()).max(1);
    let table_store = &db.table_store;
    let outcomes: Vec<Result<TableOptimizeStats>> = futures::stream::iter(table_tasks.into_iter())
        .map(|(table_key, full_path, has_blob)| async move {
            if has_blob && !LANCE_SUPPORTS_BLOB_COMPACTION {
                tracing::warn!(
                    target: "omnigraph::optimize",
                    table = %table_key,
                    "skipping compaction: table has blob columns the current Lance \
                     cannot rewrite (blob-v2 AllBinary decode bug); other tables \
                     unaffected — rerun after the Lance fix",
                );
                return Ok(TableOptimizeStats::skipped(
                    table_key,
                    SkipReason::BlobColumnsUnsupportedByLance,
                ));
            }
            let mut ds = table_store
                .open_dataset_head_for_write(&table_key, &full_path, None)
                .await?;
            // A changed version number is how a committed compaction is detected.
            let version_before = ds.version().version;
            let metrics: CompactionMetrics =
                compact_files(&mut ds, CompactionOptions::default(), None)
                    .await
                    .map_err(|e| OmniError::Lance(e.to_string()))?;
            let version_after = ds.version().version;
            Ok(TableOptimizeStats::compacted(
                table_key,
                &metrics,
                version_after != version_before,
            ))
        })
        .buffer_unordered(concurrency)
        .collect()
        .await;

    let mut stats: Vec<TableOptimizeStats> = outcomes
        .into_iter()
        .collect::<Result<Vec<TableOptimizeStats>>>()?;
    // `buffer_unordered` yields in completion order; restore the deterministic
    // key order the work list was built in.
    stats.sort_by(|a, b| a.table_key.cmp(&b.table_key));
    Ok(stats)
}
pub async fn cleanup_all_tables(
db: &mut Omnigraph,
options: CleanupPolicyOptions,
) -> Result<Vec<TableCleanupStats>> {
if options.keep_versions.is_none() && options.older_than.is_none() {
return Err(OmniError::manifest(
"cleanup requires at least one of keep_versions or older_than",
));
}
db.ensure_schema_state_valid().await?;
db.ensure_schema_apply_idle("cleanup").await?;
let reconciled = reconcile_orphaned_branches(db).await?;
if !reconciled.reclaimed.is_empty() {
tracing::info!(
count = reconciled.reclaimed.len(),
reclaimed = ?reconciled.reclaimed,
"cleanup reconciled orphaned branch forks"
);
}
if !reconciled.failures.is_empty() {
tracing::warn!(
count = reconciled.failures.len(),
failures = ?reconciled.failures,
"cleanup could not reconcile some orphaned forks; will retry next cleanup"
);
}
let before_timestamp = options.older_than.map(|d| Utc::now() - d);
let keep_versions = options.keep_versions;
let resolved = db.resolved_branch_target(None).await?;
let snapshot = resolved.snapshot;
let table_tasks: Vec<_> = all_table_keys(&db.catalog())
.into_iter()
.filter_map(|table_key| {
let entry = snapshot.entry(&table_key)?;
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
Some((table_key, full_path))
})
.collect();
if table_tasks.is_empty() {
return Ok(Vec::new());
}
let concurrency = maint_concurrency().min(table_tasks.len()).max(1);
let table_store = &db.table_store;
let results: Vec<TableCleanupStats> = futures::stream::iter(table_tasks.into_iter())
.map(|(table_key, full_path)| async move {
let outcome: Result<RemovalStats> = async {
crate::failpoints::maybe_fail("cleanup.table_gc")?;
let ds = table_store
.open_dataset_head_for_write(&table_key, &full_path, None)
.await?;
let before_version = keep_versions
.map(|n| ds.version().version.saturating_sub(n as u64))
.filter(|v| *v > 0);
let policy = CleanupPolicy {
before_timestamp,
before_version,
delete_unverified: false,
error_if_tagged_old_versions: false,
clean_referenced_branches: false,
delete_rate_limit: None,
};
lance::dataset::cleanup::cleanup_old_versions(&ds, policy)
.await
.map_err(|e| OmniError::Lance(e.to_string()))
}
.await;
match outcome {
Ok(removed) => TableCleanupStats {
table_key,
bytes_removed: removed.bytes_removed,
old_versions_removed: removed.old_versions,
error: None,
},
Err(err) => {
tracing::warn!(
target: "omnigraph::cleanup",
table = %table_key,
error = %err,
"version GC failed for table; other tables unaffected",
);
TableCleanupStats {
table_key,
bytes_removed: 0,
old_versions_removed: 0,
error: Some(err.to_string()),
}
}
}
})
.buffer_unordered(concurrency)
.collect()
.await;
Ok(results)
}
/// Outcome of `reconcile_orphaned_branches`.
#[derive(Debug, Clone, Default)]
pub struct BranchReconcileStats {
/// Successfully deleted orphan branches as `(owner, branch)` pairs, where `owner`
/// is a table key or the literal `"_graph_commits"` for the commit graph.
pub reclaimed: Vec<(String, String)>,
/// Failures as `(owner, error message)` pairs, where `owner` is a table key or
/// the literal `"_graph_commits"`. The branch name is not part of the pair.
pub failures: Vec<(String, String)>,
}
/// Deletes branches that exist on table datasets (and on the commit graph) but
/// are not known to the coordinator.
///
/// Work is best-effort: a failure to list or delete is logged, appended to
/// `failures`, and processing moves on. Only fetching the coordinator's branch
/// list or resolving the branch target aborts the call.
///
/// # Returns
/// A [`BranchReconcileStats`] listing what was reclaimed and what failed.
pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconcileStats> {
    use std::collections::HashSet;

    // Branches the coordinator knows about are the ones to keep.
    let keep: HashSet<String> = db
        .coordinator
        .read()
        .await
        .all_branches()
        .await?
        .into_iter()
        .collect();

    let resolved = db.resolved_branch_target(None).await?;
    let snapshot = resolved.snapshot;

    // Pair each catalog table present in the snapshot with its dataset path.
    let table_keys = all_table_keys(&db.catalog());
    let mut table_targets: Vec<(String, String)> = Vec::new();
    for table_key in table_keys {
        if let Some(entry) = snapshot.entry(&table_key) {
            let full_path = format!("{}/{}", db.root_uri, entry.table_path);
            table_targets.push((table_key, full_path));
        }
    }

    let mut stats = BranchReconcileStats::default();
    for (table_key, full_path) in table_targets {
        let listing = db.table_store.list_branches(&full_path).await;
        let listed = match listing {
            Err(err) => {
                tracing::warn!(
                    target: "omnigraph::cleanup",
                    table = %table_key,
                    error = %err,
                    "listing branches failed during reconcile; skipping table",
                );
                stats.failures.push((table_key.clone(), err.to_string()));
                continue;
            }
            Ok(branches) => branches,
        };

        for branch in orphan_branches(listed, &keep) {
            // The failpoint is consulted first; when it fires, the delete is not attempted.
            let gate = crate::failpoints::maybe_fail("cleanup.reconcile_fork");
            let outcome = match gate {
                Err(injected) => Err(injected),
                Ok(()) => db.table_store.force_delete_branch(&full_path, &branch).await,
            };
            if let Err(err) = outcome {
                tracing::warn!(
                    target: "omnigraph::cleanup",
                    table = %table_key,
                    branch = %branch,
                    error = %err,
                    "reclaiming orphaned fork failed; will retry next cleanup",
                );
                stats.failures.push((table_key.clone(), err.to_string()));
            } else {
                stats.reclaimed.push((table_key.clone(), branch));
            }
        }
    }

    // The commit graph is reconciled last; a failure there is recorded, not propagated.
    let graph_outcome = reconcile_commit_graph_orphans(db, &keep, &mut stats).await;
    if let Err(err) = graph_outcome {
        tracing::warn!(
            target: "omnigraph::cleanup",
            error = %err,
            "commit-graph orphan reconcile failed; will retry next cleanup",
        );
        stats.failures.push(("_graph_commits".to_string(), err.to_string()));
    }
    Ok(stats)
}
/// Deletes commit-graph branches that are not in `keep`, recording each outcome
/// in `stats` under the owner name `"_graph_commits"`.
///
/// Does nothing when the commit-graph location does not exist. Individual delete
/// failures are logged and recorded; only the existence check, opening the
/// graph, or listing its branches produce an `Err`.
async fn reconcile_commit_graph_orphans(
    db: &Omnigraph,
    keep: &std::collections::HashSet<String>,
    stats: &mut BranchReconcileStats,
) -> Result<()> {
    let commits_uri = crate::db::commit_graph::graph_commits_uri(db.root_uri());
    let graph_present = db.storage_adapter().exists(&commits_uri).await?;
    if !graph_present {
        return Ok(());
    }

    let mut commit_graph = crate::db::commit_graph::CommitGraph::open(db.root_uri()).await?;
    let listed = commit_graph.list_branches().await?;
    for branch in orphan_branches(listed, keep) {
        let outcome = commit_graph.force_delete_branch(&branch).await;
        if let Err(err) = outcome {
            tracing::warn!(
                target: "omnigraph::cleanup",
                branch = %branch,
                error = %err,
                "reclaiming orphaned commit-graph branch failed; will retry next cleanup",
            );
            stats.failures.push(("_graph_commits".to_string(), err.to_string()));
        } else {
            stats.reclaimed.push(("_graph_commits".to_string(), branch));
        }
    }
    Ok(())
}
/// Returns the entries of `present` that are absent from `keep`.
///
/// The result is ordered longest name first, with equal-length names in
/// ascending lexicographic order.
// NOTE(review): longest-first presumably ensures nested branch names are handled
// before the shorter names they extend — confirm against the branch naming scheme.
fn orphan_branches(present: Vec<String>, keep: &std::collections::HashSet<String>) -> Vec<String> {
    let mut orphans = present;
    orphans.retain(|name| !keep.contains(name));
    orphans.sort_by(|lhs, rhs| match rhs.len().cmp(&lhs.len()) {
        std::cmp::Ordering::Equal => lhs.cmp(rhs),
        by_length => by_length,
    });
    orphans
}
/// Lists the key of every table in `catalog`, sorted ascending.
///
/// Node types are rendered as `node:<name>` and edge types as `edge:<name>`.
fn all_table_keys(catalog: &omnigraph_compiler::catalog::Catalog) -> Vec<String> {
    let node_keys = catalog.node_types.keys().map(|name| format!("node:{}", name));
    let edge_keys = catalog.edge_types.keys().map(|name| format!("edge:{}", name));

    let mut keys: Vec<String> = Vec::new();
    keys.extend(node_keys);
    keys.extend(edge_keys);
    keys.sort();
    keys
}