1use super::*;
2
/// Caller-supplied switches for planning and applying a schema migration.
#[derive(Debug, Clone, Default)]
pub struct SchemaApplyOptions {
    /// When `true`, every `DropType` / `DropProperty` step of the computed plan
    /// has its mode set to `DropMode::Hard`; when `false` the plan is left as the
    /// planner produced it. The derived `Default` yields `false`.
    pub allow_data_loss: bool,
}
19
20fn promote_drops_to_hard(plan: &mut SchemaMigrationPlan, allow_data_loss: bool) {
23 if !allow_data_loss {
24 return;
25 }
26 for step in &mut plan.steps {
27 match step {
28 SchemaMigrationStep::DropType { mode, .. }
29 | SchemaMigrationStep::DropProperty { mode, .. } => {
30 *mode = DropMode::Hard;
31 }
32 _ => {}
33 }
34 }
35}
36
37pub(super) async fn plan_schema(
38 db: &Omnigraph,
39 desired_schema_source: &str,
40 options: SchemaApplyOptions,
41) -> Result<SchemaMigrationPlan> {
42 db.ensure_schema_state_valid().await?;
43 let accepted_ir = read_accepted_schema_ir(db.uri(), Arc::clone(&db.storage)).await?;
44 let desired_ir = read_schema_ir_from_source(desired_schema_source)?;
45 let mut plan = plan_schema_migration(&accepted_ir, &desired_ir)
46 .map_err(|err| OmniError::manifest(err.to_string()))?;
47 promote_drops_to_hard(&mut plan, options.allow_data_loss);
48 Ok(plan)
49}
50
/// Everything `plan_schema_for_apply` produces for a supported migration.
struct PlannedSchemaApply {
    /// The migration plan, with drop steps already promoted to hard drops when
    /// the caller allowed data loss.
    plan: SchemaMigrationPlan,
    /// Schema IR parsed from the desired schema source.
    desired_ir: SchemaIR,
    /// Catalog built from `desired_ir`, after `fixup_blob_schemas` has run on it.
    desired_catalog: Catalog,
}
56
57async fn plan_schema_for_apply(
58 db: &Omnigraph,
59 desired_schema_source: &str,
60 options: SchemaApplyOptions,
61) -> Result<PlannedSchemaApply> {
62 db.ensure_schema_state_valid().await?;
63 let branches = db.coordinator.read().await.all_branches().await?;
64 let blocking_branches = branches
70 .into_iter()
71 .filter(|branch| branch != "main" && !is_internal_system_branch(branch))
72 .collect::<Vec<_>>();
73 if !blocking_branches.is_empty() {
74 return Err(OmniError::manifest_conflict(format!(
75 "schema apply requires a graph with only main; found non-main branches: {}",
76 blocking_branches.join(", ")
77 )));
78 }
79
80 let accepted_ir = read_accepted_schema_ir(db.uri(), Arc::clone(&db.storage)).await?;
81 let desired_ir = read_schema_ir_from_source(desired_schema_source)?;
82 let mut plan = plan_schema_migration(&accepted_ir, &desired_ir)
83 .map_err(|err| OmniError::manifest(err.to_string()))?;
84 promote_drops_to_hard(&mut plan, options.allow_data_loss);
85 if !plan.supported {
86 let message = plan
87 .steps
88 .iter()
89 .find_map(|step| step.unsupported_error_message())
90 .unwrap_or_else(|| "unsupported schema migration plan".to_string());
91 return Err(OmniError::manifest(message));
92 }
93
94 let mut desired_catalog = build_catalog_from_ir(&desired_ir)?;
95 fixup_blob_schemas(&mut desired_catalog);
96 Ok(PlannedSchemaApply {
97 plan,
98 desired_ir,
99 desired_catalog,
100 })
101}
102
103pub(super) async fn preview_schema_apply(
104 db: &Omnigraph,
105 desired_schema_source: &str,
106 options: SchemaApplyOptions,
107) -> Result<SchemaApplyPreview> {
108 let planned = plan_schema_for_apply(db, desired_schema_source, options).await?;
109 Ok(SchemaApplyPreview {
110 plan: planned.plan,
111 catalog: planned.desired_catalog,
112 })
113}
114
115pub(super) async fn apply_schema<F>(
116 db: &Omnigraph,
117 desired_schema_source: &str,
118 options: SchemaApplyOptions,
119 actor: Option<&str>,
120 validate_catalog: F,
121) -> Result<SchemaApplyResult>
122where
123 F: FnOnce(&Catalog) -> Result<()>,
124{
125 db.enforce(
142 omnigraph_policy::PolicyAction::SchemaApply,
143 &omnigraph_policy::ResourceScope::TargetBranch("main".to_string()),
144 actor,
145 )?;
146
147 acquire_schema_apply_lock(db).await?;
148 let result = apply_schema_with_lock(db, desired_schema_source, options, validate_catalog).await;
149 let release_result = release_schema_apply_lock(db).await;
150 match (result, release_result) {
151 (Ok(result), Ok(())) => Ok(result),
152 (Ok(_), Err(err)) => Err(err),
153 (Err(err), Ok(())) => Err(err),
154 (Err(err), Err(_)) => Err(err),
155 }
156}
157
158pub(super) async fn apply_schema_with_lock<F>(
159 db: &Omnigraph,
160 desired_schema_source: &str,
161 options: SchemaApplyOptions,
162 validate_catalog: F,
163) -> Result<SchemaApplyResult>
164where
165 F: FnOnce(&Catalog) -> Result<()>,
166{
167 let planned = plan_schema_for_apply(db, desired_schema_source, options).await?;
168 validate_catalog(&planned.desired_catalog)?;
169 let PlannedSchemaApply {
170 plan,
171 desired_ir,
172 desired_catalog,
173 } = planned;
174 if plan.steps.is_empty() {
175 return Ok(SchemaApplyResult {
176 supported: true,
177 applied: false,
178 manifest_version: db.version().await,
179 steps: plan.steps,
180 });
181 }
182
183 let snapshot = db.snapshot().await;
184 let base_manifest_version = snapshot.version();
185 let mut added_tables = BTreeSet::new();
186 let mut renamed_tables = HashMap::new();
187 let mut rewritten_tables = BTreeSet::new();
188 let mut indexed_tables = BTreeSet::new();
189 let mut dropped_tables = BTreeSet::new();
190 let mut hard_cleanup_targets: Vec<(String, String)> = Vec::new();
196 let mut property_renames = HashMap::<String, HashMap<String, String>>::new();
197 let mut changed_edge_tables = false;
198
199 for step in &plan.steps {
200 match step {
201 SchemaMigrationStep::AddType { type_kind, name } => {
202 let table_key = schema_table_key(*type_kind, name);
203 if table_key.starts_with("edge:") {
204 changed_edge_tables = true;
205 }
206 added_tables.insert(table_key);
207 }
208 SchemaMigrationStep::RenameType {
209 type_kind,
210 from,
211 to,
212 } => {
213 let source_key = schema_table_key(*type_kind, from);
214 let target_key = schema_table_key(*type_kind, to);
215 if source_key.starts_with("edge:") {
216 changed_edge_tables = true;
217 }
218 renamed_tables.insert(target_key, source_key);
219 }
220 SchemaMigrationStep::AddProperty {
221 type_kind,
222 type_name,
223 ..
224 } => {
225 let table_key = schema_table_key(*type_kind, type_name);
226 if table_key.starts_with("edge:") {
227 changed_edge_tables = true;
228 }
229 rewritten_tables.insert(table_key);
230 }
231 SchemaMigrationStep::RenameProperty {
232 type_kind,
233 type_name,
234 from,
235 to,
236 } => {
237 let table_key = schema_table_key(*type_kind, type_name);
238 if table_key.starts_with("edge:") {
239 changed_edge_tables = true;
240 }
241 rewritten_tables.insert(table_key.clone());
242 property_renames
243 .entry(table_key)
244 .or_default()
245 .insert(to.clone(), from.clone());
246 }
247 SchemaMigrationStep::AddConstraint {
248 type_kind,
249 type_name,
250 ..
251 } => {
252 indexed_tables.insert(schema_table_key(*type_kind, type_name));
253 }
254 SchemaMigrationStep::UpdateTypeMetadata { .. }
255 | SchemaMigrationStep::UpdatePropertyMetadata { .. } => {}
256 SchemaMigrationStep::DropProperty {
257 type_kind,
258 type_name,
259 mode,
260 ..
261 } => {
262 let table_key = schema_table_key(*type_kind, type_name);
278 if table_key.starts_with("edge:") {
279 changed_edge_tables = true;
280 }
281 if matches!(mode, DropMode::Hard) {
282 let entry = snapshot.entry(&table_key).ok_or_else(|| {
283 OmniError::manifest(format!(
284 "missing table '{}' for hard property drop",
285 table_key
286 ))
287 })?;
288 let full_uri = format!("{}/{}", db.root_uri, entry.table_path);
289 hard_cleanup_targets.push((table_key.clone(), full_uri));
290 }
291 rewritten_tables.insert(table_key);
292 }
293 SchemaMigrationStep::DropType {
294 type_kind,
295 name,
296 mode,
297 } => {
298 let table_key = schema_table_key(*type_kind, name);
314 if table_key.starts_with("edge:") {
315 changed_edge_tables = true;
316 }
317 if matches!(mode, DropMode::Hard) {
318 let entry = snapshot.entry(&table_key).ok_or_else(|| {
319 OmniError::manifest(format!(
320 "missing table '{}' for hard type drop",
321 table_key
322 ))
323 })?;
324 let full_uri = format!("{}/{}", db.root_uri, entry.table_path);
325 hard_cleanup_targets.push((table_key.clone(), full_uri));
326 }
327 dropped_tables.insert(table_key);
328 }
329 step @ SchemaMigrationStep::UnsupportedChange { .. } => {
330 return Err(OmniError::manifest(
331 step.unsupported_error_message()
332 .unwrap_or_else(|| "unsupported schema migration step".to_string()),
333 ));
334 }
335 }
336 }
337
338 let mut table_registrations = HashMap::<String, String>::new();
339 let mut table_updates = HashMap::<String, crate::db::SubTableUpdate>::new();
340 let mut table_tombstones = HashMap::<String, u64>::new();
341
342 let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = rewritten_tables
348 .iter()
349 .chain(indexed_tables.iter().filter(|t| {
350 !rewritten_tables.contains(*t)
351 && !added_tables.contains(*t)
352 && !renamed_tables.contains_key(*t)
353 }))
354 .filter_map(|table_key| {
355 let entry = snapshot.entry(table_key)?;
356 Some(crate::db::manifest::SidecarTablePin {
357 table_key: table_key.clone(),
358 table_path: db.table_store.dataset_uri(&entry.table_path),
359 expected_version: entry.table_version,
360 post_commit_pin: entry.table_version + 1,
361 table_branch: entry.table_branch.clone(),
362 })
363 })
364 .collect();
365 let mut sidecar_registrations: Vec<crate::db::manifest::SidecarTableRegistration> = Vec::new();
372 for table_key in &added_tables {
373 sidecar_registrations.push(crate::db::manifest::SidecarTableRegistration {
374 table_key: table_key.clone(),
375 table_path: table_path_for_table_key(table_key)?,
376 table_branch: None,
377 });
378 }
379 for target_table_key in renamed_tables.keys() {
380 sidecar_registrations.push(crate::db::manifest::SidecarTableRegistration {
381 table_key: target_table_key.clone(),
382 table_path: table_path_for_table_key(target_table_key)?,
383 table_branch: None,
384 });
385 }
386 let mut sidecar_tombstones: Vec<crate::db::manifest::SidecarTombstone> = Vec::new();
387 for source_table_key in renamed_tables.values() {
388 let source_entry = snapshot.entry(source_table_key).ok_or_else(|| {
389 OmniError::manifest(format!(
390 "missing source table '{}' for schema rename when building recovery sidecar",
391 source_table_key
392 ))
393 })?;
394 sidecar_tombstones.push(crate::db::manifest::SidecarTombstone {
395 table_key: source_table_key.clone(),
396 tombstone_version: source_entry.table_version.saturating_add(1),
397 });
398 }
399 for dropped_table_key in &dropped_tables {
406 let entry = snapshot.entry(dropped_table_key).ok_or_else(|| {
407 OmniError::manifest(format!(
408 "missing table '{}' for soft drop when building recovery sidecar",
409 dropped_table_key
410 ))
411 })?;
412 let tombstone_version = entry.table_version.saturating_add(1);
413 sidecar_tombstones.push(crate::db::manifest::SidecarTombstone {
414 table_key: dropped_table_key.clone(),
415 tombstone_version,
416 });
417 table_tombstones.insert(dropped_table_key.clone(), tombstone_version);
418 }
419
420 let schema_apply_queue_keys: Vec<(String, Option<String>)> = recovery_pins
432 .iter()
433 .map(|pin| (pin.table_key.clone(), pin.table_branch.clone()))
434 .collect();
435 let _schema_apply_queue_guards = db
436 .write_queue()
437 .acquire_many(&schema_apply_queue_keys)
438 .await;
439
440 let recovery_handle = if recovery_pins.is_empty()
441 && sidecar_registrations.is_empty()
442 && sidecar_tombstones.is_empty()
443 {
444 None
445 } else {
446 let mut sidecar = crate::db::manifest::new_sidecar(
454 crate::db::manifest::SidecarKind::SchemaApply,
455 None,
456 None,
460 recovery_pins,
461 );
462 sidecar.additional_registrations = sidecar_registrations;
463 sidecar.tombstones = sidecar_tombstones;
464 Some(
465 crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar)
466 .await?,
467 )
468 };
469
470 for table_key in &added_tables {
471 let table_path = table_path_for_table_key(table_key)?;
472 let dataset_uri = db.table_store.dataset_uri(&table_path);
473 let schema = schema_for_table_key(&desired_catalog, table_key)?;
474 let mut ds = TableStore::create_empty_dataset(&dataset_uri, &schema).await?;
475 db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds)
476 .await?;
477 let state = db.table_store.table_state(&dataset_uri, &ds).await?;
478 table_registrations.insert(table_key.clone(), table_path);
479 table_updates.insert(
480 table_key.clone(),
481 crate::db::SubTableUpdate {
482 table_key: table_key.clone(),
483 table_version: state.version,
484 table_branch: None,
485 row_count: state.row_count,
486 version_metadata: state.version_metadata,
487 },
488 );
489 }
490
491 for (target_table_key, source_table_key) in &renamed_tables {
492 let source_entry = snapshot.entry(source_table_key).ok_or_else(|| {
493 OmniError::manifest(format!(
494 "missing source table '{}' for schema rename",
495 source_table_key
496 ))
497 })?;
498 ensure_snapshot_entry_head_matches(db, source_entry).await?;
499 let source_ds = snapshot.open(source_table_key).await?;
500 let current_catalog = db.catalog();
501 let batch = batch_for_schema_apply_rewrite(
502 db,
503 &source_ds,
504 source_table_key,
505 ¤t_catalog,
506 target_table_key,
507 &desired_catalog,
508 property_renames.get(target_table_key),
509 )
510 .await?;
511 let table_path = table_path_for_table_key(target_table_key)?;
512 let dataset_uri = db.table_store.dataset_uri(&table_path);
513 let mut target_ds = TableStore::write_dataset(&dataset_uri, batch).await?;
514 db.build_indices_on_dataset_for_catalog(&desired_catalog, target_table_key, &mut target_ds)
515 .await?;
516 let state = db.table_store.table_state(&dataset_uri, &target_ds).await?;
517 table_registrations.insert(target_table_key.clone(), table_path);
518 table_updates.insert(
519 target_table_key.clone(),
520 crate::db::SubTableUpdate {
521 table_key: target_table_key.clone(),
522 table_version: state.version,
523 table_branch: None,
524 row_count: state.row_count,
525 version_metadata: state.version_metadata,
526 },
527 );
528 table_tombstones.insert(
529 source_table_key.clone(),
530 source_entry.table_version.saturating_add(1),
531 );
532 }
533
534 for table_key in &rewritten_tables {
535 if added_tables.contains(table_key) || renamed_tables.contains_key(table_key) {
536 continue;
537 }
538 let entry = snapshot.entry(table_key).ok_or_else(|| {
539 OmniError::manifest(format!(
540 "missing source table '{}' for schema apply",
541 table_key
542 ))
543 })?;
544 ensure_snapshot_entry_head_matches(db, entry).await?;
545 let source_ds = snapshot.open(table_key).await?;
546 let current_catalog = db.catalog();
547 let batch = batch_for_schema_apply_rewrite(
548 db,
549 &source_ds,
550 table_key,
551 ¤t_catalog,
552 table_key,
553 &desired_catalog,
554 property_renames.get(table_key),
555 )
556 .await?;
557 let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
558 let mut target_ds = if batch.num_rows() == 0 {
567 TableStore::overwrite_dataset(&dataset_uri, batch).await?
568 } else {
569 let existing = db
577 .table_store
578 .open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref())
579 .await?;
580 let staged = db.table_store.stage_overwrite(&existing, batch).await?;
581 db.table_store
582 .commit_staged(Arc::new(existing), staged.transaction)
583 .await?
584 };
585 db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut target_ds)
586 .await?;
587 let state = db.table_store.table_state(&dataset_uri, &target_ds).await?;
588 table_updates.insert(
589 table_key.clone(),
590 crate::db::SubTableUpdate {
591 table_key: table_key.clone(),
592 table_version: state.version,
593 table_branch: None,
594 row_count: state.row_count,
595 version_metadata: state.version_metadata,
596 },
597 );
598 }
599
600 for table_key in &indexed_tables {
601 if added_tables.contains(table_key)
602 || renamed_tables.contains_key(table_key)
603 || rewritten_tables.contains(table_key)
604 {
605 continue;
606 }
607 let entry = snapshot.entry(table_key).ok_or_else(|| {
608 OmniError::manifest(format!(
609 "missing table '{}' for schema index apply",
610 table_key
611 ))
612 })?;
613 ensure_snapshot_entry_head_matches(db, entry).await?;
614 let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
615 let mut ds = db
616 .table_store
617 .open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref())
618 .await?;
619 db.table_store
620 .ensure_expected_version(&ds, table_key, entry.table_version)?;
621 db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds)
622 .await?;
623 let state = db.table_store.table_state(&dataset_uri, &ds).await?;
624 table_updates.insert(
625 table_key.clone(),
626 crate::db::SubTableUpdate {
627 table_key: table_key.clone(),
628 table_version: state.version,
629 table_branch: None,
630 row_count: state.row_count,
631 version_metadata: state.version_metadata,
632 },
633 );
634 }
635
636 let mut manifest_changes = Vec::new();
637 for (table_key, table_path) in table_registrations {
638 manifest_changes.push(ManifestChange::RegisterTable(TableRegistration {
639 table_key,
640 table_path,
641 }));
642 }
643 for update in table_updates.into_values() {
644 manifest_changes.push(ManifestChange::Update(update));
645 }
646 for (table_key, tombstone_version) in table_tombstones {
647 manifest_changes.push(ManifestChange::Tombstone(TableTombstone {
648 table_key,
649 tombstone_version,
650 }));
651 }
652
653 db.refresh_coordinator_only().await?;
654 let current_manifest_version = db.version().await;
655 if current_manifest_version != base_manifest_version {
656 return Err(OmniError::manifest_conflict(format!(
657 "schema apply lost its write lease: main advanced from v{} to v{} while schema apply was in progress",
658 base_manifest_version, current_manifest_version,
659 )));
660 }
661
662 crate::failpoints::maybe_fail("schema_apply.before_staging_write")?;
671
672 let staging_pg_uri = schema_source_staging_uri(&db.root_uri);
673 db.storage
674 .write_text(&staging_pg_uri, desired_schema_source)
675 .await?;
676 write_schema_contract_staging(&db.root_uri, db.storage.as_ref(), &desired_ir).await?;
677
678 crate::failpoints::maybe_fail("schema_apply.after_staging_write")?;
679
680 let PublishedSnapshot {
682 manifest_version,
683 _snapshot_id: _,
684 } = db
685 .coordinator
686 .write()
687 .await
688 .commit_changes_with_actor(&manifest_changes, None)
689 .await?;
690
691 crate::failpoints::maybe_fail("schema_apply.after_manifest_commit")?;
692
693 db.storage
694 .rename_text(&staging_pg_uri, &schema_source_uri(&db.root_uri))
695 .await?;
696 db.storage
697 .rename_text(
698 &schema_ir_staging_uri(&db.root_uri),
699 &schema_ir_uri(&db.root_uri),
700 )
701 .await?;
702 db.storage
703 .rename_text(
704 &schema_state_staging_uri(&db.root_uri),
705 &schema_state_uri(&db.root_uri),
706 )
707 .await?;
708
709 db.store_catalog(desired_catalog);
710 db.store_schema_source(desired_schema_source.to_string());
711 db.coordinator.write().await.refresh().await?;
712 db.runtime_cache.invalidate_all().await;
713 if changed_edge_tables {
714 db.invalidate_graph_index().await;
715 }
716
717 if let Some(handle) = recovery_handle {
725 if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
726 tracing::warn!(
727 error = %err,
728 operation_id = handle.operation_id.as_str(),
729 "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
730 );
731 }
732 }
733
734 for (table_key, full_uri) in &hard_cleanup_targets {
741 match cleanup_dataset_old_versions(db, full_uri).await {
742 Ok(()) => {}
743 Err(err) => {
744 tracing::warn!(
745 error = %err,
746 table_key = table_key.as_str(),
747 "hard-drop cleanup_old_versions failed; rerun `omnigraph cleanup` to reclaim",
748 );
749 }
750 }
751 }
752
753 Ok(SchemaApplyResult {
754 supported: true,
755 applied: true,
756 manifest_version,
757 steps: plan.steps,
758 })
759}
760
761async fn cleanup_dataset_old_versions(db: &Omnigraph, full_uri: &str) -> Result<()> {
771 use chrono::Utc;
772 use lance::dataset::cleanup::CleanupPolicy;
773 let ds = lance::Dataset::open(full_uri)
774 .await
775 .map_err(|e| OmniError::Lance(e.to_string()))?;
776 let policy = CleanupPolicy {
777 before_timestamp: Some(Utc::now()),
778 before_version: None,
779 delete_unverified: false,
780 error_if_tagged_old_versions: false,
781 clean_referenced_branches: false,
782 delete_rate_limit: None,
783 };
784 let _removed = lance::dataset::cleanup::cleanup_old_versions(&ds, policy)
785 .await
786 .map_err(|e| OmniError::Lance(e.to_string()))?;
787 let _ = db;
788 Ok(())
789}
790
791pub(super) async fn ensure_schema_apply_idle(db: &Omnigraph, operation: &str) -> Result<()> {
792 db.refresh_coordinator_only().await?;
793 ensure_schema_apply_not_locked(db, operation).await
794}
795
796pub(super) async fn acquire_schema_apply_lock(db: &Omnigraph) -> Result<()> {
797 db.ensure_schema_state_valid().await?;
798 db.refresh_coordinator_only().await?;
799 let branches = db.coordinator.read().await.all_branches().await?;
800 if branches
801 .iter()
802 .any(|branch| is_schema_apply_lock_branch(branch))
803 {
804 return Err(OmniError::manifest_conflict(
805 "schema apply is already in progress".to_string(),
806 ));
807 }
808
809 db.coordinator
810 .write()
811 .await
812 .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
813 .await?;
814 db.refresh_coordinator_only().await?;
815
816 let blocking_branches = db
817 .coordinator
818 .read()
819 .await
820 .all_branches()
821 .await?
822 .into_iter()
823 .filter(|branch| branch != "main" && !is_internal_system_branch(branch))
824 .collect::<Vec<_>>();
825 if !blocking_branches.is_empty() {
826 let _ = release_schema_apply_lock(db).await;
827 return Err(OmniError::manifest_conflict(format!(
828 "schema apply requires a graph with only main; found non-main branches: {}",
829 blocking_branches.join(", ")
830 )));
831 }
832
833 Ok(())
834}
835
836pub(super) async fn release_schema_apply_lock(db: &Omnigraph) -> Result<()> {
837 db.coordinator
838 .write()
839 .await
840 .branch_delete(SCHEMA_APPLY_LOCK_BRANCH)
841 .await?;
842 db.refresh_coordinator_only().await
848}
849
850pub(super) async fn ensure_schema_apply_not_locked(db: &Omnigraph, operation: &str) -> Result<()> {
851 if db
852 .coordinator
853 .read()
854 .await
855 .all_branches()
856 .await?
857 .iter()
858 .any(|branch| is_schema_apply_lock_branch(branch))
859 {
860 return Err(OmniError::manifest_conflict(format!(
861 "{} is unavailable while schema apply is in progress",
862 operation
863 )));
864 }
865 Ok(())
866}
867
868pub(super) async fn ensure_snapshot_entry_head_matches(
869 db: &Omnigraph,
870 entry: &SubTableEntry,
871) -> Result<()> {
872 let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
873 let ds = db
874 .table_store
875 .open_dataset_head_for_write(
876 &entry.table_key,
877 &dataset_uri,
878 entry.table_branch.as_deref(),
879 )
880 .await?;
881 db.table_store
882 .ensure_expected_version(&ds, &entry.table_key, entry.table_version)
883}
884
885pub(super) async fn batch_for_schema_apply_rewrite(
886 db: &Omnigraph,
887 source_ds: &Dataset,
888 source_table_key: &str,
889 source_catalog: &Catalog,
890 target_table_key: &str,
891 target_catalog: &Catalog,
892 property_renames: Option<&HashMap<String, String>>,
893) -> Result<RecordBatch> {
894 let target_schema = schema_for_table_key(target_catalog, target_table_key)?;
895 let source_blob_properties = blob_properties_for_table_key(source_catalog, source_table_key)?;
896 let target_blob_properties = blob_properties_for_table_key(target_catalog, target_table_key)?;
897 let needs_row_ids = !source_blob_properties.is_empty() || !target_blob_properties.is_empty();
898 let batches = if needs_row_ids {
899 db.table_store()
900 .scan_with(source_ds, None, None, None, true, |_| Ok(()))
901 .await?
902 } else {
903 db.table_store().scan_batches(source_ds).await?
904 };
905 if batches.is_empty() {
906 return Ok(RecordBatch::new_empty(target_schema));
907 }
908 let source_schema = batches[0].schema();
909 let batch = concat_or_empty_batches(source_schema, batches)?;
910
911 let row_ids = if needs_row_ids {
912 Some(
913 batch
914 .column_by_name("_rowid")
915 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
916 .ok_or_else(|| {
917 OmniError::Lance(format!(
918 "expected _rowid column when rewriting '{}'",
919 source_table_key
920 ))
921 })?
922 .values()
923 .iter()
924 .copied()
925 .collect::<Vec<_>>(),
926 )
927 } else {
928 None
929 };
930
931 let mut columns = Vec::with_capacity(target_schema.fields().len());
932 for field in target_schema.fields() {
933 let source_name = property_renames
934 .and_then(|renames| renames.get(field.name()))
935 .map(String::as_str)
936 .unwrap_or_else(|| field.name().as_str());
937 if let Some(column) = batch.column_by_name(source_name) {
938 if target_blob_properties.contains(field.name())
939 && source_blob_properties.contains(source_name)
940 {
941 let descriptions =
942 column
943 .as_any()
944 .downcast_ref::<StructArray>()
945 .ok_or_else(|| {
946 OmniError::Lance(format!(
947 "expected blob descriptions for '{}.{}'",
948 source_table_key, source_name
949 ))
950 })?;
951 let rebuilt = rebuild_blob_column(
952 db,
953 source_ds,
954 source_name,
955 descriptions,
956 row_ids.as_deref().unwrap_or(&[]),
957 )
958 .await?;
959 columns.push(rebuilt);
960 } else {
961 columns.push(column.clone());
962 }
963 } else {
964 columns.push(new_null_array(field.data_type(), batch.num_rows()));
965 }
966 }
967
968 RecordBatch::try_new(target_schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
969}
970
971async fn rebuild_blob_column(
972 _db: &Omnigraph,
973 source_ds: &Dataset,
974 column_name: &str,
975 descriptions: &StructArray,
976 row_ids: &[u64],
977) -> Result<Arc<dyn Array>> {
978 let mut builder = BlobArrayBuilder::new(row_ids.len());
979 let mut non_null_row_ids = Vec::new();
980 let mut row_has_blob = Vec::with_capacity(row_ids.len());
981
982 for row in 0..row_ids.len() {
983 let is_null = blob_description_is_null(descriptions, row)?;
984 row_has_blob.push(!is_null);
985 if !is_null {
986 non_null_row_ids.push(row_ids[row]);
987 }
988 }
989
990 let blob_files = if non_null_row_ids.is_empty() {
991 Vec::new()
992 } else {
993 Arc::new(source_ds.clone())
994 .take_blobs(&non_null_row_ids, column_name)
995 .await
996 .map_err(|e| OmniError::Lance(e.to_string()))?
997 };
998
999 let mut files = blob_files.into_iter();
1000 for has_blob in row_has_blob {
1001 if !has_blob {
1002 builder
1003 .push_null()
1004 .map_err(|e| OmniError::Lance(e.to_string()))?;
1005 continue;
1006 }
1007
1008 let blob = files.next().ok_or_else(|| {
1009 OmniError::Lance(format!(
1010 "blob rewrite for '{}' lost alignment with source rows",
1011 column_name
1012 ))
1013 })?;
1014 if let Some(uri) = blob.uri() {
1015 builder
1016 .push_uri(uri)
1017 .map_err(|e| OmniError::Lance(e.to_string()))?;
1018 } else {
1019 builder
1020 .push_bytes(
1021 blob.read()
1022 .await
1023 .map_err(|e| OmniError::Lance(e.to_string()))?,
1024 )
1025 .map_err(|e| OmniError::Lance(e.to_string()))?;
1026 }
1027 }
1028
1029 if files.next().is_some() {
1030 return Err(OmniError::Lance(format!(
1031 "blob rewrite for '{}' produced extra source blobs",
1032 column_name
1033 )));
1034 }
1035
1036 builder
1037 .finish()
1038 .map_err(|e| OmniError::Lance(e.to_string()))
1039}