1use super::*;
2
/// Caller-controlled switches for planning, previewing and applying a schema change.
#[derive(Clone, Debug, Default)]
pub struct SchemaApplyOptions {
    /// When `true`, every `DropType` / `DropProperty` step in the migration plan is
    /// promoted to `DropMode::Hard`; applying a hard drop additionally purges old dataset
    /// versions of the affected table after the commit. Defaults to `false`.
    pub allow_data_loss: bool,
}
19
20fn promote_drops_to_hard(plan: &mut SchemaMigrationPlan, allow_data_loss: bool) {
23 if !allow_data_loss {
24 return;
25 }
26 for step in &mut plan.steps {
27 match step {
28 SchemaMigrationStep::DropType { mode, .. }
29 | SchemaMigrationStep::DropProperty { mode, .. } => {
30 *mode = DropMode::Hard;
31 }
32 _ => {}
33 }
34 }
35}
36
37pub(super) async fn plan_schema(
38 db: &Omnigraph,
39 desired_schema_source: &str,
40 options: SchemaApplyOptions,
41) -> Result<SchemaMigrationPlan> {
42 db.ensure_schema_state_valid().await?;
43 let accepted_ir = read_accepted_schema_ir(db.uri(), Arc::clone(&db.storage)).await?;
44 let desired_ir = read_schema_ir_from_source(desired_schema_source)?;
45 let mut plan = plan_schema_migration(&accepted_ir, &desired_ir)
46 .map_err(|err| OmniError::manifest(err.to_string()))?;
47 promote_drops_to_hard(&mut plan, options.allow_data_loss);
48 Ok(plan)
49}
50
/// Result of `plan_schema_for_apply`: a migration plan that passed the apply
/// preconditions, plus the desired schema in both IR and catalog form.
struct PlannedSchemaApply {
    /// Steps from the accepted schema to the desired one; drops are already promoted to
    /// hard when the caller allowed data loss.
    plan: SchemaMigrationPlan,
    /// Desired schema IR; the apply path writes it as the staged schema contract.
    desired_ir: SchemaIR,
    /// Catalog built from `desired_ir` with blob schemas fixed up.
    desired_catalog: Catalog,
}
56
57async fn plan_schema_for_apply(
58 db: &Omnigraph,
59 desired_schema_source: &str,
60 options: SchemaApplyOptions,
61) -> Result<PlannedSchemaApply> {
62 db.ensure_schema_state_valid().await?;
63 let branches = db.coordinator.read().await.all_branches().await?;
64 let blocking_branches = branches
70 .into_iter()
71 .filter(|branch| branch != "main" && !is_internal_system_branch(branch))
72 .collect::<Vec<_>>();
73 if !blocking_branches.is_empty() {
74 return Err(OmniError::manifest_conflict(format!(
75 "schema apply requires a graph with only main; found non-main branches: {}",
76 blocking_branches.join(", ")
77 )));
78 }
79
80 let accepted_ir = read_accepted_schema_ir(db.uri(), Arc::clone(&db.storage)).await?;
81 let desired_ir = read_schema_ir_from_source(desired_schema_source)?;
82 let mut plan = plan_schema_migration(&accepted_ir, &desired_ir)
83 .map_err(|err| OmniError::manifest(err.to_string()))?;
84 promote_drops_to_hard(&mut plan, options.allow_data_loss);
85 if !plan.supported {
86 let message = plan
87 .steps
88 .iter()
89 .find_map(|step| step.unsupported_error_message())
90 .unwrap_or_else(|| "unsupported schema migration plan".to_string());
91 return Err(OmniError::manifest(message));
92 }
93
94 let mut desired_catalog = build_catalog_from_ir(&desired_ir)?;
95 fixup_blob_schemas(&mut desired_catalog);
96 Ok(PlannedSchemaApply {
97 plan,
98 desired_ir,
99 desired_catalog,
100 })
101}
102
103pub(super) async fn preview_schema_apply(
104 db: &Omnigraph,
105 desired_schema_source: &str,
106 options: SchemaApplyOptions,
107) -> Result<SchemaApplyPreview> {
108 let planned = plan_schema_for_apply(db, desired_schema_source, options).await?;
109 Ok(SchemaApplyPreview {
110 plan: planned.plan,
111 catalog: planned.desired_catalog,
112 })
113}
114
115pub(super) async fn apply_schema<F>(
116 db: &Omnigraph,
117 desired_schema_source: &str,
118 options: SchemaApplyOptions,
119 actor: Option<&str>,
120 validate_catalog: F,
121) -> Result<SchemaApplyResult>
122where
123 F: FnOnce(&Catalog) -> Result<()>,
124{
125 db.enforce(
142 omnigraph_policy::PolicyAction::SchemaApply,
143 &omnigraph_policy::ResourceScope::TargetBranch("main".to_string()),
144 actor,
145 )?;
146
147 db.heal_pending_recovery_sidecars().await?;
154
155 acquire_schema_apply_lock(db).await?;
156 let result = apply_schema_with_lock(db, desired_schema_source, options, validate_catalog).await;
157 let release_result = release_schema_apply_lock(db).await;
158 match (result, release_result) {
159 (Ok(result), Ok(())) => Ok(result),
160 (Ok(_), Err(err)) => Err(err),
161 (Err(err), Ok(())) => Err(err),
162 (Err(err), Err(_)) => Err(err),
163 }
164}
165
166pub(super) async fn apply_schema_with_lock<F>(
167 db: &Omnigraph,
168 desired_schema_source: &str,
169 options: SchemaApplyOptions,
170 validate_catalog: F,
171) -> Result<SchemaApplyResult>
172where
173 F: FnOnce(&Catalog) -> Result<()>,
174{
175 let planned = plan_schema_for_apply(db, desired_schema_source, options).await?;
176 validate_catalog(&planned.desired_catalog)?;
177 let PlannedSchemaApply {
178 plan,
179 desired_ir,
180 desired_catalog,
181 } = planned;
182 if plan.steps.is_empty() {
183 return Ok(SchemaApplyResult {
184 supported: true,
185 applied: false,
186 manifest_version: db.version().await,
187 steps: plan.steps,
188 });
189 }
190
191 let snapshot = db.snapshot().await;
192 let base_manifest_version = snapshot.version();
193 let mut added_tables = BTreeSet::new();
194 let mut renamed_tables = HashMap::new();
195 let mut rewritten_tables = BTreeSet::new();
196 let mut dropped_tables = BTreeSet::new();
197 let mut hard_cleanup_targets: Vec<(String, String)> = Vec::new();
203 let mut property_renames = HashMap::<String, HashMap<String, String>>::new();
204 let mut changed_edge_tables = false;
205
206 for step in &plan.steps {
207 match step {
208 SchemaMigrationStep::AddType { type_kind, name } => {
209 let table_key = schema_table_key(*type_kind, name);
210 if table_key.starts_with("edge:") {
211 changed_edge_tables = true;
212 }
213 added_tables.insert(table_key);
214 }
215 SchemaMigrationStep::RenameType {
216 type_kind,
217 from,
218 to,
219 } => {
220 let source_key = schema_table_key(*type_kind, from);
221 let target_key = schema_table_key(*type_kind, to);
222 if source_key.starts_with("edge:") {
223 changed_edge_tables = true;
224 }
225 renamed_tables.insert(target_key, source_key);
226 }
227 SchemaMigrationStep::AddProperty {
228 type_kind,
229 type_name,
230 ..
231 } => {
232 let table_key = schema_table_key(*type_kind, type_name);
233 if table_key.starts_with("edge:") {
234 changed_edge_tables = true;
235 }
236 rewritten_tables.insert(table_key);
237 }
238 SchemaMigrationStep::RenameProperty {
239 type_kind,
240 type_name,
241 from,
242 to,
243 } => {
244 let table_key = schema_table_key(*type_kind, type_name);
245 if table_key.starts_with("edge:") {
246 changed_edge_tables = true;
247 }
248 rewritten_tables.insert(table_key.clone());
249 property_renames
250 .entry(table_key)
251 .or_default()
252 .insert(to.clone(), from.clone());
253 }
254 SchemaMigrationStep::AddConstraint { .. }
261 | SchemaMigrationStep::UpdateTypeMetadata { .. }
262 | SchemaMigrationStep::UpdatePropertyMetadata { .. } => {}
263 SchemaMigrationStep::DropProperty {
264 type_kind,
265 type_name,
266 mode,
267 ..
268 } => {
269 let table_key = schema_table_key(*type_kind, type_name);
285 if table_key.starts_with("edge:") {
286 changed_edge_tables = true;
287 }
288 if matches!(mode, DropMode::Hard) {
289 let entry = snapshot.entry(&table_key).ok_or_else(|| {
290 OmniError::manifest(format!(
291 "missing table '{}' for hard property drop",
292 table_key
293 ))
294 })?;
295 let full_uri = format!("{}/{}", db.root_uri, entry.table_path);
296 hard_cleanup_targets.push((table_key.clone(), full_uri));
297 }
298 rewritten_tables.insert(table_key);
299 }
300 SchemaMigrationStep::DropType {
301 type_kind,
302 name,
303 mode,
304 } => {
305 let table_key = schema_table_key(*type_kind, name);
321 if table_key.starts_with("edge:") {
322 changed_edge_tables = true;
323 }
324 if matches!(mode, DropMode::Hard) {
325 let entry = snapshot.entry(&table_key).ok_or_else(|| {
326 OmniError::manifest(format!(
327 "missing table '{}' for hard type drop",
328 table_key
329 ))
330 })?;
331 let full_uri = format!("{}/{}", db.root_uri, entry.table_path);
332 hard_cleanup_targets.push((table_key.clone(), full_uri));
333 }
334 dropped_tables.insert(table_key);
335 }
336 step @ SchemaMigrationStep::UnsupportedChange { .. } => {
337 return Err(OmniError::manifest(
338 step.unsupported_error_message()
339 .unwrap_or_else(|| "unsupported schema migration step".to_string()),
340 ));
341 }
342 }
343 }
344
345 let mut table_registrations = HashMap::<String, String>::new();
346 let mut table_updates = HashMap::<String, crate::db::SubTableUpdate>::new();
347 let mut table_tombstones = HashMap::<String, u64>::new();
348
349 let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = rewritten_tables
357 .iter()
358 .filter_map(|table_key| {
359 let entry = snapshot.entry(table_key)?;
360 Some(crate::db::manifest::SidecarTablePin {
361 table_key: table_key.clone(),
362 table_path: db.storage().dataset_uri(&entry.table_path),
363 expected_version: entry.table_version,
364 post_commit_pin: entry.table_version + 1,
365 confirmed_version: None,
368 table_branch: entry.table_branch.clone(),
369 })
370 })
371 .collect();
372 let mut sidecar_registrations: Vec<crate::db::manifest::SidecarTableRegistration> = Vec::new();
379 for table_key in &added_tables {
380 sidecar_registrations.push(crate::db::manifest::SidecarTableRegistration {
381 table_key: table_key.clone(),
382 table_path: table_path_for_table_key(table_key)?,
383 table_branch: None,
384 });
385 }
386 for target_table_key in renamed_tables.keys() {
387 sidecar_registrations.push(crate::db::manifest::SidecarTableRegistration {
388 table_key: target_table_key.clone(),
389 table_path: table_path_for_table_key(target_table_key)?,
390 table_branch: None,
391 });
392 }
393 let mut sidecar_tombstones: Vec<crate::db::manifest::SidecarTombstone> = Vec::new();
394 for source_table_key in renamed_tables.values() {
395 let source_entry = snapshot.entry(source_table_key).ok_or_else(|| {
396 OmniError::manifest(format!(
397 "missing source table '{}' for schema rename when building recovery sidecar",
398 source_table_key
399 ))
400 })?;
401 sidecar_tombstones.push(crate::db::manifest::SidecarTombstone {
402 table_key: source_table_key.clone(),
403 tombstone_version: source_entry.table_version.saturating_add(1),
404 });
405 }
406 for dropped_table_key in &dropped_tables {
413 let entry = snapshot.entry(dropped_table_key).ok_or_else(|| {
414 OmniError::manifest(format!(
415 "missing table '{}' for soft drop when building recovery sidecar",
416 dropped_table_key
417 ))
418 })?;
419 let tombstone_version = entry.table_version.saturating_add(1);
420 sidecar_tombstones.push(crate::db::manifest::SidecarTombstone {
421 table_key: dropped_table_key.clone(),
422 tombstone_version,
423 });
424 table_tombstones.insert(dropped_table_key.clone(), tombstone_version);
425 }
426
427 let mut schema_apply_queue_keys: Vec<(String, Option<String>)> = recovery_pins
439 .iter()
440 .map(|pin| (pin.table_key.clone(), pin.table_branch.clone()))
441 .collect();
442 let writes_sidecar = !(recovery_pins.is_empty()
450 && sidecar_registrations.is_empty()
451 && sidecar_tombstones.is_empty());
452 if writes_sidecar {
453 schema_apply_queue_keys.push(crate::db::manifest::schema_apply_serial_queue_key());
454 }
455 let _schema_apply_queue_guards = db
456 .write_queue()
457 .acquire_many(&schema_apply_queue_keys)
458 .await;
459
460 let recovery_handle = if !writes_sidecar {
461 None
462 } else {
463 let mut sidecar = crate::db::manifest::new_sidecar(
471 crate::db::manifest::SidecarKind::SchemaApply,
472 None,
473 None,
477 recovery_pins,
478 );
479 sidecar.additional_registrations = sidecar_registrations;
480 sidecar.tombstones = sidecar_tombstones;
481 Some(
482 crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar)
483 .await?,
484 )
485 };
486
487 for table_key in &added_tables {
488 let table_path = table_path_for_table_key(table_key)?;
489 let dataset_uri = db.storage().dataset_uri(&table_path);
490 let schema = schema_for_table_key(&desired_catalog, table_key)?;
491 let ds =
492 SnapshotHandle::new(TableStore::create_empty_dataset(&dataset_uri, &schema).await?);
493 let state = db.storage().table_state(&dataset_uri, &ds).await?;
497 table_registrations.insert(table_key.clone(), table_path);
498 table_updates.insert(
499 table_key.clone(),
500 crate::db::SubTableUpdate {
501 table_key: table_key.clone(),
502 table_version: state.version,
503 table_branch: None,
504 row_count: state.row_count,
505 version_metadata: state.version_metadata,
506 },
507 );
508 }
509
510 for (target_table_key, source_table_key) in &renamed_tables {
511 let source_entry = snapshot.entry(source_table_key).ok_or_else(|| {
512 OmniError::manifest(format!(
513 "missing source table '{}' for schema rename",
514 source_table_key
515 ))
516 })?;
517 ensure_snapshot_entry_head_matches(db, source_entry).await?;
518 let source_ds = db
519 .storage()
520 .open_snapshot_at_table(&snapshot, source_table_key)
521 .await?;
522 let current_catalog = db.catalog();
523 let batch = batch_for_schema_apply_rewrite(
524 db,
525 &source_ds,
526 source_table_key,
527 ¤t_catalog,
528 target_table_key,
529 &desired_catalog,
530 property_renames.get(target_table_key),
531 )
532 .await?;
533 let table_path = table_path_for_table_key(target_table_key)?;
534 let dataset_uri = db.storage().dataset_uri(&table_path);
535 let target_ds = SnapshotHandle::new(TableStore::write_dataset(&dataset_uri, batch).await?);
536 let state = db.storage().table_state(&dataset_uri, &target_ds).await?;
538 table_registrations.insert(target_table_key.clone(), table_path);
539 table_updates.insert(
540 target_table_key.clone(),
541 crate::db::SubTableUpdate {
542 table_key: target_table_key.clone(),
543 table_version: state.version,
544 table_branch: None,
545 row_count: state.row_count,
546 version_metadata: state.version_metadata,
547 },
548 );
549 table_tombstones.insert(
550 source_table_key.clone(),
551 source_entry.table_version.saturating_add(1),
552 );
553 }
554
555 for table_key in &rewritten_tables {
556 if added_tables.contains(table_key) || renamed_tables.contains_key(table_key) {
557 continue;
558 }
559 let entry = snapshot.entry(table_key).ok_or_else(|| {
560 OmniError::manifest(format!(
561 "missing source table '{}' for schema apply",
562 table_key
563 ))
564 })?;
565 ensure_snapshot_entry_head_matches(db, entry).await?;
566 let source_ds = db
567 .storage()
568 .open_snapshot_at_table(&snapshot, table_key)
569 .await?;
570 let current_catalog = db.catalog();
571 let batch = batch_for_schema_apply_rewrite(
572 db,
573 &source_ds,
574 table_key,
575 ¤t_catalog,
576 table_key,
577 &desired_catalog,
578 property_renames.get(table_key),
579 )
580 .await?;
581 let dataset_uri = db.storage().dataset_uri(&entry.table_path);
582 let existing = db
589 .storage()
590 .open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref())
591 .await?;
592 let staged = db.storage().stage_overwrite(&existing, batch).await?;
593 let target_ds = db.storage().commit_staged(existing, staged).await?;
594 let state = db.storage().table_state(&dataset_uri, &target_ds).await?;
598 table_updates.insert(
599 table_key.clone(),
600 crate::db::SubTableUpdate {
601 table_key: table_key.clone(),
602 table_version: state.version,
603 table_branch: None,
604 row_count: state.row_count,
605 version_metadata: state.version_metadata,
606 },
607 );
608 }
609
610 let mut manifest_changes = Vec::new();
618 for (table_key, table_path) in table_registrations {
619 manifest_changes.push(ManifestChange::RegisterTable(TableRegistration {
620 table_key,
621 table_path,
622 }));
623 }
624 for update in table_updates.into_values() {
625 manifest_changes.push(ManifestChange::Update(update));
626 }
627 for (table_key, tombstone_version) in table_tombstones {
628 manifest_changes.push(ManifestChange::Tombstone(TableTombstone {
629 table_key,
630 tombstone_version,
631 }));
632 }
633
634 db.refresh_coordinator_only().await?;
635 let current_manifest_version = db.version().await;
636 if current_manifest_version != base_manifest_version {
637 return Err(OmniError::manifest_conflict(format!(
638 "schema apply lost its write lease: main advanced from v{} to v{} while schema apply was in progress",
639 base_manifest_version, current_manifest_version,
640 )));
641 }
642
643 crate::failpoints::maybe_fail(crate::failpoints::names::SCHEMA_APPLY_BEFORE_STAGING_WRITE)?;
652
653 let staging_pg_uri = schema_source_staging_uri(&db.root_uri);
654 db.storage
655 .write_text(&staging_pg_uri, desired_schema_source)
656 .await?;
657 write_schema_contract_staging(&db.root_uri, db.storage.as_ref(), &desired_ir).await?;
658
659 crate::failpoints::maybe_fail(crate::failpoints::names::SCHEMA_APPLY_AFTER_STAGING_WRITE)?;
660
661 let PublishedSnapshot {
663 manifest_version,
664 _snapshot_id: _,
665 } = db
666 .coordinator
667 .write()
668 .await
669 .commit_changes_with_actor(&manifest_changes, None)
670 .await?;
671
672 crate::failpoints::maybe_fail(crate::failpoints::names::SCHEMA_APPLY_AFTER_MANIFEST_COMMIT)?;
673
674 db.storage
675 .rename_text(&staging_pg_uri, &schema_source_uri(&db.root_uri))
676 .await?;
677 db.storage
678 .rename_text(
679 &schema_ir_staging_uri(&db.root_uri),
680 &schema_ir_uri(&db.root_uri),
681 )
682 .await?;
683 db.storage
684 .rename_text(
685 &schema_state_staging_uri(&db.root_uri),
686 &schema_state_uri(&db.root_uri),
687 )
688 .await?;
689
690 db.store_catalog(desired_catalog);
691 db.store_schema_source(desired_schema_source.to_string());
692 db.coordinator.write().await.refresh().await?;
693 db.runtime_cache.invalidate_all().await;
694 if changed_edge_tables {
695 db.invalidate_graph_index().await;
696 }
697
698 if let Some(handle) = recovery_handle {
706 if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
707 tracing::warn!(
708 error = %err,
709 operation_id = handle.operation_id.as_str(),
710 "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
711 );
712 }
713 }
714
715 for (table_key, full_uri) in &hard_cleanup_targets {
722 match cleanup_dataset_old_versions(db, full_uri).await {
723 Ok(()) => {}
724 Err(err) => {
725 tracing::warn!(
726 error = %err,
727 table_key = table_key.as_str(),
728 "hard-drop cleanup_old_versions failed; rerun `omnigraph cleanup` to reclaim",
729 );
730 }
731 }
732 }
733
734 Ok(SchemaApplyResult {
735 supported: true,
736 applied: true,
737 manifest_version,
738 steps: plan.steps,
739 })
740}
741
742async fn cleanup_dataset_old_versions(db: &Omnigraph, full_uri: &str) -> Result<()> {
752 use chrono::Utc;
753 use lance::dataset::cleanup::CleanupPolicy;
754 let ds = lance::Dataset::open(full_uri)
756 .await
757 .map_err(|e| OmniError::Lance(e.to_string()))?;
758 let policy = CleanupPolicy {
759 before_timestamp: Some(Utc::now()),
760 before_version: None,
761 delete_unverified: false,
762 error_if_tagged_old_versions: false,
763 clean_referenced_branches: false,
764 delete_rate_limit: None,
765 };
766 let _removed = lance::dataset::cleanup::cleanup_old_versions(&ds, policy)
767 .await
768 .map_err(|e| OmniError::Lance(e.to_string()))?;
769 let _ = db;
770 Ok(())
771}
772
773pub(super) async fn ensure_schema_apply_idle(db: &Omnigraph, operation: &str) -> Result<()> {
774 db.refresh_coordinator_only().await?;
775 ensure_schema_apply_not_locked(db, operation).await
776}
777
778pub(super) async fn acquire_schema_apply_lock(db: &Omnigraph) -> Result<()> {
779 db.ensure_schema_state_valid().await?;
780 db.refresh_coordinator_only().await?;
781 let branches = db.coordinator.read().await.all_branches().await?;
782 if branches
783 .iter()
784 .any(|branch| is_schema_apply_lock_branch(branch))
785 {
786 return Err(OmniError::manifest_conflict(
787 "schema apply is already in progress".to_string(),
788 ));
789 }
790
791 db.coordinator
792 .write()
793 .await
794 .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
795 .await?;
796 db.refresh_coordinator_only().await?;
797
798 let blocking_branches = db
799 .coordinator
800 .read()
801 .await
802 .all_branches()
803 .await?
804 .into_iter()
805 .filter(|branch| branch != "main" && !is_internal_system_branch(branch))
806 .collect::<Vec<_>>();
807 if !blocking_branches.is_empty() {
808 let _ = release_schema_apply_lock(db).await;
809 return Err(OmniError::manifest_conflict(format!(
810 "schema apply requires a graph with only main; found non-main branches: {}",
811 blocking_branches.join(", ")
812 )));
813 }
814
815 Ok(())
816}
817
818pub(super) async fn release_schema_apply_lock(db: &Omnigraph) -> Result<()> {
819 db.coordinator
820 .write()
821 .await
822 .branch_delete(SCHEMA_APPLY_LOCK_BRANCH)
823 .await?;
824 db.refresh_coordinator_only().await
830}
831
832pub(super) async fn ensure_schema_apply_not_locked(db: &Omnigraph, operation: &str) -> Result<()> {
833 if db
834 .coordinator
835 .read()
836 .await
837 .all_branches()
838 .await?
839 .iter()
840 .any(|branch| is_schema_apply_lock_branch(branch))
841 {
842 return Err(OmniError::manifest_conflict(format!(
843 "{} is unavailable while schema apply is in progress",
844 operation
845 )));
846 }
847 Ok(())
848}
849
850pub(super) async fn ensure_snapshot_entry_head_matches(
851 db: &Omnigraph,
852 entry: &SubTableEntry,
853) -> Result<()> {
854 let dataset_uri = db.storage().dataset_uri(&entry.table_path);
855 let ds = db
856 .storage()
857 .open_dataset_head_for_write(
858 &entry.table_key,
859 &dataset_uri,
860 entry.table_branch.as_deref(),
861 )
862 .await?;
863 db.storage()
864 .ensure_expected_version(&ds, &entry.table_key, entry.table_version)
865}
866
867pub(super) async fn batch_for_schema_apply_rewrite(
868 db: &Omnigraph,
869 source_ds: &SnapshotHandle,
870 source_table_key: &str,
871 source_catalog: &Catalog,
872 target_table_key: &str,
873 target_catalog: &Catalog,
874 property_renames: Option<&HashMap<String, String>>,
875) -> Result<RecordBatch> {
876 let target_schema = schema_for_table_key(target_catalog, target_table_key)?;
877 let source_blob_properties = blob_properties_for_table_key(source_catalog, source_table_key)?;
878 let target_blob_properties = blob_properties_for_table_key(target_catalog, target_table_key)?;
879 let needs_row_ids = !source_blob_properties.is_empty() || !target_blob_properties.is_empty();
880 let batches = if needs_row_ids {
881 db.storage()
882 .scan_with_row_id(source_ds, None, None, None, true)
883 .await?
884 } else {
885 db.storage().scan_batches(source_ds).await?
886 };
887 if batches.is_empty() {
888 return Ok(RecordBatch::new_empty(target_schema));
889 }
890 let source_schema = batches[0].schema();
891 let batch = concat_or_empty_batches(source_schema, batches)?;
892
893 let row_ids = if needs_row_ids {
894 Some(
895 batch
896 .column_by_name("_rowid")
897 .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
898 .ok_or_else(|| {
899 OmniError::Lance(format!(
900 "expected _rowid column when rewriting '{}'",
901 source_table_key
902 ))
903 })?
904 .values()
905 .iter()
906 .copied()
907 .collect::<Vec<_>>(),
908 )
909 } else {
910 None
911 };
912
913 let mut columns = Vec::with_capacity(target_schema.fields().len());
914 for field in target_schema.fields() {
915 let source_name = property_renames
916 .and_then(|renames| renames.get(field.name()))
917 .map(String::as_str)
918 .unwrap_or_else(|| field.name().as_str());
919 if let Some(column) = batch.column_by_name(source_name) {
920 if target_blob_properties.contains(field.name())
921 && source_blob_properties.contains(source_name)
922 {
923 let descriptions =
924 column
925 .as_any()
926 .downcast_ref::<StructArray>()
927 .ok_or_else(|| {
928 OmniError::Lance(format!(
929 "expected blob descriptions for '{}.{}'",
930 source_table_key, source_name
931 ))
932 })?;
933 let rebuilt = rebuild_blob_column(
934 db,
935 source_ds,
936 source_name,
937 descriptions,
938 row_ids.as_deref().unwrap_or(&[]),
939 )
940 .await?;
941 columns.push(rebuilt);
942 } else {
943 columns.push(column.clone());
944 }
945 } else {
946 columns.push(new_null_array(field.data_type(), batch.num_rows()));
947 }
948 }
949
950 RecordBatch::try_new(target_schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
951}
952
953async fn rebuild_blob_column(
954 _db: &Omnigraph,
955 source_ds: &SnapshotHandle,
956 column_name: &str,
957 descriptions: &StructArray,
958 row_ids: &[u64],
959) -> Result<Arc<dyn Array>> {
960 let mut builder = BlobArrayBuilder::new(row_ids.len());
961 let mut non_null_row_ids = Vec::new();
962 let mut row_has_blob = Vec::with_capacity(row_ids.len());
963
964 for row in 0..row_ids.len() {
965 let is_null = blob_description_is_null(descriptions, row)?;
966 row_has_blob.push(!is_null);
967 if !is_null {
968 non_null_row_ids.push(row_ids[row]);
969 }
970 }
971
972 let blob_files = if non_null_row_ids.is_empty() {
973 Vec::new()
974 } else {
975 Arc::new(source_ds.dataset().clone())
976 .take_blobs(&non_null_row_ids, column_name)
977 .await
978 .map_err(|e| OmniError::Lance(e.to_string()))?
979 };
980
981 let mut files = blob_files.into_iter();
982 for has_blob in row_has_blob {
983 if !has_blob {
984 builder
985 .push_null()
986 .map_err(|e| OmniError::Lance(e.to_string()))?;
987 continue;
988 }
989
990 let blob = files.next().ok_or_else(|| {
991 OmniError::Lance(format!(
992 "blob rewrite for '{}' lost alignment with source rows",
993 column_name
994 ))
995 })?;
996 if let Some(uri) = blob.uri() {
997 builder
998 .push_uri(uri)
999 .map_err(|e| OmniError::Lance(e.to_string()))?;
1000 } else {
1001 builder
1002 .push_bytes(
1003 blob.read()
1004 .await
1005 .map_err(|e| OmniError::Lance(e.to_string()))?,
1006 )
1007 .map_err(|e| OmniError::Lance(e.to_string()))?;
1008 }
1009 }
1010
1011 if files.next().is_some() {
1012 return Err(OmniError::Lance(format!(
1013 "blob rewrite for '{}' produced extra source blobs",
1014 column_name
1015 )));
1016 }
1017
1018 builder
1019 .finish()
1020 .map_err(|e| OmniError::Lance(e.to_string()))
1021}