Skip to main content

omnigraph/db/omnigraph/
table_ops.rs

1use super::*;
2
3pub(super) async fn graph_index(db: &Omnigraph) -> Result<Arc<crate::graph_index::GraphIndex>> {
4    db.ensure_schema_state_valid().await?;
5    let coord = db.coordinator.read().await;
6    let resolved = coord
7        .resolve_target(&ReadTarget::Branch(
8            coord.current_branch().unwrap_or("main").to_string(),
9        ))
10        .await?;
11    drop(coord);
12    let catalog = db.catalog();
13    db.runtime_cache.graph_index(&resolved, &catalog).await
14}
15
16pub(super) async fn graph_index_for_resolved(
17    db: &Omnigraph,
18    resolved: &ResolvedTarget,
19) -> Result<Arc<crate::graph_index::GraphIndex>> {
20    let catalog = db.catalog();
21    db.runtime_cache.graph_index(resolved, &catalog).await
22}
23
24pub(super) async fn ensure_indices(db: &Omnigraph) -> Result<Vec<PendingIndex>> {
25    let current_branch = db
26        .coordinator
27        .read()
28        .await
29        .current_branch()
30        .map(str::to_string);
31    ensure_indices_for_branch(db, current_branch.as_deref()).await
32}
33
34pub(super) async fn ensure_indices_on(db: &Omnigraph, branch: &str) -> Result<Vec<PendingIndex>> {
35    let branch = normalize_branch_name(branch)?;
36    ensure_indices_for_branch(db, branch.as_deref()).await
37}
38
#[cfg(feature = "failpoints")]
/// Test-only helper: publishes whatever Lance HEAD `table_key` currently has
/// (on `table_branch`) into the manifest of `branch` without running any index
/// rebuild.
///
/// The publish is guarded by an expected-version map holding the table's
/// current manifest version. Fails if the table has no manifest entry, or if
/// any storage read or the manifest commit fails.
pub(super) async fn failpoint_publish_table_head_without_index_rebuild_for_test(
    db: &mut Omnigraph,
    branch: &str,
    table_key: &str,
    table_branch: Option<&str>,
) -> Result<u64> {
    let branch = normalize_branch_name(branch)?;
    let branch = branch.as_deref();
    let snapshot = db.snapshot_for_branch(branch).await?;
    let Some(entry) = snapshot.entry(table_key) else {
        return Err(OmniError::manifest(format!(
            "no manifest entry for {}",
            table_key
        )));
    };
    let full_path = format!("{}/{}", db.root_uri, entry.table_path);
    let head = db
        .storage()
        .open_dataset_head_for_write(table_key, &full_path, table_branch)
        .await?;
    let head_state = db.storage().table_state(&full_path, &head).await?;

    // Publish guard: the manifest must still hold the version we read above.
    let expected =
        std::collections::HashMap::from([(table_key.to_string(), entry.table_version)]);
    let head_update = crate::db::SubTableUpdate {
        table_key: table_key.to_string(),
        table_version: head_state.version,
        table_branch: table_branch.map(str::to_string),
        row_count: head_state.row_count,
        version_metadata: head_state.version_metadata,
    };
    commit_prepared_updates_on_branch_with_expected(db, branch, &[head_update], &expected, None)
        .await
}
75
76pub(super) async fn ensure_indices_for_branch(
77    db: &Omnigraph,
78    branch: Option<&str>,
79) -> Result<Vec<PendingIndex>> {
80    db.ensure_schema_state_valid().await?;
81    db.ensure_schema_apply_idle("ensure_indices").await?;
82    let resolved = db.resolved_branch_target(branch).await?;
83    let snapshot = resolved.snapshot;
84    let mut updates = Vec::new();
85    let mut pending = Vec::new();
86    let active_branch = resolved.branch;
87    let catalog = db.catalog();
88
89    // Recovery sidecar: protect the per-table commit_staged loop in
90    // build_indices_on_dataset (one commit per index built). Only pins
91    // tables that ACTUALLY need index work — the classifier
92    // loose-matches for SidecarKind::EnsureIndices (the actual N
93    // depends on which indices are missing), but if a table needs zero
94    // commits and gets pinned, the all-or-nothing decision rule
95    // classifies it as `NoMovement` and rolls back legitimately-
96    // committed work on sibling tables. Steady-state runs (everything
97    // already indexed) skip the sidecar entirely.
98    let mut recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = Vec::new();
99    for type_name in catalog.node_types.keys() {
100        let table_key = format!("node:{}", type_name);
101        let Some(entry) = snapshot.entry(&table_key) else {
102            continue;
103        };
104        // Match the processing loop's branch filter: when running on a
105        // feature branch, main-branch tables (table_branch = None) are
106        // skipped (`None => continue` at ~line 118). Pinning them here
107        // would force NoMovement on recovery and trigger an all-or-
108        // nothing rollback of legitimately-committed work on the
109        // feature-branch tables.
110        if active_branch.is_some() && entry.table_branch.is_none() {
111            continue;
112        }
113        let full_path = format!("{}/{}", db.root_uri, entry.table_path);
114        if needs_index_work_node(
115            db,
116            type_name,
117            &table_key,
118            &full_path,
119            entry.table_branch.as_deref(),
120        )
121        .await?
122        {
123            recovery_pins.push(crate::db::manifest::SidecarTablePin {
124                table_key,
125                table_path: full_path,
126                expected_version: entry.table_version,
127                post_commit_pin: entry.table_version + 1,
128                // Use active_branch (where commits actually land), NOT
129                // entry.table_branch (where the table currently lives).
130                // open_owned_dataset_for_branch_write forks a feature
131                // branch from a main-branch table on first write — the
132                // resulting commit lands on active_branch. Recovery's
133                // open_lance_head must check the same branch.
134                table_branch: active_branch.clone(),
135            });
136        }
137    }
138    for edge_name in catalog.edge_types.keys() {
139        let table_key = format!("edge:{}", edge_name);
140        let Some(entry) = snapshot.entry(&table_key) else {
141            continue;
142        };
143        if active_branch.is_some() && entry.table_branch.is_none() {
144            continue;
145        }
146        let full_path = format!("{}/{}", db.root_uri, entry.table_path);
147        if needs_index_work_edge(db, &table_key, &full_path, entry.table_branch.as_deref()).await? {
148            recovery_pins.push(crate::db::manifest::SidecarTablePin {
149                table_key,
150                table_path: full_path,
151                expected_version: entry.table_version,
152                post_commit_pin: entry.table_version + 1,
153                // Use active_branch (where commits actually land), NOT
154                // entry.table_branch (where the table currently lives).
155                // open_owned_dataset_for_branch_write forks a feature
156                // branch from a main-branch table on first write — the
157                // resulting commit lands on active_branch. Recovery's
158                // open_lance_head must check the same branch.
159                table_branch: active_branch.clone(),
160            });
161        }
162    }
163    // Acquire per-(table_key, active_branch) queues for every table
164    // that needs index work. Held across the per-table commit loop and
165    // the manifest publish at the end of this function. Sorted-order
166    // acquisition prevents lock-order inversion against concurrent
167    // multi-table writers (mutation finalize, branch_merge, the fork
168    // path, recovery).
169    let queue_keys: Vec<(String, Option<String>)> = recovery_pins
170        .iter()
171        .map(|pin| (pin.table_key.clone(), pin.table_branch.clone()))
172        .collect();
173    let _queue_guards = db.write_queue().acquire_many(&queue_keys).await;
174
175    let recovery_handle = if recovery_pins.is_empty() {
176        None
177    } else {
178        let sidecar = crate::db::manifest::new_sidecar(
179            crate::db::manifest::SidecarKind::EnsureIndices,
180            active_branch.clone(),
181            // `ensure_indices` doesn't currently take an actor; system-attributed.
182            // Future: add `ensure_indices_as` to thread actor context.
183            None,
184            recovery_pins,
185        );
186        Some(
187            crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar)
188                .await?,
189        )
190    };
191
192    for type_name in catalog.node_types.keys() {
193        let table_key = format!("node:{}", type_name);
194        let Some(entry) = snapshot.entry(&table_key) else {
195            continue;
196        };
197        let full_path = format!("{}/{}", db.root_uri, entry.table_path);
198        let (mut ds, resolved_branch) = match active_branch.as_deref() {
199            Some(active_branch) => match entry.table_branch.as_deref() {
200                None => continue,
201                _ => {
202                    open_owned_dataset_for_branch_write(
203                        db,
204                        &table_key,
205                        &full_path,
206                        entry.table_branch.as_deref(),
207                        entry.table_version,
208                        active_branch,
209                        crate::db::MutationOpKind::SchemaRewrite,
210                    )
211                    .await?
212                }
213            },
214            None => (
215                db.storage()
216                    .open_dataset_head_for_write(&table_key, &full_path, None)
217                    .await?,
218                None,
219            ),
220        };
221        let row_count = db.storage().count_rows(&ds, None).await.unwrap_or(0);
222        if row_count > 0 {
223            pending.extend(build_indices_on_dataset(db, &table_key, &mut ds).await?);
224        }
225
226        let state = db.storage().table_state(&full_path, &ds).await?;
227        if state.version != entry.table_version
228            || resolved_branch.as_deref() != entry.table_branch.as_deref()
229        {
230            updates.push(crate::db::SubTableUpdate {
231                table_key,
232                table_version: state.version,
233                table_branch: resolved_branch,
234                row_count: state.row_count,
235                version_metadata: state.version_metadata,
236            });
237        }
238    }
239
240    for edge_name in catalog.edge_types.keys() {
241        let table_key = format!("edge:{}", edge_name);
242        let Some(entry) = snapshot.entry(&table_key) else {
243            continue;
244        };
245        let full_path = format!("{}/{}", db.root_uri, entry.table_path);
246        let (mut ds, resolved_branch) = match active_branch.as_deref() {
247            Some(active_branch) => match entry.table_branch.as_deref() {
248                None => continue,
249                _ => {
250                    open_owned_dataset_for_branch_write(
251                        db,
252                        &table_key,
253                        &full_path,
254                        entry.table_branch.as_deref(),
255                        entry.table_version,
256                        active_branch,
257                        crate::db::MutationOpKind::SchemaRewrite,
258                    )
259                    .await?
260                }
261            },
262            None => (
263                db.storage()
264                    .open_dataset_head_for_write(&table_key, &full_path, None)
265                    .await?,
266                None,
267            ),
268        };
269        let row_count = db.storage().count_rows(&ds, None).await.unwrap_or(0);
270        if row_count > 0 {
271            pending.extend(build_indices_on_dataset(db, &table_key, &mut ds).await?);
272        }
273
274        let state = db.storage().table_state(&full_path, &ds).await?;
275        if state.version != entry.table_version
276            || resolved_branch.as_deref() != entry.table_branch.as_deref()
277        {
278            updates.push(crate::db::SubTableUpdate {
279                table_key,
280                table_version: state.version,
281                table_branch: resolved_branch,
282                row_count: state.row_count,
283                version_metadata: state.version_metadata,
284            });
285        }
286    }
287
288    // Failpoint: pin the per-writer Phase B → Phase C residual for
289    // ensure_indices. Lance HEAD has advanced on every touched table
290    // (one commit_staged per index built) but the manifest publish below
291    // hasn't run. Used by
292    // `tests/failpoints.rs::ensure_indices_phase_b_failure_recovered_on_next_open`.
293    crate::failpoints::maybe_fail("ensure_indices.post_phase_b_pre_manifest_commit")?;
294
295    if !updates.is_empty() {
296        commit_prepared_updates_on_branch(db, branch, &updates, None).await?;
297    }
298
299    // Recovery sidecar lifecycle: delete after the manifest publish (or
300    // no-op when there were no updates — the sidecar covered the
301    // per-table commit window regardless). Best-effort cleanup; failing
302    // the user here would error a call that already succeeded.
303    if let Some(handle) = recovery_handle {
304        if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
305            tracing::warn!(
306                error = %err,
307                operation_id = handle.operation_id.as_str(),
308                "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
309            );
310        }
311    }
312
313    Ok(pending)
314}
315
/// Kind of single-column index a node property receives from a one-column
/// `@index`/`@key` declaration.
///
/// Produced by `node_prop_index_kind` and shared by the index builder and the
/// `needs_index_work_node` coverage check. Because both use the same mapping,
/// they cannot drift. If an enum or orderable scalar gets a BTREE from the
/// builder, it is also reported as "needs work" until that BTREE exists.
/// Otherwise the HEAD-advancing build would run without recovery-sidecar
/// cover.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum NodePropIndexKind {
    /// Scalar BTREE index (enums and orderable scalars).
    Btree,
    /// Full-text inverted index (free-text, non-enum strings).
    Fts,
    /// Vector index (vector columns).
    Vector,
}
331
332fn node_prop_index_kind(prop_type: &PropType) -> Option<NodePropIndexKind> {
333    if prop_type.list {
334        return None;
335    }
336    // Enums are physically `String` but filtered by equality, so they take a
337    // scalar BTREE, not an FTS inverted index (Lance never consults an inverted
338    // index for `=`/range). Free-text Strings keep FTS for
339    // `search()`/`match_text`/`bm25`.
340    let is_enum = prop_type.enum_values.is_some();
341    match prop_type.scalar {
342        ScalarType::String if !is_enum => Some(NodePropIndexKind::Fts),
343        ScalarType::Vector(_) => Some(NodePropIndexKind::Vector),
344        ScalarType::String
345        | ScalarType::DateTime
346        | ScalarType::Date
347        | ScalarType::I32
348        | ScalarType::I64
349        | ScalarType::U32
350        | ScalarType::U64
351        | ScalarType::F32
352        | ScalarType::F64
353        | ScalarType::Bool => Some(NodePropIndexKind::Btree),
354        ScalarType::Blob => None,
355    }
356}
357
358/// Whether a vector column currently has at least one non-null vector — the
359/// minimum for Lance IVF k-means to train (the `ivf_flat(1)` index we build
360/// needs >=1 vector). Used identically by `needs_index_work_node` (so an
361/// untrainable column is not pinned for recovery — avoiding a zero-commit pin
362/// that would roll back a sibling's index work) and by the vector build arm (so
363/// `create_vector_index` is only attempted when it can succeed, keeping its
364/// genuine errors fatal instead of swallowed as pending). If index params
365/// become size-aware (dev-graph iss-687), this threshold moves with them.
366async fn vector_column_trainable(
367    db: &Omnigraph,
368    ds: &SnapshotHandle,
369    column: &str,
370) -> Result<bool> {
371    Ok(db
372        .storage()
373        .count_rows(ds, Some(format!("{column} IS NOT NULL")))
374        .await?
375        > 0)
376}
377
378/// Returns true if the node table is missing at least one declared
379/// scalar/vector index that `build_indices_on_dataset_for_catalog` would
380/// build AND has at least one row (the ensure_indices loop has
381/// `if row_count > 0 { build_indices(...) }`, so empty tables produce
382/// zero commits and must NOT be pinned in the sidecar — pinning them
383/// would force `NoMovement` classification on recovery and trigger the
384/// all-or-nothing rollback of sibling tables' legitimate index work).
385///
386/// Per `build_indices_on_dataset_for_catalog`, nodes get BTree (id) plus, for
387/// each one-column `@index`/`@key` property, the index `node_prop_index_kind`
388/// assigns: a scalar BTREE for enums and orderable scalars
389/// (DateTime/Date/numeric/Bool), FTS for free-text Strings, or a Vector index.
390/// Edges get BTree only (id, src, dst). This helper and the builder share
391/// `node_prop_index_kind` so they cannot drift — see its doc comment.
392pub(super) async fn needs_index_work_node(
393    db: &Omnigraph,
394    type_name: &str,
395    table_key: &str,
396    full_path: &str,
397    table_branch: Option<&str>,
398) -> Result<bool> {
399    let ds = db
400        .storage()
401        .open_dataset_head_for_write(table_key, full_path, table_branch)
402        .await?;
403    // Empty tables are skipped by the ensure_indices loop, so they must
404    // not be pinned in the sidecar — pinning a table that produces zero
405    // commits classifies as NoMovement on recovery and forces all-or-
406    // nothing rollback of sibling tables' legitimate index work.
407    // Errors from count_rows are propagated: silently treating them as
408    // "0 rows" risks skipping a table that is actually about to be
409    // modified.
410    if db.storage().count_rows(&ds, None).await? == 0 {
411        return Ok(false);
412    }
413    if !db.storage().has_btree_index(&ds, "id").await? {
414        return Ok(true);
415    }
416    let catalog = db.catalog();
417    let Some(node_type) = catalog.node_types.get(type_name) else {
418        return Ok(false);
419    };
420    for index_cols in &node_type.indices {
421        if index_cols.len() != 1 {
422            continue;
423        }
424        let prop_name = &index_cols[0];
425        let Some(prop_type) = node_type.properties.get(prop_name) else {
426            continue;
427        };
428        match node_prop_index_kind(prop_type) {
429            Some(NodePropIndexKind::Fts) => {
430                if !db.storage().has_fts_index(&ds, prop_name).await? {
431                    return Ok(true);
432                }
433            }
434            Some(NodePropIndexKind::Vector) => {
435                // Only count a missing vector index as buildable *work* when the
436                // column is trainable (>=1 non-null vector). An untrainable
437                // column would defer in the build and commit nothing; pinning it
438                // for recovery would be a zero-commit pin that classifies
439                // NoMovement and rolls back a sibling table's index work.
440                if !db.storage().has_vector_index(&ds, prop_name).await?
441                    && vector_column_trainable(db, &ds, prop_name).await?
442                {
443                    return Ok(true);
444                }
445            }
446            Some(NodePropIndexKind::Btree) => {
447                if !db.storage().has_btree_index(&ds, prop_name).await? {
448                    return Ok(true);
449                }
450            }
451            None => {}
452        }
453    }
454    Ok(false)
455}
456
457/// Companion to `needs_index_work_node` for edge tables.
458///
459/// **Intentional asymmetry with the node helper**: edges only need
460/// BTree indices (id, src, dst) per `build_indices_on_dataset_for_catalog`
461/// at the edge branch (this file, lines 474-485). FTS / vector indices
462/// on edge properties are not built today; if they ever are, this
463/// helper plus the build function must be updated together.
464///
465/// Empty edge tables are skipped by the ensure_indices loop the same
466/// way node tables are; see `needs_index_work_node`.
467pub(super) async fn needs_index_work_edge(
468    db: &Omnigraph,
469    table_key: &str,
470    full_path: &str,
471    table_branch: Option<&str>,
472) -> Result<bool> {
473    let ds = db
474        .storage()
475        .open_dataset_head_for_write(table_key, full_path, table_branch)
476        .await?;
477    if db.storage().count_rows(&ds, None).await? == 0 {
478        return Ok(false);
479    }
480    Ok(!db.storage().has_btree_index(&ds, "id").await?
481        || !db.storage().has_btree_index(&ds, "src").await?
482        || !db.storage().has_btree_index(&ds, "dst").await?)
483}
484
485pub(super) async fn open_for_mutation(
486    db: &Omnigraph,
487    table_key: &str,
488    op_kind: crate::db::MutationOpKind,
489) -> Result<(SnapshotHandle, String, Option<String>)> {
490    let current_branch = db
491        .coordinator
492        .read()
493        .await
494        .current_branch()
495        .map(str::to_string);
496    open_for_mutation_on_branch(db, current_branch.as_deref(), table_key, op_kind).await
497}
498
499/// Open a sub-table for mutation. The `op_kind` selects the strict-vs-relaxed
500/// pre-stage version-check policy — see [`crate::db::MutationOpKind`] for the
501/// rationale per kind. Insert / Merge skip the strict
502/// `ensure_expected_version` check (Lance's natural conflict resolver +
503/// per-(table, branch) queue + publisher CAS handle drift); Update / Delete /
504/// SchemaRewrite keep it (read-modify-write SI).
505pub(super) async fn open_for_mutation_on_branch(
506    db: &Omnigraph,
507    branch: Option<&str>,
508    table_key: &str,
509    op_kind: crate::db::MutationOpKind,
510) -> Result<(SnapshotHandle, String, Option<String>)> {
511    db.ensure_schema_apply_not_locked("write").await?;
512    let resolved = db.resolved_branch_target(branch).await?;
513    let entry = resolved
514        .snapshot
515        .entry(table_key)
516        .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
517    let full_path = format!("{}/{}", db.root_uri, entry.table_path);
518    match resolved.branch.as_deref() {
519        None => {
520            let ds = db
521                .storage()
522                .open_dataset_head_for_write(table_key, &full_path, None)
523                .await?;
524            if op_kind.strict_pre_stage_version_check() {
525                db.storage()
526                    .ensure_expected_version(&ds, table_key, entry.table_version)?;
527            }
528            Ok((ds, full_path, None))
529        }
530        Some(active_branch) => {
531            let (ds, table_branch) = open_owned_dataset_for_branch_write(
532                db,
533                table_key,
534                &full_path,
535                entry.table_branch.as_deref(),
536                entry.table_version,
537                active_branch,
538                op_kind,
539            )
540            .await?;
541            Ok((ds, full_path, table_branch))
542        }
543    }
544}
545
546pub(super) async fn open_owned_dataset_for_branch_write(
547    db: &Omnigraph,
548    table_key: &str,
549    full_path: &str,
550    entry_branch: Option<&str>,
551    entry_version: u64,
552    active_branch: &str,
553    op_kind: crate::db::MutationOpKind,
554) -> Result<(SnapshotHandle, Option<String>)> {
555    match entry_branch {
556        Some(branch) if branch == active_branch => {
557            let ds = db
558                .storage()
559                .open_dataset_head_for_write(table_key, full_path, Some(active_branch))
560                .await?;
561            if op_kind.strict_pre_stage_version_check() {
562                db.storage()
563                    .ensure_expected_version(&ds, table_key, entry_version)?;
564            }
565            Ok((ds, Some(active_branch.to_string())))
566        }
567        source_branch => {
568            crate::failpoints::maybe_fail("fork.before_classify")?;
569            // Authority check before forking: re-read the live manifest. If this
570            // table is already forked on active_branch, a concurrent first-write
571            // won the race and our snapshot is stale — that is a retryable
572            // conflict, not an orphan. (A zombie fork is never in the manifest,
573            // so this only fires for a live concurrent fork.)
574            let live = db.snapshot_for_branch(Some(active_branch)).await?;
575            if let Some(entry) = live.entry(table_key) {
576                if entry.table_branch.as_deref() == Some(active_branch) {
577                    return Err(OmniError::manifest_expected_version_mismatch(
578                        table_key,
579                        entry_version,
580                        entry.table_version,
581                    ));
582                }
583            }
584            // The fork advances Lance state before the manifest publish. The
585            // caller holds the per-(table, active_branch) write queue from
586            // before this fork through the publish, so a leftover ref is a
587            // manifest-unreferenced fork (interrupted prior fork, or
588            // delete+recreate), not a live in-process fork. The wrapper
589            // self-heals it (reclaim + re-fork); see
590            // `Omnigraph::fork_dataset_from_entry_state`.
591            db.fork_dataset_from_entry_state(
592                table_key,
593                full_path,
594                source_branch,
595                entry_version,
596                active_branch,
597            )
598            .await?;
599            let ds = db
600                .storage()
601                .open_dataset_head_for_write(table_key, full_path, Some(active_branch))
602                .await?;
603            if op_kind.strict_pre_stage_version_check() {
604                db.storage()
605                    .ensure_expected_version(&ds, table_key, entry_version)?;
606            }
607            Ok((ds, Some(active_branch.to_string())))
608        }
609    }
610}
611
612pub(super) async fn fork_dataset_from_entry_state(
613    db: &Omnigraph,
614    table_key: &str,
615    full_path: &str,
616    source_branch: Option<&str>,
617    source_version: u64,
618    active_branch: &str,
619) -> Result<crate::storage_layer::ForkOutcome<SnapshotHandle>> {
620    db.storage()
621        .fork_branch_from_state(
622            full_path,
623            source_branch,
624            table_key,
625            source_version,
626            active_branch,
627        )
628        .await
629}
630
/// Verdict on a Lance branch ref `B` of table `T`, judged against FRESH
/// manifest authority.
///
/// This is the single decision shared by both fork-ref reclaim sites: the
/// write-path reclaim ([`reclaim_orphaned_fork_and_refork`]) and the cleanup
/// reconciler (`optimize::reconcile_orphaned_branches`). One shared classifier
/// keeps the two destructive sites from drifting apart; historically each was
/// hardened separately and the other lagged.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum ForkRefStatus {
    /// The manifest places `T` on `B`, so this is a real fork. Never destroy it.
    Legitimate,
    /// No manifest reference to this fork exists: either `T` is not on `B`, or
    /// `B` is missing from the manifest altogether. Safe to reclaim.
    Orphan,
    /// Fresh authority could not be established (a transient read failure on a
    /// live branch). The status is ambiguous, so nothing is destroyed; the
    /// caller retries or converges later.
    Indeterminate,
}
648
649/// Classify a fork ref from FRESH manifest authority (bypasses the coordinator
650/// cache). MUST be called with the per-`(table, branch)` write queue held, so
651/// the classification is stable against in-process writers for the caller's
652/// critical section. Both reclaim sites map the result to their own action
653/// (write path: reclaim vs retryable; cleanup: delete vs skip), but the
654/// destroy-only-on-`Orphan` rule is enforced here, once.
655pub(crate) async fn classify_fork_ref(
656    db: &Omnigraph,
657    table_key: &str,
658    branch: &str,
659) -> ForkRefStatus {
660    // `classify.fresh_read` failpoint: simulate a transient failure of the
661    // fresh-authority read (no-op without the `failpoints` feature). Lets a
662    // test exercise the Indeterminate path — a read failure on a live branch
663    // must classify as Indeterminate (skip), never Orphan (destroy).
664    let fresh = match crate::failpoints::maybe_fail("classify.fresh_read") {
665        Ok(()) => db.fresh_snapshot_for_branch(Some(branch)).await,
666        Err(injected) => Err(injected),
667    };
668    match fresh {
669        Ok(snap) => {
670            let placed = snap
671                .entry(table_key)
672                .map(|e| e.table_branch.as_deref() == Some(branch))
673                .unwrap_or(false);
674            if placed {
675                ForkRefStatus::Legitimate
676            } else {
677                // Branch resolves but the manifest does not place this table on
678                // it — a manifest-unreferenced fork.
679                ForkRefStatus::Orphan
680            }
681        }
682        // Branch did not resolve. `all_branches` lists `_refs/branches/` live, so
683        // absent there = genuinely no such manifest branch (origin-1 orphan);
684        // present (or a list error) = transient read — never destroy on that.
685        Err(_) => match db.coordinator.read().await.all_branches().await {
686            Ok(fresh) if !fresh.iter().any(|b| b == branch) => ForkRefStatus::Orphan,
687            _ => ForkRefStatus::Indeterminate,
688        },
689    }
690}
691
/// Reclaim a manifest-unreferenced fork and re-fork in its place.
///
/// Reached when `fork_branch_from_state` reports `RefAlreadyExists`. This is a
/// destructive op (it force-deletes a Lance branch ref), so it owns its own
/// safety precondition rather than trusting the caller's: it re-derives, via
/// [`classify_fork_ref`], that the manifest does not place this table on
/// `active_branch`. The caller's earlier proof may have come from the
/// coordinator's *cached* branch snapshot (`resolved_branch_target` returns
/// the cache when the handle is bound to `active_branch` — an embedded handle
/// on the branch, or `branch_merge`'s target swap); trusting it could
/// force-delete a fork a concurrent writer just legitimately published. Only
/// once fresh authority confirms the ref is unreferenced does it drop the ref
/// (idempotent `force_delete_branch`) and re-fork, exactly once.
///
/// If fresh authority shows the table IS on `active_branch` (a legitimate
/// concurrent fork), or a second collision occurs after reclaim (a foreign-
/// process writer recreated the ref — the documented one-winner-CAS gap), it
/// surfaces a retryable conflict; on retry the winner's fork is visible and
/// the no-fork path runs.
///
/// # Errors
///
/// * `manifest_expected_version_mismatch(table_key, source_version, actual)`
///   when the fresh classification is `Legitimate`, or when the re-fork hits
///   `RefAlreadyExists` again. `actual` is the table version reported by a
///   fresh `active_branch` snapshot. It falls back to `source_version` when
///   that snapshot has no entry for the table. In the `Legitimate` arm only,
///   it also falls back when the fresh read itself fails.
/// * `manifest_conflict` when the classification is `Indeterminate`, or when
///   `force_delete_branch` fails with an error whose text contains
///   `"referenc"` (treated as "has dependent child branches").
/// * Any other error is propagated unchanged. Sources are the
///   `fork.before_reclaim` failpoint, `force_delete_branch`,
///   `fork_dataset_from_entry_state`, and the fresh snapshot read after a
///   second collision.
pub(super) async fn reclaim_orphaned_fork_and_refork(
    db: &Omnigraph,
    table_key: &str,
    full_path: &str,
    source_branch: Option<&str>,
    source_version: u64,
    active_branch: &str,
) -> Result<SnapshotHandle> {
    // Self-validate against FRESH authority before destroying anything. Only an
    // Orphan is reclaimable; a Legitimate status (a concurrent writer published
    // a real fork despite the caller's possibly-cached proof) or an
    // Indeterminate one (transient read) surfaces a retryable conflict rather
    // than stranding the manifest at a version the recreated ref won't have.
    match classify_fork_ref(db, table_key, active_branch).await {
        ForkRefStatus::Orphan => {}
        ForkRefStatus::Legitimate => {
            // Best effort only: the error is already decided, so a failed
            // fresh read (`.ok()`) just reports `source_version` as `actual`.
            let actual = db
                .fresh_snapshot_for_branch(Some(active_branch))
                .await
                .ok()
                .and_then(|s| s.entry(table_key).map(|e| e.table_version))
                .unwrap_or(source_version);
            return Err(OmniError::manifest_expected_version_mismatch(
                table_key,
                source_version,
                actual,
            ));
        }
        ForkRefStatus::Indeterminate => {
            return Err(OmniError::manifest_conflict(format!(
                "could not verify whether branch '{active_branch}' still owns an orphaned \
                 fork for table '{table_key}' because fresh manifest authority was \
                 unavailable; refresh and retry"
            )));
        }
    }

    // Failure-injection hook placed after validation but before the destructive
    // delete (presumably exercised by the `failpoints` test suite).
    crate::failpoints::maybe_fail("fork.before_reclaim")?;
    db.storage()
        .force_delete_branch(full_path, active_branch)
        .await
        .map_err(|e| {
            // Lance refuses to delete a branch with dependent child branches
            // even under force (RefConflict). Unreachable for a leaf first-write
            // fork (the cleanup reconciler also drops children before parents),
            // but surface it actionably if it ever happens. We match loosely on
            // "referenc" rather than the exact prose, which is not a Lance API
            // contract; a typed RefConflict variant through `force_delete_branch`
            // is the durable follow-up.
            if e.to_string().contains("referenc") {
                OmniError::manifest_conflict(format!(
                    "branch '{active_branch}' cannot reclaim the leftover fork for \
                     table '{table_key}' because it has dependent child branches; \
                     delete the child branches (or run `omnigraph cleanup`) first"
                ))
            } else {
                e
            }
        })?;

    // Re-fork exactly once. No retry loop: a second collision means another
    // writer recreated the ref in between, so report a version mismatch and
    // let the caller retry from fresh state.
    match fork_dataset_from_entry_state(
        db,
        table_key,
        full_path,
        source_branch,
        source_version,
        active_branch,
    )
    .await?
    {
        crate::storage_layer::ForkOutcome::Created(ds) => Ok(ds),
        crate::storage_layer::ForkOutcome::RefAlreadyExists => {
            let live = db.fresh_snapshot_for_branch(Some(active_branch)).await?;
            let actual = live
                .entry(table_key)
                .map(|e| e.table_version)
                .unwrap_or(source_version);
            Err(OmniError::manifest_expected_version_mismatch(
                table_key,
                source_version,
                actual,
            ))
        }
    }
}
796
797pub(super) async fn reopen_for_mutation(
798    db: &Omnigraph,
799    table_key: &str,
800    full_path: &str,
801    table_branch: Option<&str>,
802    expected_version: u64,
803    op_kind: crate::db::MutationOpKind,
804) -> Result<SnapshotHandle> {
805    db.ensure_schema_apply_not_locked("write").await?;
806    if op_kind.strict_pre_stage_version_check() {
807        db.storage()
808            .reopen_for_mutation(full_path, table_branch, table_key, expected_version)
809            .await
810    } else {
811        // Insert / Merge: skip the strict version check. Open at HEAD —
812        // Lance's natural conflict resolver at commit_staged time
813        // (rebase append, dedupe merge_insert) handles concurrent
814        // writers correctly; the publisher CAS in
815        // `MutationStaging::commit_all` (refreshed under the
816        // per-(table, branch) queue via `snapshot_for_branch`) catches
817        // genuine cross-process drift as 409. See
818        // [`crate::db::MutationOpKind`] for the policy rationale.
819        let _ = expected_version;
820        db.storage()
821            .open_dataset_head_for_write(table_key, full_path, table_branch)
822            .await
823    }
824}
825
826pub(super) async fn open_dataset_at_state(
827    db: &Omnigraph,
828    table_path: &str,
829    table_branch: Option<&str>,
830    table_version: u64,
831) -> Result<SnapshotHandle> {
832    db.storage()
833        .open_dataset_at_state(table_path, table_branch, table_version)
834        .await
835}
836
837/// A declared index the builder could not materialize on this pass. Today the
838/// only such case is a vector (IVF) column with no trainable vectors yet
839/// (KMeans needs >=1 vector), e.g. the load-before-embed window. Reported, not
840/// fatal: a later `ensure_indices`/`optimize` retries once the column is
841/// buildable, and reads stay correct via brute-force meanwhile. Surfacing
842/// pending index *status* rather than failing the operation is the database
843/// norm (Postgres `indisvalid`, LanceDB `list_indices`).
844#[derive(Debug, Clone)]
845pub struct PendingIndex {
846    pub table_key: String,
847    pub column: String,
848    pub reason: String,
849}
850
851pub(super) async fn build_indices_on_dataset(
852    db: &Omnigraph,
853    table_key: &str,
854    ds: &mut SnapshotHandle,
855) -> Result<Vec<PendingIndex>> {
856    let catalog = db.catalog();
857    build_indices_on_dataset_for_catalog(db, &catalog, table_key, ds).await
858}
859
860pub(super) async fn build_indices_on_dataset_for_catalog(
861    db: &Omnigraph,
862    catalog: &Catalog,
863    table_key: &str,
864    ds: &mut SnapshotHandle,
865) -> Result<Vec<PendingIndex>> {
866    if let Some(type_name) = table_key.strip_prefix("node:") {
867        let mut pending = Vec::new();
868        if !db.storage().has_btree_index(ds, "id").await? {
869            stage_and_commit_btree(db, table_key, ds, &["id"]).await?;
870        }
871
872        if let Some(node_type) = catalog.node_types.get(type_name) {
873            // Stage scalar indices first (BTree, Inverted), then call
874            // `create_vector_index` inline. The inline-commit on a vector
875            // index advances HEAD, which would invalidate any uncommitted
876            // scalar index transactions if we stacked them. Today the
877            // per-stage shape commits each scalar index immediately so
878            // the order constraint is implicit, but if we ever batch
879            // scalar stages we must ensure they all land before the
880            // vector inline-commit.
881            for index_cols in &node_type.indices {
882                if index_cols.len() != 1 {
883                    continue;
884                }
885                let prop_name = &index_cols[0];
886                if let Some(prop_type) = node_type.properties.get(prop_name) {
887                    match node_prop_index_kind(prop_type) {
888                        Some(NodePropIndexKind::Fts) => {
889                            if !db.storage().has_fts_index(ds, prop_name).await? {
890                                stage_and_commit_inverted(db, table_key, ds, prop_name.as_str())
891                                    .await?;
892                            }
893                        }
894                        Some(NodePropIndexKind::Vector) => {
895                            if !db.storage().has_vector_index(ds, prop_name).await? {
896                                // A vector (IVF) index trains k-means over the column,
897                                // so it needs >=1 non-null vector (KMeans errors
898                                // "cannot train N centroids with 0 vectors"). Precheck
899                                // trainability: a column with no vectors yet (e.g. rows
900                                // loaded before `embed`) is recorded as a *pending*
901                                // index and skipped — deferred, not failed. The SAME
902                                // predicate gates `needs_index_work_node`, so an
903                                // untrainable column is never pinned for recovery (no
904                                // zero-commit pin that would roll back a sibling
905                                // table's index work). This function is the chokepoint
906                                // every write path funnels through (load/mutate, schema
907                                // apply, ensure_indices, optimize, merge), realizing
908                                // the governing principle — physical index state never
909                                // fails a logical operation. Only when trainable do we
910                                // attempt the build, and then we PROPAGATE any error: a
911                                // genuine I/O/manifest/Lance failure must stay fatal,
912                                // not be hidden as pending. (Vector creation is an
913                                // inline-commit residual until lance#6666; iss-951.)
914                                if vector_column_trainable(db, ds, prop_name).await? {
915                                    let new_snap = db
916                                        .storage_inline_residual()
917                                        .create_vector_index(ds.clone(), prop_name.as_str())
918                                        .await
919                                        .map_err(|e| {
920                                            OmniError::Lance(format!(
921                                                "create Vector index on {}({}): {}",
922                                                table_key, prop_name, e
923                                            ))
924                                        })?;
925                                    *ds = new_snap;
926                                } else {
927                                    tracing::info!(
928                                        target: "omnigraph::index",
929                                        table = %table_key,
930                                        column = %prop_name,
931                                        "deferring Vector index: column has no \
932                                         trainable vectors yet",
933                                    );
934                                    pending.push(PendingIndex {
935                                        table_key: table_key.to_string(),
936                                        column: prop_name.clone(),
937                                        reason: "column has no non-null vectors to \
938                                                 train on yet"
939                                            .to_string(),
940                                    });
941                                }
942                            }
943                        }
944                        // Enum + orderable scalars (DateTime/Date/numeric/Bool)
945                        // get a BTREE so `=`, range, IN, and IS NULL are index-
946                        // accelerated instead of degrading to a full scan.
947                        Some(NodePropIndexKind::Btree) => {
948                            if !db.storage().has_btree_index(ds, prop_name).await? {
949                                stage_and_commit_btree(db, table_key, ds, &[prop_name.as_str()])
950                                    .await?;
951                            }
952                        }
953                        // List or Blob column: not indexable as a scalar here.
954                        None => {}
955                    }
956                }
957            }
958        }
959        return Ok(pending);
960    }
961
962    if table_key.starts_with("edge:") {
963        if !db.storage().has_btree_index(ds, "id").await? {
964            stage_and_commit_btree(db, table_key, ds, &["id"]).await?;
965        }
966        if !db.storage().has_btree_index(ds, "src").await? {
967            stage_and_commit_btree(db, table_key, ds, &["src"]).await?;
968        }
969        if !db.storage().has_btree_index(ds, "dst").await? {
970            stage_and_commit_btree(db, table_key, ds, &["dst"]).await?;
971        }
972        // Edge tables only get BTree (id/src/dst), which build at any
973        // cardinality; no pending state is possible here.
974        return Ok(Vec::new());
975    }
976
977    Err(OmniError::manifest(format!(
978        "invalid table key '{}'",
979        table_key
980    )))
981}
982
983/// Stage a BTREE index transaction and commit it, advancing the in-memory
984/// `*ds` to the new HEAD. The staged primitive + immediate `commit_staged`
985/// pair replaced the earlier inline-commit `create_btree_index(ds)` call.
986/// Per-call behavior is unchanged (HEAD advances once per index), but
987/// the bytes-on-disk and HEAD-advance are now decoupled at the
988/// `TableStore` API surface — a caller that needs end-of-batch atomicity
989/// can stage many transactions and commit them in one pass (the eventual
990/// index reconciler relies on this).
991async fn stage_and_commit_btree(
992    db: &Omnigraph,
993    table_key: &str,
994    ds: &mut SnapshotHandle,
995    columns: &[&str],
996) -> Result<()> {
997    let staged = db
998        .storage()
999        .stage_create_btree_index(ds, columns)
1000        .await
1001        .map_err(|e| {
1002            OmniError::Lance(format!(
1003                "stage_create_btree_index on {}({:?}): {}",
1004                table_key, columns, e
1005            ))
1006        })?;
1007    // Failpoint between stage and commit. Used by `tests/failpoints.rs`
1008    // to demonstrate that a stage-step failure in the staged-index
1009    // path (`stage_create_btree_index` succeeded; `commit_staged` not
1010    // yet called) leaves no Lance-HEAD drift on the touched table.
1011    crate::failpoints::maybe_fail("ensure_indices.post_stage_pre_commit_btree")?;
1012    let new_ds = db
1013        .storage()
1014        .commit_staged(ds.clone(), staged)
1015        .await
1016        .map_err(|e| {
1017            OmniError::Lance(format!(
1018                "commit BTree index on {}({:?}): {}",
1019                table_key, columns, e
1020            ))
1021        })?;
1022    *ds = new_ds;
1023    Ok(())
1024}
1025
1026/// Stage an INVERTED (FTS) index transaction and commit it. See
1027/// `stage_and_commit_btree` for the rationale.
1028async fn stage_and_commit_inverted(
1029    db: &Omnigraph,
1030    table_key: &str,
1031    ds: &mut SnapshotHandle,
1032    column: &str,
1033) -> Result<()> {
1034    let staged = db
1035        .storage()
1036        .stage_create_inverted_index(ds, column)
1037        .await
1038        .map_err(|e| {
1039            OmniError::Lance(format!(
1040                "stage_create_inverted_index on {}({}): {}",
1041                table_key, column, e
1042            ))
1043        })?;
1044    let new_ds = db
1045        .storage()
1046        .commit_staged(ds.clone(), staged)
1047        .await
1048        .map_err(|e| {
1049            OmniError::Lance(format!(
1050                "commit Inverted index on {}({}): {}",
1051                table_key, column, e
1052            ))
1053        })?;
1054    *ds = new_ds;
1055    Ok(())
1056}
1057
1058async fn prepare_updates_for_commit(
1059    db: &Omnigraph,
1060    branch: Option<&str>,
1061    updates: &[crate::db::SubTableUpdate],
1062) -> Result<Vec<crate::db::SubTableUpdate>> {
1063    if updates.is_empty() {
1064        return Ok(Vec::new());
1065    }
1066
1067    let snapshot = db.snapshot_for_branch(branch).await?;
1068    let mut prepared = Vec::with_capacity(updates.len());
1069
1070    for update in updates {
1071        let Some(entry) = snapshot.entry(&update.table_key) else {
1072            return Err(OmniError::manifest(format!(
1073                "no manifest entry for {}",
1074                update.table_key
1075            )));
1076        };
1077
1078        let mut prepared_update = update.clone();
1079        if prepared_update.row_count > 0 {
1080            let full_path = format!("{}/{}", db.root_uri, entry.table_path);
1081            // Strict version check is correct here: this runs INSIDE
1082            // the publisher commit path, after `commit_staged` already
1083            // advanced Lance HEAD to `prepared_update.table_version`.
1084            // The check is a defense-in-depth assertion that the
1085            // dataset state matches what we just committed; not the
1086            // pre-stage race the op-kind policy targets.
1087            let mut ds = reopen_for_mutation(
1088                db,
1089                &prepared_update.table_key,
1090                &full_path,
1091                prepared_update.table_branch.as_deref(),
1092                prepared_update.table_version,
1093                crate::db::MutationOpKind::SchemaRewrite,
1094            )
1095            .await?;
1096            // Any column not yet buildable (e.g. a vector column whose rows
1097            // have null embeddings) is deferred and logged inside
1098            // build_indices; a later ensure_indices/optimize materializes it.
1099            // The load/mutate/merge commit must not fail on it.
1100            let _pending = build_indices_on_dataset(db, &prepared_update.table_key, &mut ds).await?;
1101            let state = db.storage().table_state(&full_path, &ds).await?;
1102            prepared_update.table_version = state.version;
1103            prepared_update.row_count = state.row_count;
1104            prepared_update.version_metadata = state.version_metadata;
1105        }
1106
1107        prepared.push(prepared_update);
1108    }
1109
1110    Ok(prepared)
1111}
1112
1113async fn commit_prepared_updates(
1114    db: &Omnigraph,
1115    updates: &[crate::db::SubTableUpdate],
1116    actor_id: Option<&str>,
1117) -> Result<u64> {
1118    let PublishedSnapshot {
1119        manifest_version,
1120        _snapshot_id: _,
1121    } = db
1122        .coordinator
1123        .write()
1124        .await
1125        .commit_updates_with_actor(updates, actor_id)
1126        .await?;
1127    Ok(manifest_version)
1128}
1129
1130async fn commit_prepared_updates_with_expected(
1131    db: &Omnigraph,
1132    updates: &[crate::db::SubTableUpdate],
1133    expected_table_versions: &std::collections::HashMap<String, u64>,
1134    actor_id: Option<&str>,
1135) -> Result<u64> {
1136    let PublishedSnapshot {
1137        manifest_version,
1138        _snapshot_id: _,
1139    } = db
1140        .coordinator
1141        .write()
1142        .await
1143        .commit_updates_with_actor_with_expected(updates, expected_table_versions, actor_id)
1144        .await?;
1145    Ok(manifest_version)
1146}
1147
1148pub(super) async fn commit_prepared_updates_on_branch(
1149    db: &Omnigraph,
1150    branch: Option<&str>,
1151    updates: &[crate::db::SubTableUpdate],
1152    actor_id: Option<&str>,
1153) -> Result<u64> {
1154    let current_branch = db
1155        .coordinator
1156        .read()
1157        .await
1158        .current_branch()
1159        .map(str::to_string);
1160    let requested_branch = branch.map(str::to_string);
1161    if requested_branch == current_branch {
1162        return commit_prepared_updates(db, updates, actor_id).await;
1163    }
1164
1165    let mut coordinator = match requested_branch.as_deref() {
1166        Some(branch) => {
1167            GraphCoordinator::open_branch(db.uri(), branch, Arc::clone(&db.storage)).await?
1168        }
1169        None => GraphCoordinator::open(db.uri(), Arc::clone(&db.storage)).await?,
1170    };
1171    let PublishedSnapshot {
1172        manifest_version,
1173        _snapshot_id: _,
1174    } = coordinator
1175        .commit_updates_with_actor(updates, actor_id)
1176        .await?;
1177    Ok(manifest_version)
1178}
1179
1180pub(super) async fn commit_prepared_updates_on_branch_with_expected(
1181    db: &Omnigraph,
1182    branch: Option<&str>,
1183    updates: &[crate::db::SubTableUpdate],
1184    expected_table_versions: &std::collections::HashMap<String, u64>,
1185    actor_id: Option<&str>,
1186) -> Result<u64> {
1187    let current_branch = db
1188        .coordinator
1189        .read()
1190        .await
1191        .current_branch()
1192        .map(str::to_string);
1193    let requested_branch = branch.map(str::to_string);
1194    if requested_branch == current_branch {
1195        return commit_prepared_updates_with_expected(
1196            db,
1197            updates,
1198            expected_table_versions,
1199            actor_id,
1200        )
1201        .await;
1202    }
1203
1204    let mut coordinator = match requested_branch.as_deref() {
1205        Some(branch) => {
1206            GraphCoordinator::open_branch(db.uri(), branch, Arc::clone(&db.storage)).await?
1207        }
1208        None => GraphCoordinator::open(db.uri(), Arc::clone(&db.storage)).await?,
1209    };
1210    let PublishedSnapshot {
1211        manifest_version,
1212        _snapshot_id: _,
1213    } = coordinator
1214        .commit_updates_with_actor_with_expected(updates, expected_table_versions, actor_id)
1215        .await?;
1216    Ok(manifest_version)
1217}
1218
1219// Used only by in-tree tests (`#[cfg(test)]`); the runtime path now uses
1220// `commit_updates_on_branch_with_expected` exclusively.
1221#[cfg(test)]
1222pub(super) async fn commit_updates(
1223    db: &mut Omnigraph,
1224    updates: &[crate::db::SubTableUpdate],
1225) -> Result<u64> {
1226    db.ensure_schema_apply_not_locked("write commit").await?;
1227    let current_branch = db
1228        .coordinator
1229        .read()
1230        .await
1231        .current_branch()
1232        .map(str::to_string);
1233    let prepared = prepare_updates_for_commit(db, current_branch.as_deref(), updates).await?;
1234    commit_prepared_updates(db, &prepared, None).await
1235}
1236
1237pub(super) async fn commit_manifest_updates(
1238    db: &Omnigraph,
1239    updates: &[crate::db::SubTableUpdate],
1240) -> Result<u64> {
1241    db.coordinator
1242        .write()
1243        .await
1244        .commit_manifest_updates(updates)
1245        .await
1246}
1247
1248pub(super) async fn record_merge_commit(
1249    db: &Omnigraph,
1250    manifest_version: u64,
1251    parent_commit_id: &str,
1252    merged_parent_commit_id: &str,
1253    actor_id: Option<&str>,
1254) -> Result<String> {
1255    db.coordinator
1256        .write()
1257        .await
1258        .record_merge_commit(
1259            manifest_version,
1260            parent_commit_id,
1261            merged_parent_commit_id,
1262            actor_id,
1263        )
1264        .await
1265        .map(|snapshot_id| snapshot_id.as_str().to_string())
1266}
1267
1268/// Commit updates with a publisher-level OCC fence. The
1269/// `expected_table_versions` map asserts the manifest's pre-write per-table
1270/// versions; mismatches surface as `ManifestConflictDetails::ExpectedVersionMismatch`.
1271pub(super) async fn commit_updates_on_branch_with_expected(
1272    db: &Omnigraph,
1273    branch: Option<&str>,
1274    updates: &[crate::db::SubTableUpdate],
1275    expected_table_versions: &std::collections::HashMap<String, u64>,
1276    actor_id: Option<&str>,
1277) -> Result<u64> {
1278    db.ensure_schema_apply_not_locked("write commit").await?;
1279    let prepared = prepare_updates_for_commit(db, branch, updates).await?;
1280    commit_prepared_updates_on_branch_with_expected(
1281        db,
1282        branch,
1283        &prepared,
1284        expected_table_versions,
1285        actor_id,
1286    )
1287    .await
1288}
1289
1290pub(super) async fn ensure_commit_graph_initialized(db: &Omnigraph) -> Result<()> {
1291    db.coordinator
1292        .write()
1293        .await
1294        .ensure_commit_graph_initialized()
1295        .await
1296}
1297
1298pub(super) async fn invalidate_graph_index(db: &Omnigraph) {
1299    db.runtime_cache.invalidate_all().await;
1300}
1301
1302#[cfg(test)]
1303mod classify_fork_ref_tests {
1304    //! Direct coverage of [`classify_fork_ref`] — the single fresh-authority
1305    //! decision both fork-ref reclaim sites (write-path reclaim + cleanup
1306    //! reconciler) route through. Pins each deterministic status so reverting
1307    //! the fresh-authority logic at either site fails here. (The `Indeterminate`
1308    //! arm needs an injected transient read and is covered under the
1309    //! `failpoints` suite.)
1310    use super::*;
1311    use crate::db::Omnigraph;
1312    use crate::loader::LoadMode;
1313
1314    const SCHEMA: &str = "node Person { name: String @key }\nnode Company { name: String @key }\n";
1315
1316    /// On-disk dataset path for a node table, taken from the manifest entry
1317    /// (the same path the engine uses) so the test forges against the real ref.
1318    async fn node_path(db: &Omnigraph, branch: &str, table_key: &str) -> String {
1319        let snap = db.snapshot_for_branch(Some(branch)).await.unwrap();
1320        let entry = snap.entry(table_key).unwrap();
1321        format!("{}/{}", db.root_uri, entry.table_path)
1322    }
1323
1324    #[tokio::test]
1325    async fn classify_distinguishes_legitimate_unreferenced_and_ghost() {
1326        let dir = tempfile::tempdir().unwrap();
1327        let db = Omnigraph::init(dir.path().to_str().unwrap(), SCHEMA)
1328            .await
1329            .unwrap();
1330        db.branch_create("feature").await.unwrap();
1331
1332        // Legitimate: a real write forks Company onto `feature`, and the
1333        // manifest places Company on `feature`.
1334        db.load_as(
1335            "feature",
1336            None,
1337            r#"{"type":"Company","data":{"name":"Acme"}}"#,
1338            LoadMode::Merge,
1339            None,
1340        )
1341        .await
1342        .unwrap();
1343        assert_eq!(
1344            classify_fork_ref(&db, "node:Company", "feature").await,
1345            ForkRefStatus::Legitimate,
1346            "a manifest-placed fork must classify as Legitimate (never destroyed)"
1347        );
1348
1349        // Orphan (manifest-unreferenced): forge a `feature` ref on Person, which
1350        // the manifest's `feature` snapshot still places on main.
1351        let person = node_path(&db, "feature", "node:Person").await;
1352        {
1353            let mut ds = lance::Dataset::open(&person).await.unwrap();
1354            let v = ds.version().version;
1355            ds.create_branch("feature", v, None).await.unwrap();
1356        }
1357        assert_eq!(
1358            classify_fork_ref(&db, "node:Person", "feature").await,
1359            ForkRefStatus::Orphan,
1360            "a ref the manifest does not place on the branch must classify as Orphan"
1361        );
1362
1363        // Orphan (ghost): a ref for a branch the manifest does not have at all.
1364        {
1365            let mut ds = lance::Dataset::open(&person).await.unwrap();
1366            let v = ds.version().version;
1367            ds.create_branch("ghost", v, None).await.unwrap();
1368        }
1369        assert_eq!(
1370            classify_fork_ref(&db, "node:Person", "ghost").await,
1371            ForkRefStatus::Orphan,
1372            "a ref for a branch absent from the manifest must classify as Orphan"
1373        );
1374    }
1375}