Skip to main content

omnigraph/db/omnigraph/
schema_apply.rs

1use super::*;
2
/// Knobs an operator can set to control how a schema apply behaves.
///
/// `allow_data_loss` is currently the only option. Without it, drop steps
/// stay `DropMode::Soft`: the manifest stops referencing the dropped data,
/// but earlier dataset versions remain reachable through Lance time travel
/// until a cleanup pass removes them. With it, every drop step is planned
/// as `DropMode::Hard`, and once the manifest publish succeeds the apply
/// path runs `cleanup_old_versions` (best-effort) on each affected dataset
/// so the pre-drop data can no longer be read back.
#[derive(Debug, Clone, Default)]
pub struct SchemaApplyOptions {
    /// Opt in to destructive (data-loss) schema changes.
    ///
    /// `false` (the `Default`) keeps drops soft and reversible. `true`
    /// rewrites every drop step to `DropMode::Hard` at plan time and
    /// triggers post-publish version cleanup on the affected datasets.
    pub allow_data_loss: bool,
}
19
20/// Promote every `Soft` drop variant in the plan to `Hard` when
21/// `allow_data_loss` is set. Idempotent on non-drop steps.
22fn promote_drops_to_hard(plan: &mut SchemaMigrationPlan, allow_data_loss: bool) {
23    if !allow_data_loss {
24        return;
25    }
26    for step in &mut plan.steps {
27        match step {
28            SchemaMigrationStep::DropType { mode, .. }
29            | SchemaMigrationStep::DropProperty { mode, .. } => {
30                *mode = DropMode::Hard;
31            }
32            _ => {}
33        }
34    }
35}
36
37pub(super) async fn plan_schema(
38    db: &Omnigraph,
39    desired_schema_source: &str,
40    options: SchemaApplyOptions,
41) -> Result<SchemaMigrationPlan> {
42    db.ensure_schema_state_valid().await?;
43    let accepted_ir = read_accepted_schema_ir(db.uri(), Arc::clone(&db.storage)).await?;
44    let desired_ir = read_schema_ir_from_source(desired_schema_source)?;
45    let mut plan = plan_schema_migration(&accepted_ir, &desired_ir)
46        .map_err(|err| OmniError::manifest(err.to_string()))?;
47    promote_drops_to_hard(&mut plan, options.allow_data_loss);
48    Ok(plan)
49}
50
51struct PlannedSchemaApply {
52    plan: SchemaMigrationPlan,
53    desired_ir: SchemaIR,
54    desired_catalog: Catalog,
55}
56
57async fn plan_schema_for_apply(
58    db: &Omnigraph,
59    desired_schema_source: &str,
60    options: SchemaApplyOptions,
61) -> Result<PlannedSchemaApply> {
62    db.ensure_schema_state_valid().await?;
63    let branches = db.coordinator.read().await.all_branches().await?;
64    // Skip `main` and internal system branches (the schema-apply lock branch,
65    // the cluster-wide schema-apply serializer). Legacy `__run__*` staging
66    // branches were swept off `__manifest` by the v2→v3 migration that runs in
67    // `Omnigraph::open(ReadWrite)` before this check (MR-770), so they no
68    // longer appear here.
69    let blocking_branches = branches
70        .into_iter()
71        .filter(|branch| branch != "main" && !is_internal_system_branch(branch))
72        .collect::<Vec<_>>();
73    if !blocking_branches.is_empty() {
74        return Err(OmniError::manifest_conflict(format!(
75            "schema apply requires a graph with only main; found non-main branches: {}",
76            blocking_branches.join(", ")
77        )));
78    }
79
80    let accepted_ir = read_accepted_schema_ir(db.uri(), Arc::clone(&db.storage)).await?;
81    let desired_ir = read_schema_ir_from_source(desired_schema_source)?;
82    let mut plan = plan_schema_migration(&accepted_ir, &desired_ir)
83        .map_err(|err| OmniError::manifest(err.to_string()))?;
84    promote_drops_to_hard(&mut plan, options.allow_data_loss);
85    if !plan.supported {
86        let message = plan
87            .steps
88            .iter()
89            .find_map(|step| step.unsupported_error_message())
90            .unwrap_or_else(|| "unsupported schema migration plan".to_string());
91        return Err(OmniError::manifest(message));
92    }
93
94    let mut desired_catalog = build_catalog_from_ir(&desired_ir)?;
95    fixup_blob_schemas(&mut desired_catalog);
96    Ok(PlannedSchemaApply {
97        plan,
98        desired_ir,
99        desired_catalog,
100    })
101}
102
103pub(super) async fn preview_schema_apply(
104    db: &Omnigraph,
105    desired_schema_source: &str,
106    options: SchemaApplyOptions,
107) -> Result<SchemaApplyPreview> {
108    let planned = plan_schema_for_apply(db, desired_schema_source, options).await?;
109    Ok(SchemaApplyPreview {
110        plan: planned.plan,
111        catalog: planned.desired_catalog,
112    })
113}
114
115pub(super) async fn apply_schema<F>(
116    db: &Omnigraph,
117    desired_schema_source: &str,
118    options: SchemaApplyOptions,
119    actor: Option<&str>,
120    validate_catalog: F,
121) -> Result<SchemaApplyResult>
122where
123    F: FnOnce(&Catalog) -> Result<()>,
124{
125    // Engine-layer policy gate (MR-722 chassis core).
126    //
127    // Fires BEFORE acquiring the schema-apply lock or doing any other
128    // work. When no PolicyChecker is installed this is a no-op and
129    // the apply path behaves exactly as it did before MR-722. When
130    // a PolicyChecker IS installed and the actor is None, this is a
131    // hard error — see Omnigraph::enforce's docstring for the
132    // forget-the-actor-footgun reasoning.
133    //
134    // Scope is TargetBranch("main") to match the HTTP-layer convention
135    // for SchemaApply: branch=None, target_branch=Some("main"). Cedar
136    // policies in the wild use `target_branch_scope: protected` to
137    // gate schema applies, so the engine-layer call has to set the
138    // target_branch shape that activates that predicate. Wrong scope
139    // here = silent policy mismatch with HTTP. See
140    // `omnigraph_policy::ResourceScope::to_branch_pair` for the mapping.
141    db.enforce(
142        omnigraph_policy::PolicyAction::SchemaApply,
143        &omnigraph_policy::ResourceScope::TargetBranch("main".to_string()),
144        actor,
145    )?;
146
147    acquire_schema_apply_lock(db).await?;
148    let result = apply_schema_with_lock(db, desired_schema_source, options, validate_catalog).await;
149    let release_result = release_schema_apply_lock(db).await;
150    match (result, release_result) {
151        (Ok(result), Ok(())) => Ok(result),
152        (Ok(_), Err(err)) => Err(err),
153        (Err(err), Ok(())) => Err(err),
154        (Err(err), Err(_)) => Err(err),
155    }
156}
157
158pub(super) async fn apply_schema_with_lock<F>(
159    db: &Omnigraph,
160    desired_schema_source: &str,
161    options: SchemaApplyOptions,
162    validate_catalog: F,
163) -> Result<SchemaApplyResult>
164where
165    F: FnOnce(&Catalog) -> Result<()>,
166{
167    let planned = plan_schema_for_apply(db, desired_schema_source, options).await?;
168    validate_catalog(&planned.desired_catalog)?;
169    let PlannedSchemaApply {
170        plan,
171        desired_ir,
172        desired_catalog,
173    } = planned;
174    if plan.steps.is_empty() {
175        return Ok(SchemaApplyResult {
176            supported: true,
177            applied: false,
178            manifest_version: db.version().await,
179            steps: plan.steps,
180        });
181    }
182
183    let snapshot = db.snapshot().await;
184    let base_manifest_version = snapshot.version();
185    let mut added_tables = BTreeSet::new();
186    let mut renamed_tables = HashMap::new();
187    let mut rewritten_tables = BTreeSet::new();
188    let mut indexed_tables = BTreeSet::new();
189    let mut dropped_tables = BTreeSet::new();
190    // Hard-drop cleanup targets: (table_key, full_dataset_uri).
191    // Populated for DropProperty { Hard } and DropType { Hard }; the
192    // post-publish cleanup runs `cleanup_old_versions` on each
193    // dataset to reclaim prior versions, making time-travel back
194    // to pre-drop state unreachable.
195    let mut hard_cleanup_targets: Vec<(String, String)> = Vec::new();
196    let mut property_renames = HashMap::<String, HashMap<String, String>>::new();
197    let mut changed_edge_tables = false;
198
199    for step in &plan.steps {
200        match step {
201            SchemaMigrationStep::AddType { type_kind, name } => {
202                let table_key = schema_table_key(*type_kind, name);
203                if table_key.starts_with("edge:") {
204                    changed_edge_tables = true;
205                }
206                added_tables.insert(table_key);
207            }
208            SchemaMigrationStep::RenameType {
209                type_kind,
210                from,
211                to,
212            } => {
213                let source_key = schema_table_key(*type_kind, from);
214                let target_key = schema_table_key(*type_kind, to);
215                if source_key.starts_with("edge:") {
216                    changed_edge_tables = true;
217                }
218                renamed_tables.insert(target_key, source_key);
219            }
220            SchemaMigrationStep::AddProperty {
221                type_kind,
222                type_name,
223                ..
224            } => {
225                let table_key = schema_table_key(*type_kind, type_name);
226                if table_key.starts_with("edge:") {
227                    changed_edge_tables = true;
228                }
229                rewritten_tables.insert(table_key);
230            }
231            SchemaMigrationStep::RenameProperty {
232                type_kind,
233                type_name,
234                from,
235                to,
236            } => {
237                let table_key = schema_table_key(*type_kind, type_name);
238                if table_key.starts_with("edge:") {
239                    changed_edge_tables = true;
240                }
241                rewritten_tables.insert(table_key.clone());
242                property_renames
243                    .entry(table_key)
244                    .or_default()
245                    .insert(to.clone(), from.clone());
246            }
247            SchemaMigrationStep::AddConstraint {
248                type_kind,
249                type_name,
250                ..
251            } => {
252                indexed_tables.insert(schema_table_key(*type_kind, type_name));
253            }
254            SchemaMigrationStep::UpdateTypeMetadata { .. }
255            | SchemaMigrationStep::UpdatePropertyMetadata { .. } => {}
256            SchemaMigrationStep::DropProperty {
257                type_kind,
258                type_name,
259                mode,
260                ..
261            } => {
262                // Both Soft and Hard route through the existing
263                // stage_overwrite rewrite path. batch_for_schema_apply_rewrite
264                // iterates the *target* schema fields, so a property
265                // absent from desired_catalog is naturally projected
266                // away in the rebuilt batch.
267                //
268                // The difference between Soft and Hard is what
269                // happens AFTER the manifest publish:
270                //   * Soft: nothing — the prior dataset version
271                //     retains the dropped column; reads at
272                //     snapshot_at_version(pre_drop) still see it.
273                //   * Hard: run cleanup_old_versions on the dataset
274                //     post-publish, removing the prior version (and
275                //     reclaiming any fragments unique to it). After
276                //     cleanup, time-travel back fails.
277                let table_key = schema_table_key(*type_kind, type_name);
278                if table_key.starts_with("edge:") {
279                    changed_edge_tables = true;
280                }
281                if matches!(mode, DropMode::Hard) {
282                    let entry = snapshot.entry(&table_key).ok_or_else(|| {
283                        OmniError::manifest(format!(
284                            "missing table '{}' for hard property drop",
285                            table_key
286                        ))
287                    })?;
288                    let full_uri = format!("{}/{}", db.root_uri, entry.table_path);
289                    hard_cleanup_targets.push((table_key.clone(), full_uri));
290                }
291                rewritten_tables.insert(table_key);
292            }
293            SchemaMigrationStep::DropType {
294                type_kind,
295                name,
296                mode,
297            } => {
298                // Both Soft and Hard tombstone the table's entry in
299                // the current __manifest version (no per-table write).
300                //
301                // The difference is what happens after publish:
302                //   * Soft: dataset files retained; prior __manifest
303                //     versions still reference them; Lance time
304                //     travel + branch-from-snapshot can read the
305                //     dropped table.
306                //   * Hard: run cleanup_old_versions on the orphan
307                //     dataset post-publish. Prior dataset versions
308                //     (and their fragments) are reclaimed. The dataset
309                //     directory itself persists until a future
310                //     orphan-cleanup pass — operators who need the
311                //     directory gone too should run `omnigraph cleanup`
312                //     and (for now) remove the directory out-of-band.
313                let table_key = schema_table_key(*type_kind, name);
314                if table_key.starts_with("edge:") {
315                    changed_edge_tables = true;
316                }
317                if matches!(mode, DropMode::Hard) {
318                    let entry = snapshot.entry(&table_key).ok_or_else(|| {
319                        OmniError::manifest(format!(
320                            "missing table '{}' for hard type drop",
321                            table_key
322                        ))
323                    })?;
324                    let full_uri = format!("{}/{}", db.root_uri, entry.table_path);
325                    hard_cleanup_targets.push((table_key.clone(), full_uri));
326                }
327                dropped_tables.insert(table_key);
328            }
329            step @ SchemaMigrationStep::UnsupportedChange { .. } => {
330                return Err(OmniError::manifest(
331                    step.unsupported_error_message()
332                        .unwrap_or_else(|| "unsupported schema migration step".to_string()),
333                ));
334            }
335        }
336    }
337
338    let mut table_registrations = HashMap::<String, String>::new();
339    let mut table_updates = HashMap::<String, crate::db::SubTableUpdate>::new();
340    let mut table_tombstones = HashMap::<String, u64>::new();
341
342    // Recovery sidecar: protect the per-table commit_staged loop in
343    // rewritten_tables + indexed_tables. The post_commit_pin we record
344    // here is a lower bound (expected + 1); the classifier loose-matches
345    // for SidecarKind::SchemaApply because the actual N depends on how
346    // many indices need building. See classify_table's loose-match arm.
347    let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = rewritten_tables
348        .iter()
349        .chain(indexed_tables.iter().filter(|t| {
350            !rewritten_tables.contains(*t)
351                && !added_tables.contains(*t)
352                && !renamed_tables.contains_key(*t)
353        }))
354        .filter_map(|table_key| {
355            let entry = snapshot.entry(table_key)?;
356            Some(crate::db::manifest::SidecarTablePin {
357                table_key: table_key.clone(),
358                table_path: db.table_store.dataset_uri(&entry.table_path),
359                expected_version: entry.table_version,
360                post_commit_pin: entry.table_version + 1,
361                table_branch: entry.table_branch.clone(),
362            })
363        })
364        .collect();
365    // Capture additional registrations + tombstones for the sidecar so
366    // recovery can publish them alongside the per-table updates. Without
367    // this, an added type's dataset is created in Phase B but the
368    // manifest never gains an entry for it after roll-forward — the
369    // live `_schema.pg` declares a type the manifest doesn't know about
370    // and reads through the engine fail with "no manifest entry for X".
371    let mut sidecar_registrations: Vec<crate::db::manifest::SidecarTableRegistration> = Vec::new();
372    for table_key in &added_tables {
373        sidecar_registrations.push(crate::db::manifest::SidecarTableRegistration {
374            table_key: table_key.clone(),
375            table_path: table_path_for_table_key(table_key)?,
376            table_branch: None,
377        });
378    }
379    for target_table_key in renamed_tables.keys() {
380        sidecar_registrations.push(crate::db::manifest::SidecarTableRegistration {
381            table_key: target_table_key.clone(),
382            table_path: table_path_for_table_key(target_table_key)?,
383            table_branch: None,
384        });
385    }
386    let mut sidecar_tombstones: Vec<crate::db::manifest::SidecarTombstone> = Vec::new();
387    for source_table_key in renamed_tables.values() {
388        let source_entry = snapshot.entry(source_table_key).ok_or_else(|| {
389            OmniError::manifest(format!(
390                "missing source table '{}' for schema rename when building recovery sidecar",
391                source_table_key
392            ))
393        })?;
394        sidecar_tombstones.push(crate::db::manifest::SidecarTombstone {
395            table_key: source_table_key.clone(),
396            tombstone_version: source_entry.table_version.saturating_add(1),
397        });
398    }
399    // Soft DropType: mark each dropped table for tombstoning in the
400    // recovery sidecar AND in the live table_tombstones map. The
401    // mechanism mirrors rename's source-table tombstone — manifest
402    // entry removed at version+1, dataset files retained, time-travel
403    // reachable until cleanup. No Phase B write happens for these
404    // tables; the recovery sidecar is purely the manifest delta.
405    for dropped_table_key in &dropped_tables {
406        let entry = snapshot.entry(dropped_table_key).ok_or_else(|| {
407            OmniError::manifest(format!(
408                "missing table '{}' for soft drop when building recovery sidecar",
409                dropped_table_key
410            ))
411        })?;
412        let tombstone_version = entry.table_version.saturating_add(1);
413        sidecar_tombstones.push(crate::db::manifest::SidecarTombstone {
414            table_key: dropped_table_key.clone(),
415            tombstone_version,
416        });
417        table_tombstones.insert(dropped_table_key.clone(), tombstone_version);
418    }
419
420    // Acquire per-(table_key, branch) queues for every existing table
421    // that schema_apply will rewrite or re-index. New tables (added or
422    // renamed targets) aren't acquired — they have no existing dataset
423    // to race against. Held across the per-table commit loop and the
424    // manifest publish via `commit_changes_with_actor` below.
425    //
426    // Schema-apply already holds the graph-wide `__schema_apply_lock__`
427    // sentinel branch, so under PR 1b's intermediate state these
428    // per-table acquisitions are uncontended. They exist for symmetry
429    // with future MR-870 recovery, which will need queue acquisition
430    // before any `Dataset::restore` it issues for SchemaApply sidecars.
431    let schema_apply_queue_keys: Vec<(String, Option<String>)> = recovery_pins
432        .iter()
433        .map(|pin| (pin.table_key.clone(), pin.table_branch.clone()))
434        .collect();
435    let _schema_apply_queue_guards = db
436        .write_queue()
437        .acquire_many(&schema_apply_queue_keys)
438        .await;
439
440    let recovery_handle = if recovery_pins.is_empty()
441        && sidecar_registrations.is_empty()
442        && sidecar_tombstones.is_empty()
443    {
444        None
445    } else {
446        // `branch=None` because schema_apply publishes against main —
447        // the `__schema_apply_lock__` branch is purely a serialization
448        // sentinel (acquire_schema_apply_lock creates it; the manifest
449        // publish via coordinator.commit_changes_with_actor below targets
450        // the coordinator's active branch, which is the pre-lock branch).
451        // If the lock release fires before recovery, the lock branch is
452        // gone — the sidecar must not reference it.
453        let mut sidecar = crate::db::manifest::new_sidecar(
454            crate::db::manifest::SidecarKind::SchemaApply,
455            None,
456            // `apply_schema` doesn't currently take an actor (no `apply_schema_as`
457            // public API). The HTTP server's /schema/apply handler can pass actor
458            // context through a follow-up addition. For now, system-attributed.
459            None,
460            recovery_pins,
461        );
462        sidecar.additional_registrations = sidecar_registrations;
463        sidecar.tombstones = sidecar_tombstones;
464        Some(
465            crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar)
466                .await?,
467        )
468    };
469
470    for table_key in &added_tables {
471        let table_path = table_path_for_table_key(table_key)?;
472        let dataset_uri = db.table_store.dataset_uri(&table_path);
473        let schema = schema_for_table_key(&desired_catalog, table_key)?;
474        let mut ds = TableStore::create_empty_dataset(&dataset_uri, &schema).await?;
475        db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds)
476            .await?;
477        let state = db.table_store.table_state(&dataset_uri, &ds).await?;
478        table_registrations.insert(table_key.clone(), table_path);
479        table_updates.insert(
480            table_key.clone(),
481            crate::db::SubTableUpdate {
482                table_key: table_key.clone(),
483                table_version: state.version,
484                table_branch: None,
485                row_count: state.row_count,
486                version_metadata: state.version_metadata,
487            },
488        );
489    }
490
491    for (target_table_key, source_table_key) in &renamed_tables {
492        let source_entry = snapshot.entry(source_table_key).ok_or_else(|| {
493            OmniError::manifest(format!(
494                "missing source table '{}' for schema rename",
495                source_table_key
496            ))
497        })?;
498        ensure_snapshot_entry_head_matches(db, source_entry).await?;
499        let source_ds = snapshot.open(source_table_key).await?;
500        let current_catalog = db.catalog();
501        let batch = batch_for_schema_apply_rewrite(
502            db,
503            &source_ds,
504            source_table_key,
505            &current_catalog,
506            target_table_key,
507            &desired_catalog,
508            property_renames.get(target_table_key),
509        )
510        .await?;
511        let table_path = table_path_for_table_key(target_table_key)?;
512        let dataset_uri = db.table_store.dataset_uri(&table_path);
513        let mut target_ds = TableStore::write_dataset(&dataset_uri, batch).await?;
514        db.build_indices_on_dataset_for_catalog(&desired_catalog, target_table_key, &mut target_ds)
515            .await?;
516        let state = db.table_store.table_state(&dataset_uri, &target_ds).await?;
517        table_registrations.insert(target_table_key.clone(), table_path);
518        table_updates.insert(
519            target_table_key.clone(),
520            crate::db::SubTableUpdate {
521                table_key: target_table_key.clone(),
522                table_version: state.version,
523                table_branch: None,
524                row_count: state.row_count,
525                version_metadata: state.version_metadata,
526            },
527        );
528        table_tombstones.insert(
529            source_table_key.clone(),
530            source_entry.table_version.saturating_add(1),
531        );
532    }
533
534    for table_key in &rewritten_tables {
535        if added_tables.contains(table_key) || renamed_tables.contains_key(table_key) {
536            continue;
537        }
538        let entry = snapshot.entry(table_key).ok_or_else(|| {
539            OmniError::manifest(format!(
540                "missing source table '{}' for schema apply",
541                table_key
542            ))
543        })?;
544        ensure_snapshot_entry_head_matches(db, entry).await?;
545        let source_ds = snapshot.open(table_key).await?;
546        let current_catalog = db.catalog();
547        let batch = batch_for_schema_apply_rewrite(
548            db,
549            &source_ds,
550            table_key,
551            &current_catalog,
552            table_key,
553            &desired_catalog,
554            property_renames.get(table_key),
555        )
556        .await?;
557        let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
558        // Route through stage_overwrite + commit_staged for non-empty
559        // batches. Lance's `InsertBuilder::execute_uncommitted`
560        // errors on empty data (lance-4.0.0 `src/dataset/write/insert.rs:144`),
561        // so the empty-rewrite case stays on `overwrite_dataset` (which
562        // accepts empty input). The empty case is rare in schema_apply
563        // — it only fires when the source table itself was already empty
564        // — and schema_apply runs under `__schema_apply_lock__` so the
565        // narrow inline-commit residual is bounded.
566        let mut target_ds = if batch.num_rows() == 0 {
567            TableStore::overwrite_dataset(&dataset_uri, batch).await?
568        } else {
569            // Pass `entry.table_branch.as_deref()` (not `None`) for
570            // consistency with the indexed_tables block below. Schema
571            // apply runs under `__schema_apply_lock__` which today
572            // rejects non-main branches, so `entry.table_branch` is
573            // expected to be `None`. But the defensive passthrough
574            // means a future relaxation of the lock-check can't quietly
575            // open the wrong HEAD here.
576            let existing = db
577                .table_store
578                .open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref())
579                .await?;
580            let staged = db.table_store.stage_overwrite(&existing, batch).await?;
581            db.table_store
582                .commit_staged(Arc::new(existing), staged.transaction)
583                .await?
584        };
585        db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut target_ds)
586            .await?;
587        let state = db.table_store.table_state(&dataset_uri, &target_ds).await?;
588        table_updates.insert(
589            table_key.clone(),
590            crate::db::SubTableUpdate {
591                table_key: table_key.clone(),
592                table_version: state.version,
593                table_branch: None,
594                row_count: state.row_count,
595                version_metadata: state.version_metadata,
596            },
597        );
598    }
599
600    for table_key in &indexed_tables {
601        if added_tables.contains(table_key)
602            || renamed_tables.contains_key(table_key)
603            || rewritten_tables.contains(table_key)
604        {
605            continue;
606        }
607        let entry = snapshot.entry(table_key).ok_or_else(|| {
608            OmniError::manifest(format!(
609                "missing table '{}' for schema index apply",
610                table_key
611            ))
612        })?;
613        ensure_snapshot_entry_head_matches(db, entry).await?;
614        let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
615        let mut ds = db
616            .table_store
617            .open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref())
618            .await?;
619        db.table_store
620            .ensure_expected_version(&ds, table_key, entry.table_version)?;
621        db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds)
622            .await?;
623        let state = db.table_store.table_state(&dataset_uri, &ds).await?;
624        table_updates.insert(
625            table_key.clone(),
626            crate::db::SubTableUpdate {
627                table_key: table_key.clone(),
628                table_version: state.version,
629                table_branch: None,
630                row_count: state.row_count,
631                version_metadata: state.version_metadata,
632            },
633        );
634    }
635
636    let mut manifest_changes = Vec::new();
637    for (table_key, table_path) in table_registrations {
638        manifest_changes.push(ManifestChange::RegisterTable(TableRegistration {
639            table_key,
640            table_path,
641        }));
642    }
643    for update in table_updates.into_values() {
644        manifest_changes.push(ManifestChange::Update(update));
645    }
646    for (table_key, tombstone_version) in table_tombstones {
647        manifest_changes.push(ManifestChange::Tombstone(TableTombstone {
648            table_key,
649            tombstone_version,
650        }));
651    }
652
653    db.refresh_coordinator_only().await?;
654    let current_manifest_version = db.version().await;
655    if current_manifest_version != base_manifest_version {
656        return Err(OmniError::manifest_conflict(format!(
657            "schema apply lost its write lease: main advanced from v{} to v{} while schema apply was in progress",
658            base_manifest_version, current_manifest_version,
659        )));
660    }
661
662    // Atomic schema apply.
663    //
664    // Write the new schema source + IR contract to staging filenames first,
665    // then commit the manifest, then rename staging → final. A crash
666    // between these stages is recoverable on next open via
667    // `recover_schema_state_files`:
668    //   - crash before commit  → manifest unchanged; staging deleted on open
669    //   - crash after commit   → manifest advanced; staging renamed on open
670    crate::failpoints::maybe_fail("schema_apply.before_staging_write")?;
671
672    let staging_pg_uri = schema_source_staging_uri(&db.root_uri);
673    db.storage
674        .write_text(&staging_pg_uri, desired_schema_source)
675        .await?;
676    write_schema_contract_staging(&db.root_uri, db.storage.as_ref(), &desired_ir).await?;
677
678    crate::failpoints::maybe_fail("schema_apply.after_staging_write")?;
679
680    // `apply_schema` doesn't currently take an actor; system-attributed.
681    let PublishedSnapshot {
682        manifest_version,
683        _snapshot_id: _,
684    } = db
685        .coordinator
686        .write()
687        .await
688        .commit_changes_with_actor(&manifest_changes, None)
689        .await?;
690
691    crate::failpoints::maybe_fail("schema_apply.after_manifest_commit")?;
692
693    db.storage
694        .rename_text(&staging_pg_uri, &schema_source_uri(&db.root_uri))
695        .await?;
696    db.storage
697        .rename_text(
698            &schema_ir_staging_uri(&db.root_uri),
699            &schema_ir_uri(&db.root_uri),
700        )
701        .await?;
702    db.storage
703        .rename_text(
704            &schema_state_staging_uri(&db.root_uri),
705            &schema_state_uri(&db.root_uri),
706        )
707        .await?;
708
709    db.store_catalog(desired_catalog);
710    db.store_schema_source(desired_schema_source.to_string());
711    db.coordinator.write().await.refresh().await?;
712    db.runtime_cache.invalidate_all().await;
713    if changed_edge_tables {
714        db.invalidate_graph_index().await;
715    }
716
717    // Recovery sidecar lifecycle: delete after the manifest commit
718    // succeeded. Best-effort: if this delete fails, the sidecar persists
719    // and on next open the sweep sees every table at the post-publish
720    // manifest pin (NoMovement) and the sidecar is treated as a stale
721    // artifact (recovery is a no-op and the sidecar is cleaned up).
722    // Failing the schema_apply call would report failure for a migration
723    // that already succeeded.
724    if let Some(handle) = recovery_handle {
725        if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
726            tracing::warn!(
727                error = %err,
728                operation_id = handle.operation_id.as_str(),
729                "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
730            );
731        }
732    }
733
734    // Hard-drop cleanup: run cleanup_old_versions on each dataset
735    // that had a Hard mode drop step. Best-effort — the schema apply
736    // is already durable. If cleanup fails, the prior data fragments
737    // remain on disk as orphans (reclaimable via `omnigraph cleanup`).
738    // We do NOT fail the apply on cleanup error; the manifest change
739    // is the load-bearing operation.
740    for (table_key, full_uri) in &hard_cleanup_targets {
741        match cleanup_dataset_old_versions(db, full_uri).await {
742            Ok(()) => {}
743            Err(err) => {
744                tracing::warn!(
745                    error = %err,
746                    table_key = table_key.as_str(),
747                    "hard-drop cleanup_old_versions failed; rerun `omnigraph cleanup` to reclaim",
748                );
749            }
750        }
751    }
752
753    Ok(SchemaApplyResult {
754        supported: true,
755        applied: true,
756        manifest_version,
757        steps: plan.steps,
758    })
759}
760
761/// Run `cleanup_old_versions` on a dataset URI with `before_timestamp = now`.
762/// Removes every version older than the current, making time-travel back
763/// to those versions unreachable. Used by Hard mode drops to enforce
764/// "data is gone" semantics post-apply.
765///
766/// The dataset itself isn't deleted — for DropType { Hard }, the
767/// dataset directory persists with only its current version (or, if
768/// no current version was written, its pre-drop version). A future
769/// orphan-cleanup pass should remove the directory entirely.
770async fn cleanup_dataset_old_versions(db: &Omnigraph, full_uri: &str) -> Result<()> {
771    use chrono::Utc;
772    use lance::dataset::cleanup::CleanupPolicy;
773    let ds = lance::Dataset::open(full_uri)
774        .await
775        .map_err(|e| OmniError::Lance(e.to_string()))?;
776    let policy = CleanupPolicy {
777        before_timestamp: Some(Utc::now()),
778        before_version: None,
779        delete_unverified: false,
780        error_if_tagged_old_versions: false,
781        clean_referenced_branches: false,
782        delete_rate_limit: None,
783    };
784    let _removed = lance::dataset::cleanup::cleanup_old_versions(&ds, policy)
785        .await
786        .map_err(|e| OmniError::Lance(e.to_string()))?;
787    let _ = db;
788    Ok(())
789}
790
791pub(super) async fn ensure_schema_apply_idle(db: &Omnigraph, operation: &str) -> Result<()> {
792    db.refresh_coordinator_only().await?;
793    ensure_schema_apply_not_locked(db, operation).await
794}
795
796pub(super) async fn acquire_schema_apply_lock(db: &Omnigraph) -> Result<()> {
797    db.ensure_schema_state_valid().await?;
798    db.refresh_coordinator_only().await?;
799    let branches = db.coordinator.read().await.all_branches().await?;
800    if branches
801        .iter()
802        .any(|branch| is_schema_apply_lock_branch(branch))
803    {
804        return Err(OmniError::manifest_conflict(
805            "schema apply is already in progress".to_string(),
806        ));
807    }
808
809    db.coordinator
810        .write()
811        .await
812        .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
813        .await?;
814    db.refresh_coordinator_only().await?;
815
816    let blocking_branches = db
817        .coordinator
818        .read()
819        .await
820        .all_branches()
821        .await?
822        .into_iter()
823        .filter(|branch| branch != "main" && !is_internal_system_branch(branch))
824        .collect::<Vec<_>>();
825    if !blocking_branches.is_empty() {
826        let _ = release_schema_apply_lock(db).await;
827        return Err(OmniError::manifest_conflict(format!(
828            "schema apply requires a graph with only main; found non-main branches: {}",
829            blocking_branches.join(", ")
830        )));
831    }
832
833    Ok(())
834}
835
836pub(super) async fn release_schema_apply_lock(db: &Omnigraph) -> Result<()> {
837    db.coordinator
838        .write()
839        .await
840        .branch_delete(SCHEMA_APPLY_LOCK_BRANCH)
841        .await?;
842    // Use refresh_coordinator_only — the full Omnigraph::refresh would
843    // run roll-forward-only recovery, and on the failure path the
844    // in-flight schema_apply sidecar is still on disk; recovery would
845    // race the caller's own publish (or roll forward an aborted apply
846    // we want to leave for next-open).
847    db.refresh_coordinator_only().await
848}
849
850pub(super) async fn ensure_schema_apply_not_locked(db: &Omnigraph, operation: &str) -> Result<()> {
851    if db
852        .coordinator
853        .read()
854        .await
855        .all_branches()
856        .await?
857        .iter()
858        .any(|branch| is_schema_apply_lock_branch(branch))
859    {
860        return Err(OmniError::manifest_conflict(format!(
861            "{} is unavailable while schema apply is in progress",
862            operation
863        )));
864    }
865    Ok(())
866}
867
868pub(super) async fn ensure_snapshot_entry_head_matches(
869    db: &Omnigraph,
870    entry: &SubTableEntry,
871) -> Result<()> {
872    let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
873    let ds = db
874        .table_store
875        .open_dataset_head_for_write(
876            &entry.table_key,
877            &dataset_uri,
878            entry.table_branch.as_deref(),
879        )
880        .await?;
881    db.table_store
882        .ensure_expected_version(&ds, &entry.table_key, entry.table_version)
883}
884
/// Read all rows of `source_ds` and reshape them into a single `RecordBatch`
/// that matches the target table's schema, for schema-apply table rewrites.
///
/// For each field of the target schema (from `target_catalog` /
/// `target_table_key`):
/// - The source column is looked up by the field's own name. If
///   `property_renames` has an entry for the target field name, the mapped
///   value is used as the source column name instead.
/// - If the field is a blob property in the target and the source column is
///   a blob property in the source, the column is rebuilt with
///   `rebuild_blob_column`, which re-reads the blob payloads by `_rowid`.
/// - Any other column present in the source is carried over unchanged.
/// - A column missing from the source is filled with nulls.
///
/// Returns an empty batch with the target schema when the scan yields no
/// batches.
///
/// NOTE(review): all source batches are concatenated into one in-memory
/// batch, so memory use scales with the table size.
///
/// # Errors
/// - Propagates catalog lookup and scan errors.
/// - Returns `OmniError::Lance` if `_rowid` is missing when it was needed,
///   or if a blob column is not a `StructArray` of descriptions.
/// - Returns `OmniError::Lance` if the assembled columns do not fit the
///   target schema. Presumably this includes a column copied as-is whose
///   type differs from the target field; confirm.
pub(super) async fn batch_for_schema_apply_rewrite(
    db: &Omnigraph,
    source_ds: &Dataset,
    source_table_key: &str,
    source_catalog: &Catalog,
    target_table_key: &str,
    target_catalog: &Catalog,
    property_renames: Option<&HashMap<String, String>>,
) -> Result<RecordBatch> {
    let target_schema = schema_for_table_key(target_catalog, target_table_key)?;
    let source_blob_properties = blob_properties_for_table_key(source_catalog, source_table_key)?;
    let target_blob_properties = blob_properties_for_table_key(target_catalog, target_table_key)?;
    // Row ids are only needed to re-fetch blob payloads; skip them otherwise.
    let needs_row_ids = !source_blob_properties.is_empty() || !target_blob_properties.is_empty();
    let batches = if needs_row_ids {
        // The `true` argument presumably requests the `_rowid` column that is
        // read back below — confirm against `TableStore::scan_with`.
        db.table_store()
            .scan_with(source_ds, None, None, None, true, |_| Ok(()))
            .await?
    } else {
        db.table_store().scan_batches(source_ds).await?
    };
    if batches.is_empty() {
        return Ok(RecordBatch::new_empty(target_schema));
    }
    let source_schema = batches[0].schema();
    let batch = concat_or_empty_batches(source_schema, batches)?;

    // One row id per row of `batch`, in the same order, used to align
    // re-fetched blobs with their rows.
    let row_ids = if needs_row_ids {
        Some(
            batch
                .column_by_name("_rowid")
                .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
                .ok_or_else(|| {
                    OmniError::Lance(format!(
                        "expected _rowid column when rewriting '{}'",
                        source_table_key
                    ))
                })?
                .values()
                .iter()
                .copied()
                .collect::<Vec<_>>(),
        )
    } else {
        None
    };

    let mut columns = Vec::with_capacity(target_schema.fields().len());
    for field in target_schema.fields() {
        // `property_renames` is keyed by target field name; the value is the
        // column name to read from the source batch.
        let source_name = property_renames
            .and_then(|renames| renames.get(field.name()))
            .map(String::as_str)
            .unwrap_or_else(|| field.name().as_str());
        if let Some(column) = batch.column_by_name(source_name) {
            if target_blob_properties.contains(field.name())
                && source_blob_properties.contains(source_name)
            {
                // In the scanned batch a blob column holds descriptions, not
                // payloads, so the blob data is re-read and rebuilt.
                let descriptions =
                    column
                        .as_any()
                        .downcast_ref::<StructArray>()
                        .ok_or_else(|| {
                            OmniError::Lance(format!(
                                "expected blob descriptions for '{}.{}'",
                                source_table_key, source_name
                            ))
                        })?;
                let rebuilt = rebuild_blob_column(
                    db,
                    source_ds,
                    source_name,
                    descriptions,
                    row_ids.as_deref().unwrap_or(&[]),
                )
                .await?;
                columns.push(rebuilt);
            } else {
                columns.push(column.clone());
            }
        } else {
            // Field does not exist in the source (e.g. newly added): fill with nulls.
            columns.push(new_null_array(field.data_type(), batch.num_rows()));
        }
    }

    RecordBatch::try_new(target_schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
}
970
971async fn rebuild_blob_column(
972    _db: &Omnigraph,
973    source_ds: &Dataset,
974    column_name: &str,
975    descriptions: &StructArray,
976    row_ids: &[u64],
977) -> Result<Arc<dyn Array>> {
978    let mut builder = BlobArrayBuilder::new(row_ids.len());
979    let mut non_null_row_ids = Vec::new();
980    let mut row_has_blob = Vec::with_capacity(row_ids.len());
981
982    for row in 0..row_ids.len() {
983        let is_null = blob_description_is_null(descriptions, row)?;
984        row_has_blob.push(!is_null);
985        if !is_null {
986            non_null_row_ids.push(row_ids[row]);
987        }
988    }
989
990    let blob_files = if non_null_row_ids.is_empty() {
991        Vec::new()
992    } else {
993        Arc::new(source_ds.clone())
994            .take_blobs(&non_null_row_ids, column_name)
995            .await
996            .map_err(|e| OmniError::Lance(e.to_string()))?
997    };
998
999    let mut files = blob_files.into_iter();
1000    for has_blob in row_has_blob {
1001        if !has_blob {
1002            builder
1003                .push_null()
1004                .map_err(|e| OmniError::Lance(e.to_string()))?;
1005            continue;
1006        }
1007
1008        let blob = files.next().ok_or_else(|| {
1009            OmniError::Lance(format!(
1010                "blob rewrite for '{}' lost alignment with source rows",
1011                column_name
1012            ))
1013        })?;
1014        if let Some(uri) = blob.uri() {
1015            builder
1016                .push_uri(uri)
1017                .map_err(|e| OmniError::Lance(e.to_string()))?;
1018        } else {
1019            builder
1020                .push_bytes(
1021                    blob.read()
1022                        .await
1023                        .map_err(|e| OmniError::Lance(e.to_string()))?,
1024                )
1025                .map_err(|e| OmniError::Lance(e.to_string()))?;
1026        }
1027    }
1028
1029    if files.next().is_some() {
1030        return Err(OmniError::Lance(format!(
1031            "blob rewrite for '{}' produced extra source blobs",
1032            column_name
1033        )));
1034    }
1035
1036    builder
1037        .finish()
1038        .map_err(|e| OmniError::Lance(e.to_string()))
1039}