1use super::*;
2
3pub(super) async fn graph_index(db: &Omnigraph) -> Result<Arc<crate::graph_index::GraphIndex>> {
4 db.ensure_schema_state_valid().await?;
5 let coord = db.coordinator.read().await;
6 let resolved = coord
7 .resolve_target(&ReadTarget::Branch(
8 coord.current_branch().unwrap_or("main").to_string(),
9 ))
10 .await?;
11 drop(coord);
12 let catalog = db.catalog();
13 db.runtime_cache.graph_index(&resolved, &catalog).await
14}
15
16pub(super) async fn graph_index_for_resolved(
17 db: &Omnigraph,
18 resolved: &ResolvedTarget,
19) -> Result<Arc<crate::graph_index::GraphIndex>> {
20 let catalog = db.catalog();
21 db.runtime_cache.graph_index(resolved, &catalog).await
22}
23
24pub(super) async fn ensure_indices(db: &Omnigraph) -> Result<Vec<PendingIndex>> {
25 let current_branch = db
26 .coordinator
27 .read()
28 .await
29 .current_branch()
30 .map(str::to_string);
31 ensure_indices_for_branch(db, current_branch.as_deref()).await
32}
33
34pub(super) async fn ensure_indices_on(db: &Omnigraph, branch: &str) -> Result<Vec<PendingIndex>> {
35 let branch = normalize_branch_name(branch)?;
36 ensure_indices_for_branch(db, branch.as_deref()).await
37}
38
/// Test-only helper (compiled with the `failpoints` feature): publishes the
/// current head of one table to the manifest of `branch`.
///
/// The update is committed through
/// `commit_prepared_updates_on_branch_with_expected`, i.e. without going
/// through `prepare_updates_for_commit`, so no index build is attempted here.
/// The commit is guarded by the table version currently recorded in the
/// branch snapshot.
///
/// # Errors
/// Returns a manifest error when `table_key` has no entry in the branch
/// snapshot, and propagates storage / commit failures.
#[cfg(feature = "failpoints")]
pub(super) async fn failpoint_publish_table_head_without_index_rebuild_for_test(
    db: &mut Omnigraph,
    branch: &str,
    table_key: &str,
    table_branch: Option<&str>,
) -> Result<u64> {
    let branch = normalize_branch_name(branch)?;
    let snapshot = db.snapshot_for_branch(branch.as_deref()).await?;
    let Some(entry) = snapshot.entry(table_key) else {
        return Err(OmniError::manifest(format!(
            "no manifest entry for {}",
            table_key
        )));
    };

    // Read the dataset head as it exists in storage right now.
    let full_path = format!("{}/{}", db.root_uri, entry.table_path);
    let ds = db
        .storage()
        .open_dataset_head_for_write(table_key, &full_path, table_branch)
        .await?;
    let state = db.storage().table_state(&full_path, &ds).await?;

    // The commit only succeeds if the manifest still records this version.
    let expected =
        std::collections::HashMap::from([(table_key.to_string(), entry.table_version)]);
    let update = crate::db::SubTableUpdate {
        table_key: table_key.to_string(),
        table_version: state.version,
        table_branch: table_branch.map(|name| name.to_string()),
        row_count: state.row_count,
        version_metadata: state.version_metadata,
    };

    commit_prepared_updates_on_branch_with_expected(
        db,
        branch.as_deref(),
        &[update],
        &expected,
        None,
    )
    .await
}
75
76pub(super) async fn ensure_indices_for_branch(
77 db: &Omnigraph,
78 branch: Option<&str>,
79) -> Result<Vec<PendingIndex>> {
80 db.ensure_schema_state_valid().await?;
81 db.ensure_schema_apply_idle("ensure_indices").await?;
82 let resolved = db.resolved_branch_target(branch).await?;
83 let snapshot = resolved.snapshot;
84 let mut updates = Vec::new();
85 let mut pending = Vec::new();
86 let active_branch = resolved.branch;
87 let catalog = db.catalog();
88
89 let mut recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = Vec::new();
99 for type_name in catalog.node_types.keys() {
100 let table_key = format!("node:{}", type_name);
101 let Some(entry) = snapshot.entry(&table_key) else {
102 continue;
103 };
104 if active_branch.is_some() && entry.table_branch.is_none() {
111 continue;
112 }
113 let full_path = format!("{}/{}", db.root_uri, entry.table_path);
114 if needs_index_work_node(
115 db,
116 type_name,
117 &table_key,
118 &full_path,
119 entry.table_branch.as_deref(),
120 )
121 .await?
122 {
123 recovery_pins.push(crate::db::manifest::SidecarTablePin {
124 table_key,
125 table_path: full_path,
126 expected_version: entry.table_version,
127 post_commit_pin: entry.table_version + 1,
128 table_branch: active_branch.clone(),
135 });
136 }
137 }
138 for edge_name in catalog.edge_types.keys() {
139 let table_key = format!("edge:{}", edge_name);
140 let Some(entry) = snapshot.entry(&table_key) else {
141 continue;
142 };
143 if active_branch.is_some() && entry.table_branch.is_none() {
144 continue;
145 }
146 let full_path = format!("{}/{}", db.root_uri, entry.table_path);
147 if needs_index_work_edge(db, &table_key, &full_path, entry.table_branch.as_deref()).await? {
148 recovery_pins.push(crate::db::manifest::SidecarTablePin {
149 table_key,
150 table_path: full_path,
151 expected_version: entry.table_version,
152 post_commit_pin: entry.table_version + 1,
153 table_branch: active_branch.clone(),
160 });
161 }
162 }
163 let queue_keys: Vec<(String, Option<String>)> = recovery_pins
170 .iter()
171 .map(|pin| (pin.table_key.clone(), pin.table_branch.clone()))
172 .collect();
173 let _queue_guards = db.write_queue().acquire_many(&queue_keys).await;
174
175 let recovery_handle = if recovery_pins.is_empty() {
176 None
177 } else {
178 let sidecar = crate::db::manifest::new_sidecar(
179 crate::db::manifest::SidecarKind::EnsureIndices,
180 active_branch.clone(),
181 None,
184 recovery_pins,
185 );
186 Some(
187 crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar)
188 .await?,
189 )
190 };
191
192 for type_name in catalog.node_types.keys() {
193 let table_key = format!("node:{}", type_name);
194 let Some(entry) = snapshot.entry(&table_key) else {
195 continue;
196 };
197 let full_path = format!("{}/{}", db.root_uri, entry.table_path);
198 let (mut ds, resolved_branch) = match active_branch.as_deref() {
199 Some(active_branch) => match entry.table_branch.as_deref() {
200 None => continue,
201 _ => {
202 open_owned_dataset_for_branch_write(
203 db,
204 &table_key,
205 &full_path,
206 entry.table_branch.as_deref(),
207 entry.table_version,
208 active_branch,
209 crate::db::MutationOpKind::SchemaRewrite,
210 )
211 .await?
212 }
213 },
214 None => (
215 db.storage()
216 .open_dataset_head_for_write(&table_key, &full_path, None)
217 .await?,
218 None,
219 ),
220 };
221 let row_count = db.storage().count_rows(&ds, None).await.unwrap_or(0);
222 if row_count > 0 {
223 pending.extend(build_indices_on_dataset(db, &table_key, &mut ds).await?);
224 }
225
226 let state = db.storage().table_state(&full_path, &ds).await?;
227 if state.version != entry.table_version
228 || resolved_branch.as_deref() != entry.table_branch.as_deref()
229 {
230 updates.push(crate::db::SubTableUpdate {
231 table_key,
232 table_version: state.version,
233 table_branch: resolved_branch,
234 row_count: state.row_count,
235 version_metadata: state.version_metadata,
236 });
237 }
238 }
239
240 for edge_name in catalog.edge_types.keys() {
241 let table_key = format!("edge:{}", edge_name);
242 let Some(entry) = snapshot.entry(&table_key) else {
243 continue;
244 };
245 let full_path = format!("{}/{}", db.root_uri, entry.table_path);
246 let (mut ds, resolved_branch) = match active_branch.as_deref() {
247 Some(active_branch) => match entry.table_branch.as_deref() {
248 None => continue,
249 _ => {
250 open_owned_dataset_for_branch_write(
251 db,
252 &table_key,
253 &full_path,
254 entry.table_branch.as_deref(),
255 entry.table_version,
256 active_branch,
257 crate::db::MutationOpKind::SchemaRewrite,
258 )
259 .await?
260 }
261 },
262 None => (
263 db.storage()
264 .open_dataset_head_for_write(&table_key, &full_path, None)
265 .await?,
266 None,
267 ),
268 };
269 let row_count = db.storage().count_rows(&ds, None).await.unwrap_or(0);
270 if row_count > 0 {
271 pending.extend(build_indices_on_dataset(db, &table_key, &mut ds).await?);
272 }
273
274 let state = db.storage().table_state(&full_path, &ds).await?;
275 if state.version != entry.table_version
276 || resolved_branch.as_deref() != entry.table_branch.as_deref()
277 {
278 updates.push(crate::db::SubTableUpdate {
279 table_key,
280 table_version: state.version,
281 table_branch: resolved_branch,
282 row_count: state.row_count,
283 version_metadata: state.version_metadata,
284 });
285 }
286 }
287
288 crate::failpoints::maybe_fail("ensure_indices.post_phase_b_pre_manifest_commit")?;
294
295 if !updates.is_empty() {
296 commit_prepared_updates_on_branch(db, branch, &updates, None).await?;
297 }
298
299 if let Some(handle) = recovery_handle {
304 if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
305 tracing::warn!(
306 error = %err,
307 operation_id = handle.operation_id.as_str(),
308 "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
309 );
310 }
311 }
312
313 Ok(pending)
314}
315
/// Kind of index used for a single-column node property index, as chosen by
/// `node_prop_index_kind`.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum NodePropIndexKind {
    /// BTree index; chosen for enum-valued strings and the date/numeric/bool
    /// scalar types.
    Btree,
    /// Full-text index (checked with `has_fts_index`, built as an inverted
    /// index); chosen for non-enum string properties.
    Fts,
    /// Vector index; chosen for vector-typed properties.
    Vector,
}
331
332fn node_prop_index_kind(prop_type: &PropType) -> Option<NodePropIndexKind> {
333 if prop_type.list {
334 return None;
335 }
336 let is_enum = prop_type.enum_values.is_some();
341 match prop_type.scalar {
342 ScalarType::String if !is_enum => Some(NodePropIndexKind::Fts),
343 ScalarType::Vector(_) => Some(NodePropIndexKind::Vector),
344 ScalarType::String
345 | ScalarType::DateTime
346 | ScalarType::Date
347 | ScalarType::I32
348 | ScalarType::I64
349 | ScalarType::U32
350 | ScalarType::U64
351 | ScalarType::F32
352 | ScalarType::F64
353 | ScalarType::Bool => Some(NodePropIndexKind::Btree),
354 ScalarType::Blob => None,
355 }
356}
357
358async fn vector_column_trainable(
367 db: &Omnigraph,
368 ds: &SnapshotHandle,
369 column: &str,
370) -> Result<bool> {
371 Ok(db
372 .storage()
373 .count_rows(ds, Some(format!("{column} IS NOT NULL")))
374 .await?
375 > 0)
376}
377
378pub(super) async fn needs_index_work_node(
393 db: &Omnigraph,
394 type_name: &str,
395 table_key: &str,
396 full_path: &str,
397 table_branch: Option<&str>,
398) -> Result<bool> {
399 let ds = db
400 .storage()
401 .open_dataset_head_for_write(table_key, full_path, table_branch)
402 .await?;
403 if db.storage().count_rows(&ds, None).await? == 0 {
411 return Ok(false);
412 }
413 if !db.storage().has_btree_index(&ds, "id").await? {
414 return Ok(true);
415 }
416 let catalog = db.catalog();
417 let Some(node_type) = catalog.node_types.get(type_name) else {
418 return Ok(false);
419 };
420 for index_cols in &node_type.indices {
421 if index_cols.len() != 1 {
422 continue;
423 }
424 let prop_name = &index_cols[0];
425 let Some(prop_type) = node_type.properties.get(prop_name) else {
426 continue;
427 };
428 match node_prop_index_kind(prop_type) {
429 Some(NodePropIndexKind::Fts) => {
430 if !db.storage().has_fts_index(&ds, prop_name).await? {
431 return Ok(true);
432 }
433 }
434 Some(NodePropIndexKind::Vector) => {
435 if !db.storage().has_vector_index(&ds, prop_name).await?
441 && vector_column_trainable(db, &ds, prop_name).await?
442 {
443 return Ok(true);
444 }
445 }
446 Some(NodePropIndexKind::Btree) => {
447 if !db.storage().has_btree_index(&ds, prop_name).await? {
448 return Ok(true);
449 }
450 }
451 None => {}
452 }
453 }
454 Ok(false)
455}
456
457pub(super) async fn needs_index_work_edge(
468 db: &Omnigraph,
469 table_key: &str,
470 full_path: &str,
471 table_branch: Option<&str>,
472) -> Result<bool> {
473 let ds = db
474 .storage()
475 .open_dataset_head_for_write(table_key, full_path, table_branch)
476 .await?;
477 if db.storage().count_rows(&ds, None).await? == 0 {
478 return Ok(false);
479 }
480 Ok(!db.storage().has_btree_index(&ds, "id").await?
481 || !db.storage().has_btree_index(&ds, "src").await?
482 || !db.storage().has_btree_index(&ds, "dst").await?)
483}
484
485pub(super) async fn open_for_mutation(
486 db: &Omnigraph,
487 table_key: &str,
488 op_kind: crate::db::MutationOpKind,
489) -> Result<(SnapshotHandle, String, Option<String>)> {
490 let current_branch = db
491 .coordinator
492 .read()
493 .await
494 .current_branch()
495 .map(str::to_string);
496 open_for_mutation_on_branch(db, current_branch.as_deref(), table_key, op_kind).await
497}
498
499pub(super) async fn open_for_mutation_on_branch(
506 db: &Omnigraph,
507 branch: Option<&str>,
508 table_key: &str,
509 op_kind: crate::db::MutationOpKind,
510) -> Result<(SnapshotHandle, String, Option<String>)> {
511 db.ensure_schema_apply_not_locked("write").await?;
512 let resolved = db.resolved_branch_target(branch).await?;
513 let entry = resolved
514 .snapshot
515 .entry(table_key)
516 .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
517 let full_path = format!("{}/{}", db.root_uri, entry.table_path);
518 match resolved.branch.as_deref() {
519 None => {
520 let ds = db
521 .storage()
522 .open_dataset_head_for_write(table_key, &full_path, None)
523 .await?;
524 if op_kind.strict_pre_stage_version_check() {
525 db.storage()
526 .ensure_expected_version(&ds, table_key, entry.table_version)?;
527 }
528 Ok((ds, full_path, None))
529 }
530 Some(active_branch) => {
531 let (ds, table_branch) = open_owned_dataset_for_branch_write(
532 db,
533 table_key,
534 &full_path,
535 entry.table_branch.as_deref(),
536 entry.table_version,
537 active_branch,
538 op_kind,
539 )
540 .await?;
541 Ok((ds, full_path, table_branch))
542 }
543 }
544}
545
546pub(super) async fn open_owned_dataset_for_branch_write(
547 db: &Omnigraph,
548 table_key: &str,
549 full_path: &str,
550 entry_branch: Option<&str>,
551 entry_version: u64,
552 active_branch: &str,
553 op_kind: crate::db::MutationOpKind,
554) -> Result<(SnapshotHandle, Option<String>)> {
555 match entry_branch {
556 Some(branch) if branch == active_branch => {
557 let ds = db
558 .storage()
559 .open_dataset_head_for_write(table_key, full_path, Some(active_branch))
560 .await?;
561 if op_kind.strict_pre_stage_version_check() {
562 db.storage()
563 .ensure_expected_version(&ds, table_key, entry_version)?;
564 }
565 Ok((ds, Some(active_branch.to_string())))
566 }
567 source_branch => {
568 crate::failpoints::maybe_fail("fork.before_classify")?;
569 let live = db.snapshot_for_branch(Some(active_branch)).await?;
575 if let Some(entry) = live.entry(table_key) {
576 if entry.table_branch.as_deref() == Some(active_branch) {
577 return Err(OmniError::manifest_expected_version_mismatch(
578 table_key,
579 entry_version,
580 entry.table_version,
581 ));
582 }
583 }
584 db.fork_dataset_from_entry_state(
592 table_key,
593 full_path,
594 source_branch,
595 entry_version,
596 active_branch,
597 )
598 .await?;
599 let ds = db
600 .storage()
601 .open_dataset_head_for_write(table_key, full_path, Some(active_branch))
602 .await?;
603 if op_kind.strict_pre_stage_version_check() {
604 db.storage()
605 .ensure_expected_version(&ds, table_key, entry_version)?;
606 }
607 Ok((ds, Some(active_branch.to_string())))
608 }
609 }
610}
611
612pub(super) async fn fork_dataset_from_entry_state(
613 db: &Omnigraph,
614 table_key: &str,
615 full_path: &str,
616 source_branch: Option<&str>,
617 source_version: u64,
618 active_branch: &str,
619) -> Result<crate::storage_layer::ForkOutcome<SnapshotHandle>> {
620 db.storage()
621 .fork_branch_from_state(
622 full_path,
623 source_branch,
624 table_key,
625 source_version,
626 active_branch,
627 )
628 .await
629}
630
/// Result of `classify_fork_ref`: how a table's ref for a branch relates to
/// what the manifest records for that branch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ForkRefStatus {
    /// A fresh snapshot of the branch places the table on that branch.
    Legitimate,
    /// A fresh snapshot does not place the table on the branch, or the fresh
    /// read failed and the branch is absent from the coordinator's branch list.
    Orphan,
    /// The fresh read failed and the branch could not be shown to be absent
    /// (it is still listed, or listing branches failed too).
    Indeterminate,
}
648
649pub(crate) async fn classify_fork_ref(
656 db: &Omnigraph,
657 table_key: &str,
658 branch: &str,
659) -> ForkRefStatus {
660 let fresh = match crate::failpoints::maybe_fail("classify.fresh_read") {
665 Ok(()) => db.fresh_snapshot_for_branch(Some(branch)).await,
666 Err(injected) => Err(injected),
667 };
668 match fresh {
669 Ok(snap) => {
670 let placed = snap
671 .entry(table_key)
672 .map(|e| e.table_branch.as_deref() == Some(branch))
673 .unwrap_or(false);
674 if placed {
675 ForkRefStatus::Legitimate
676 } else {
677 ForkRefStatus::Orphan
680 }
681 }
682 Err(_) => match db.coordinator.read().await.all_branches().await {
686 Ok(fresh) if !fresh.iter().any(|b| b == branch) => ForkRefStatus::Orphan,
687 _ => ForkRefStatus::Indeterminate,
688 },
689 }
690}
691
692pub(super) async fn reclaim_orphaned_fork_and_refork(
712 db: &Omnigraph,
713 table_key: &str,
714 full_path: &str,
715 source_branch: Option<&str>,
716 source_version: u64,
717 active_branch: &str,
718) -> Result<SnapshotHandle> {
719 match classify_fork_ref(db, table_key, active_branch).await {
725 ForkRefStatus::Orphan => {}
726 ForkRefStatus::Legitimate => {
727 let actual = db
728 .fresh_snapshot_for_branch(Some(active_branch))
729 .await
730 .ok()
731 .and_then(|s| s.entry(table_key).map(|e| e.table_version))
732 .unwrap_or(source_version);
733 return Err(OmniError::manifest_expected_version_mismatch(
734 table_key,
735 source_version,
736 actual,
737 ));
738 }
739 ForkRefStatus::Indeterminate => {
740 return Err(OmniError::manifest_conflict(format!(
741 "could not verify whether branch '{active_branch}' still owns an orphaned \
742 fork for table '{table_key}' because fresh manifest authority was \
743 unavailable; refresh and retry"
744 )));
745 }
746 }
747
748 crate::failpoints::maybe_fail("fork.before_reclaim")?;
749 db.storage()
750 .force_delete_branch(full_path, active_branch)
751 .await
752 .map_err(|e| {
753 if e.to_string().contains("referenc") {
761 OmniError::manifest_conflict(format!(
762 "branch '{active_branch}' cannot reclaim the leftover fork for \
763 table '{table_key}' because it has dependent child branches; \
764 delete the child branches (or run `omnigraph cleanup`) first"
765 ))
766 } else {
767 e
768 }
769 })?;
770
771 match fork_dataset_from_entry_state(
772 db,
773 table_key,
774 full_path,
775 source_branch,
776 source_version,
777 active_branch,
778 )
779 .await?
780 {
781 crate::storage_layer::ForkOutcome::Created(ds) => Ok(ds),
782 crate::storage_layer::ForkOutcome::RefAlreadyExists => {
783 let live = db.fresh_snapshot_for_branch(Some(active_branch)).await?;
784 let actual = live
785 .entry(table_key)
786 .map(|e| e.table_version)
787 .unwrap_or(source_version);
788 Err(OmniError::manifest_expected_version_mismatch(
789 table_key,
790 source_version,
791 actual,
792 ))
793 }
794 }
795}
796
797pub(super) async fn reopen_for_mutation(
798 db: &Omnigraph,
799 table_key: &str,
800 full_path: &str,
801 table_branch: Option<&str>,
802 expected_version: u64,
803 op_kind: crate::db::MutationOpKind,
804) -> Result<SnapshotHandle> {
805 db.ensure_schema_apply_not_locked("write").await?;
806 if op_kind.strict_pre_stage_version_check() {
807 db.storage()
808 .reopen_for_mutation(full_path, table_branch, table_key, expected_version)
809 .await
810 } else {
811 let _ = expected_version;
820 db.storage()
821 .open_dataset_head_for_write(table_key, full_path, table_branch)
822 .await
823 }
824}
825
826pub(super) async fn open_dataset_at_state(
827 db: &Omnigraph,
828 table_path: &str,
829 table_branch: Option<&str>,
830 table_version: u64,
831) -> Result<SnapshotHandle> {
832 db.storage()
833 .open_dataset_at_state(table_path, table_branch, table_version)
834 .await
835}
836
/// An index that was requested but not built during an index pass.
///
/// In this file it is only produced for vector indices whose column has no
/// non-null values yet (see `build_indices_on_dataset_for_catalog`).
#[derive(Debug, Clone)]
pub struct PendingIndex {
    /// Table the deferred index belongs to (e.g. a `node:<Type>` key).
    pub table_key: String,
    /// Column the index would have been built on.
    pub column: String,
    /// Human-readable explanation of why the index was deferred.
    pub reason: String,
}
850
851pub(super) async fn build_indices_on_dataset(
852 db: &Omnigraph,
853 table_key: &str,
854 ds: &mut SnapshotHandle,
855) -> Result<Vec<PendingIndex>> {
856 let catalog = db.catalog();
857 build_indices_on_dataset_for_catalog(db, &catalog, table_key, ds).await
858}
859
860pub(super) async fn build_indices_on_dataset_for_catalog(
861 db: &Omnigraph,
862 catalog: &Catalog,
863 table_key: &str,
864 ds: &mut SnapshotHandle,
865) -> Result<Vec<PendingIndex>> {
866 if let Some(type_name) = table_key.strip_prefix("node:") {
867 let mut pending = Vec::new();
868 if !db.storage().has_btree_index(ds, "id").await? {
869 stage_and_commit_btree(db, table_key, ds, &["id"]).await?;
870 }
871
872 if let Some(node_type) = catalog.node_types.get(type_name) {
873 for index_cols in &node_type.indices {
882 if index_cols.len() != 1 {
883 continue;
884 }
885 let prop_name = &index_cols[0];
886 if let Some(prop_type) = node_type.properties.get(prop_name) {
887 match node_prop_index_kind(prop_type) {
888 Some(NodePropIndexKind::Fts) => {
889 if !db.storage().has_fts_index(ds, prop_name).await? {
890 stage_and_commit_inverted(db, table_key, ds, prop_name.as_str())
891 .await?;
892 }
893 }
894 Some(NodePropIndexKind::Vector) => {
895 if !db.storage().has_vector_index(ds, prop_name).await? {
896 if vector_column_trainable(db, ds, prop_name).await? {
915 let new_snap = db
916 .storage_inline_residual()
917 .create_vector_index(ds.clone(), prop_name.as_str())
918 .await
919 .map_err(|e| {
920 OmniError::Lance(format!(
921 "create Vector index on {}({}): {}",
922 table_key, prop_name, e
923 ))
924 })?;
925 *ds = new_snap;
926 } else {
927 tracing::info!(
928 target: "omnigraph::index",
929 table = %table_key,
930 column = %prop_name,
931 "deferring Vector index: column has no \
932 trainable vectors yet",
933 );
934 pending.push(PendingIndex {
935 table_key: table_key.to_string(),
936 column: prop_name.clone(),
937 reason: "column has no non-null vectors to \
938 train on yet"
939 .to_string(),
940 });
941 }
942 }
943 }
944 Some(NodePropIndexKind::Btree) => {
948 if !db.storage().has_btree_index(ds, prop_name).await? {
949 stage_and_commit_btree(db, table_key, ds, &[prop_name.as_str()])
950 .await?;
951 }
952 }
953 None => {}
955 }
956 }
957 }
958 }
959 return Ok(pending);
960 }
961
962 if table_key.starts_with("edge:") {
963 if !db.storage().has_btree_index(ds, "id").await? {
964 stage_and_commit_btree(db, table_key, ds, &["id"]).await?;
965 }
966 if !db.storage().has_btree_index(ds, "src").await? {
967 stage_and_commit_btree(db, table_key, ds, &["src"]).await?;
968 }
969 if !db.storage().has_btree_index(ds, "dst").await? {
970 stage_and_commit_btree(db, table_key, ds, &["dst"]).await?;
971 }
972 return Ok(Vec::new());
975 }
976
977 Err(OmniError::manifest(format!(
978 "invalid table key '{}'",
979 table_key
980 )))
981}
982
983async fn stage_and_commit_btree(
992 db: &Omnigraph,
993 table_key: &str,
994 ds: &mut SnapshotHandle,
995 columns: &[&str],
996) -> Result<()> {
997 let staged = db
998 .storage()
999 .stage_create_btree_index(ds, columns)
1000 .await
1001 .map_err(|e| {
1002 OmniError::Lance(format!(
1003 "stage_create_btree_index on {}({:?}): {}",
1004 table_key, columns, e
1005 ))
1006 })?;
1007 crate::failpoints::maybe_fail("ensure_indices.post_stage_pre_commit_btree")?;
1012 let new_ds = db
1013 .storage()
1014 .commit_staged(ds.clone(), staged)
1015 .await
1016 .map_err(|e| {
1017 OmniError::Lance(format!(
1018 "commit BTree index on {}({:?}): {}",
1019 table_key, columns, e
1020 ))
1021 })?;
1022 *ds = new_ds;
1023 Ok(())
1024}
1025
1026async fn stage_and_commit_inverted(
1029 db: &Omnigraph,
1030 table_key: &str,
1031 ds: &mut SnapshotHandle,
1032 column: &str,
1033) -> Result<()> {
1034 let staged = db
1035 .storage()
1036 .stage_create_inverted_index(ds, column)
1037 .await
1038 .map_err(|e| {
1039 OmniError::Lance(format!(
1040 "stage_create_inverted_index on {}({}): {}",
1041 table_key, column, e
1042 ))
1043 })?;
1044 let new_ds = db
1045 .storage()
1046 .commit_staged(ds.clone(), staged)
1047 .await
1048 .map_err(|e| {
1049 OmniError::Lance(format!(
1050 "commit Inverted index on {}({}): {}",
1051 table_key, column, e
1052 ))
1053 })?;
1054 *ds = new_ds;
1055 Ok(())
1056}
1057
1058async fn prepare_updates_for_commit(
1059 db: &Omnigraph,
1060 branch: Option<&str>,
1061 updates: &[crate::db::SubTableUpdate],
1062) -> Result<Vec<crate::db::SubTableUpdate>> {
1063 if updates.is_empty() {
1064 return Ok(Vec::new());
1065 }
1066
1067 let snapshot = db.snapshot_for_branch(branch).await?;
1068 let mut prepared = Vec::with_capacity(updates.len());
1069
1070 for update in updates {
1071 let Some(entry) = snapshot.entry(&update.table_key) else {
1072 return Err(OmniError::manifest(format!(
1073 "no manifest entry for {}",
1074 update.table_key
1075 )));
1076 };
1077
1078 let mut prepared_update = update.clone();
1079 if prepared_update.row_count > 0 {
1080 let full_path = format!("{}/{}", db.root_uri, entry.table_path);
1081 let mut ds = reopen_for_mutation(
1088 db,
1089 &prepared_update.table_key,
1090 &full_path,
1091 prepared_update.table_branch.as_deref(),
1092 prepared_update.table_version,
1093 crate::db::MutationOpKind::SchemaRewrite,
1094 )
1095 .await?;
1096 let _pending = build_indices_on_dataset(db, &prepared_update.table_key, &mut ds).await?;
1101 let state = db.storage().table_state(&full_path, &ds).await?;
1102 prepared_update.table_version = state.version;
1103 prepared_update.row_count = state.row_count;
1104 prepared_update.version_metadata = state.version_metadata;
1105 }
1106
1107 prepared.push(prepared_update);
1108 }
1109
1110 Ok(prepared)
1111}
1112
1113async fn commit_prepared_updates(
1114 db: &Omnigraph,
1115 updates: &[crate::db::SubTableUpdate],
1116 actor_id: Option<&str>,
1117) -> Result<u64> {
1118 let PublishedSnapshot {
1119 manifest_version,
1120 _snapshot_id: _,
1121 } = db
1122 .coordinator
1123 .write()
1124 .await
1125 .commit_updates_with_actor(updates, actor_id)
1126 .await?;
1127 Ok(manifest_version)
1128}
1129
1130async fn commit_prepared_updates_with_expected(
1131 db: &Omnigraph,
1132 updates: &[crate::db::SubTableUpdate],
1133 expected_table_versions: &std::collections::HashMap<String, u64>,
1134 actor_id: Option<&str>,
1135) -> Result<u64> {
1136 let PublishedSnapshot {
1137 manifest_version,
1138 _snapshot_id: _,
1139 } = db
1140 .coordinator
1141 .write()
1142 .await
1143 .commit_updates_with_actor_with_expected(updates, expected_table_versions, actor_id)
1144 .await?;
1145 Ok(manifest_version)
1146}
1147
1148pub(super) async fn commit_prepared_updates_on_branch(
1149 db: &Omnigraph,
1150 branch: Option<&str>,
1151 updates: &[crate::db::SubTableUpdate],
1152 actor_id: Option<&str>,
1153) -> Result<u64> {
1154 let current_branch = db
1155 .coordinator
1156 .read()
1157 .await
1158 .current_branch()
1159 .map(str::to_string);
1160 let requested_branch = branch.map(str::to_string);
1161 if requested_branch == current_branch {
1162 return commit_prepared_updates(db, updates, actor_id).await;
1163 }
1164
1165 let mut coordinator = match requested_branch.as_deref() {
1166 Some(branch) => {
1167 GraphCoordinator::open_branch(db.uri(), branch, Arc::clone(&db.storage)).await?
1168 }
1169 None => GraphCoordinator::open(db.uri(), Arc::clone(&db.storage)).await?,
1170 };
1171 let PublishedSnapshot {
1172 manifest_version,
1173 _snapshot_id: _,
1174 } = coordinator
1175 .commit_updates_with_actor(updates, actor_id)
1176 .await?;
1177 Ok(manifest_version)
1178}
1179
1180pub(super) async fn commit_prepared_updates_on_branch_with_expected(
1181 db: &Omnigraph,
1182 branch: Option<&str>,
1183 updates: &[crate::db::SubTableUpdate],
1184 expected_table_versions: &std::collections::HashMap<String, u64>,
1185 actor_id: Option<&str>,
1186) -> Result<u64> {
1187 let current_branch = db
1188 .coordinator
1189 .read()
1190 .await
1191 .current_branch()
1192 .map(str::to_string);
1193 let requested_branch = branch.map(str::to_string);
1194 if requested_branch == current_branch {
1195 return commit_prepared_updates_with_expected(
1196 db,
1197 updates,
1198 expected_table_versions,
1199 actor_id,
1200 )
1201 .await;
1202 }
1203
1204 let mut coordinator = match requested_branch.as_deref() {
1205 Some(branch) => {
1206 GraphCoordinator::open_branch(db.uri(), branch, Arc::clone(&db.storage)).await?
1207 }
1208 None => GraphCoordinator::open(db.uri(), Arc::clone(&db.storage)).await?,
1209 };
1210 let PublishedSnapshot {
1211 manifest_version,
1212 _snapshot_id: _,
1213 } = coordinator
1214 .commit_updates_with_actor_with_expected(updates, expected_table_versions, actor_id)
1215 .await?;
1216 Ok(manifest_version)
1217}
1218
/// Test-only: prepares `updates` (building indices where needed) and commits
/// them on the coordinator's current branch with no actor id.
///
/// Fails early when `ensure_schema_apply_not_locked` rejects the commit.
#[cfg(test)]
pub(super) async fn commit_updates(
    db: &mut Omnigraph,
    updates: &[crate::db::SubTableUpdate],
) -> Result<u64> {
    db.ensure_schema_apply_not_locked("write commit").await?;
    let active: Option<String> = {
        let coordinator = db.coordinator.read().await;
        coordinator.current_branch().map(|name| name.to_string())
    };
    let prepared = prepare_updates_for_commit(db, active.as_deref(), updates).await?;
    commit_prepared_updates(db, &prepared, None).await
}
1236
1237pub(super) async fn commit_manifest_updates(
1238 db: &Omnigraph,
1239 updates: &[crate::db::SubTableUpdate],
1240) -> Result<u64> {
1241 db.coordinator
1242 .write()
1243 .await
1244 .commit_manifest_updates(updates)
1245 .await
1246}
1247
1248pub(super) async fn record_merge_commit(
1249 db: &Omnigraph,
1250 manifest_version: u64,
1251 parent_commit_id: &str,
1252 merged_parent_commit_id: &str,
1253 actor_id: Option<&str>,
1254) -> Result<String> {
1255 db.coordinator
1256 .write()
1257 .await
1258 .record_merge_commit(
1259 manifest_version,
1260 parent_commit_id,
1261 merged_parent_commit_id,
1262 actor_id,
1263 )
1264 .await
1265 .map(|snapshot_id| snapshot_id.as_str().to_string())
1266}
1267
1268pub(super) async fn commit_updates_on_branch_with_expected(
1272 db: &Omnigraph,
1273 branch: Option<&str>,
1274 updates: &[crate::db::SubTableUpdate],
1275 expected_table_versions: &std::collections::HashMap<String, u64>,
1276 actor_id: Option<&str>,
1277) -> Result<u64> {
1278 db.ensure_schema_apply_not_locked("write commit").await?;
1279 let prepared = prepare_updates_for_commit(db, branch, updates).await?;
1280 commit_prepared_updates_on_branch_with_expected(
1281 db,
1282 branch,
1283 &prepared,
1284 expected_table_versions,
1285 actor_id,
1286 )
1287 .await
1288}
1289
1290pub(super) async fn ensure_commit_graph_initialized(db: &Omnigraph) -> Result<()> {
1291 db.coordinator
1292 .write()
1293 .await
1294 .ensure_commit_graph_initialized()
1295 .await
1296}
1297
1298pub(super) async fn invalidate_graph_index(db: &Omnigraph) {
1299 db.runtime_cache.invalidate_all().await;
1300}
1301
/// Tests for `classify_fork_ref`, driven against a real on-disk database in a
/// temporary directory.
#[cfg(test)]
mod classify_fork_ref_tests {
    use super::*;
    use crate::db::Omnigraph;
    use crate::loader::LoadMode;

    // Two node types so one can be written on the branch while the other is not.
    const SCHEMA: &str = "node Person { name: String @key }\nnode Company { name: String @key }\n";

    /// Full dataset path of `table_key` as recorded in the snapshot of `branch`.
    async fn node_path(db: &Omnigraph, branch: &str, table_key: &str) -> String {
        let snap = db.snapshot_for_branch(Some(branch)).await.unwrap();
        let entry = snap.entry(table_key).unwrap();
        format!("{}/{}", db.root_uri, entry.table_path)
    }

    /// Covers the three cases the assertions below name: a fork the manifest
    /// places on the branch, a ref the manifest does not place on the branch,
    /// and a ref for a branch the database never created.
    #[tokio::test]
    async fn classify_distinguishes_legitimate_unreferenced_and_ghost() {
        let dir = tempfile::tempdir().unwrap();
        let db = Omnigraph::init(dir.path().to_str().unwrap(), SCHEMA)
            .await
            .unwrap();
        db.branch_create("feature").await.unwrap();

        // Case 1: load a Company row on "feature" through the database API.
        db.load_as(
            "feature",
            None,
            r#"{"type":"Company","data":{"name":"Acme"}}"#,
            LoadMode::Merge,
            None,
        )
        .await
        .unwrap();
        assert_eq!(
            classify_fork_ref(&db, "node:Company", "feature").await,
            ForkRefStatus::Legitimate,
            "a manifest-placed fork must classify as Legitimate (never destroyed)"
        );

        // Case 2: create a "feature" ref on the Person dataset directly through
        // Lance, bypassing the database API.
        let person = node_path(&db, "feature", "node:Person").await;
        {
            let mut ds = lance::Dataset::open(&person).await.unwrap();
            let v = ds.version().version;
            ds.create_branch("feature", v, None).await.unwrap();
        }
        assert_eq!(
            classify_fork_ref(&db, "node:Person", "feature").await,
            ForkRefStatus::Orphan,
            "a ref the manifest does not place on the branch must classify as Orphan"
        );

        // Case 3: a "ghost" ref created directly through Lance; no branch of
        // that name was ever created via `branch_create`.
        {
            let mut ds = lance::Dataset::open(&person).await.unwrap();
            let v = ds.version().version;
            ds.create_branch("ghost", v, None).await.unwrap();
        }
        assert_eq!(
            classify_fork_ref(&db, "node:Person", "ghost").await,
            ForkRefStatus::Orphan,
            "a ref for a branch absent from the manifest must classify as Orphan"
        );
    }
}