Skip to main content

omnigraph/db/omnigraph/
schema_apply.rs

1use super::*;
2
/// Knobs an operator can set to control how a schema apply behaves.
///
/// The single option today is [`allow_data_loss`](Self::allow_data_loss).
/// By default every drop is planned as `DropMode::Soft`: the dropped data
/// stays reachable through Lance time travel until cleanup runs. Opting in to
/// data loss upgrades those drops to `DropMode::Hard`, which additionally runs
/// `cleanup_old_versions` on each affected dataset once the manifest has been
/// published, so the pre-drop data can no longer be read.
#[derive(Debug, Clone, Default)]
pub struct SchemaApplyOptions {
    /// Opt in to destructive schema changes.
    ///
    /// `false` (the `Default`) keeps drops soft. `true` makes the planner
    /// rewrite each `DropMode::Soft` step as `DropMode::Hard`, and makes the
    /// apply path run `cleanup_old_versions` on the affected datasets after
    /// the manifest publish.
    pub allow_data_loss: bool,
}
19
20/// Promote every `Soft` drop variant in the plan to `Hard` when
21/// `allow_data_loss` is set. Idempotent on non-drop steps.
22fn promote_drops_to_hard(plan: &mut SchemaMigrationPlan, allow_data_loss: bool) {
23    if !allow_data_loss {
24        return;
25    }
26    for step in &mut plan.steps {
27        match step {
28            SchemaMigrationStep::DropType { mode, .. }
29            | SchemaMigrationStep::DropProperty { mode, .. } => {
30                *mode = DropMode::Hard;
31            }
32            _ => {}
33        }
34    }
35}
36
37pub(super) async fn plan_schema(
38    db: &Omnigraph,
39    desired_schema_source: &str,
40    options: SchemaApplyOptions,
41) -> Result<SchemaMigrationPlan> {
42    db.ensure_schema_state_valid().await?;
43    let accepted_ir = read_accepted_schema_ir(db.uri(), Arc::clone(&db.storage)).await?;
44    let desired_ir = read_schema_ir_from_source(desired_schema_source)?;
45    let mut plan = plan_schema_migration(&accepted_ir, &desired_ir)
46        .map_err(|err| OmniError::manifest(err.to_string()))?;
47    promote_drops_to_hard(&mut plan, options.allow_data_loss);
48    Ok(plan)
49}
50
51struct PlannedSchemaApply {
52    plan: SchemaMigrationPlan,
53    desired_ir: SchemaIR,
54    desired_catalog: Catalog,
55}
56
57async fn plan_schema_for_apply(
58    db: &Omnigraph,
59    desired_schema_source: &str,
60    options: SchemaApplyOptions,
61) -> Result<PlannedSchemaApply> {
62    db.ensure_schema_state_valid().await?;
63    let branches = db.coordinator.read().await.all_branches().await?;
64    // Skip `main` and internal system branches (the schema-apply lock branch,
65    // the cluster-wide schema-apply serializer). Legacy `__run__*` staging
66    // branches were swept off `__manifest` by the v2→v3 migration that runs in
67    // `Omnigraph::open(ReadWrite)` before this check (MR-770), so they no
68    // longer appear here.
69    let blocking_branches = branches
70        .into_iter()
71        .filter(|branch| branch != "main" && !is_internal_system_branch(branch))
72        .collect::<Vec<_>>();
73    if !blocking_branches.is_empty() {
74        return Err(OmniError::manifest_conflict(format!(
75            "schema apply requires a graph with only main; found non-main branches: {}",
76            blocking_branches.join(", ")
77        )));
78    }
79
80    let accepted_ir = read_accepted_schema_ir(db.uri(), Arc::clone(&db.storage)).await?;
81    let desired_ir = read_schema_ir_from_source(desired_schema_source)?;
82    let mut plan = plan_schema_migration(&accepted_ir, &desired_ir)
83        .map_err(|err| OmniError::manifest(err.to_string()))?;
84    promote_drops_to_hard(&mut plan, options.allow_data_loss);
85    if !plan.supported {
86        let message = plan
87            .steps
88            .iter()
89            .find_map(|step| step.unsupported_error_message())
90            .unwrap_or_else(|| "unsupported schema migration plan".to_string());
91        return Err(OmniError::manifest(message));
92    }
93
94    let mut desired_catalog = build_catalog_from_ir(&desired_ir)?;
95    fixup_blob_schemas(&mut desired_catalog);
96    Ok(PlannedSchemaApply {
97        plan,
98        desired_ir,
99        desired_catalog,
100    })
101}
102
103pub(super) async fn preview_schema_apply(
104    db: &Omnigraph,
105    desired_schema_source: &str,
106    options: SchemaApplyOptions,
107) -> Result<SchemaApplyPreview> {
108    let planned = plan_schema_for_apply(db, desired_schema_source, options).await?;
109    Ok(SchemaApplyPreview {
110        plan: planned.plan,
111        catalog: planned.desired_catalog,
112    })
113}
114
115pub(super) async fn apply_schema<F>(
116    db: &Omnigraph,
117    desired_schema_source: &str,
118    options: SchemaApplyOptions,
119    actor: Option<&str>,
120    validate_catalog: F,
121) -> Result<SchemaApplyResult>
122where
123    F: FnOnce(&Catalog) -> Result<()>,
124{
125    // Engine-layer policy gate (MR-722 chassis core).
126    //
127    // Fires BEFORE acquiring the schema-apply lock or doing any other
128    // work. When no PolicyChecker is installed this is a no-op and
129    // the apply path behaves exactly as it did before MR-722. When
130    // a PolicyChecker IS installed and the actor is None, this is a
131    // hard error — see Omnigraph::enforce's docstring for the
132    // forget-the-actor-footgun reasoning.
133    //
134    // Scope is TargetBranch("main") to match the HTTP-layer convention
135    // for SchemaApply: branch=None, target_branch=Some("main"). Cedar
136    // policies in the wild use `target_branch_scope: protected` to
137    // gate schema applies, so the engine-layer call has to set the
138    // target_branch shape that activates that predicate. Wrong scope
139    // here = silent policy mismatch with HTTP. See
140    // `omnigraph_policy::ResourceScope::to_branch_pair` for the mapping.
141    db.enforce(
142        omnigraph_policy::PolicyAction::SchemaApply,
143        &omnigraph_policy::ResourceScope::TargetBranch("main".to_string()),
144        actor,
145    )?;
146
147    // Converge any pending recovery sidecar before planning: a table
148    // rewrite over sidecar-covered drift would otherwise re-plan from
149    // the manifest pin and orphan the drifted Phase-B commit (silently
150    // dropping its rows) while the stale sidecar lingers to misclassify
151    // against the post-apply pins. Runs before the apply's own sidecar
152    // exists, so the heal can never observe it.
153    db.heal_pending_recovery_sidecars().await?;
154
155    acquire_schema_apply_lock(db).await?;
156    let result = apply_schema_with_lock(db, desired_schema_source, options, validate_catalog).await;
157    let release_result = release_schema_apply_lock(db).await;
158    match (result, release_result) {
159        (Ok(result), Ok(())) => Ok(result),
160        (Ok(_), Err(err)) => Err(err),
161        (Err(err), Ok(())) => Err(err),
162        (Err(err), Err(_)) => Err(err),
163    }
164}
165
166pub(super) async fn apply_schema_with_lock<F>(
167    db: &Omnigraph,
168    desired_schema_source: &str,
169    options: SchemaApplyOptions,
170    validate_catalog: F,
171) -> Result<SchemaApplyResult>
172where
173    F: FnOnce(&Catalog) -> Result<()>,
174{
175    let planned = plan_schema_for_apply(db, desired_schema_source, options).await?;
176    validate_catalog(&planned.desired_catalog)?;
177    let PlannedSchemaApply {
178        plan,
179        desired_ir,
180        desired_catalog,
181    } = planned;
182    if plan.steps.is_empty() {
183        return Ok(SchemaApplyResult {
184            supported: true,
185            applied: false,
186            manifest_version: db.version().await,
187            steps: plan.steps,
188        });
189    }
190
191    let snapshot = db.snapshot().await;
192    let base_manifest_version = snapshot.version();
193    let mut added_tables = BTreeSet::new();
194    let mut renamed_tables = HashMap::new();
195    let mut rewritten_tables = BTreeSet::new();
196    let mut dropped_tables = BTreeSet::new();
197    // Hard-drop cleanup targets: (table_key, full_dataset_uri).
198    // Populated for DropProperty { Hard } and DropType { Hard }; the
199    // post-publish cleanup runs `cleanup_old_versions` on each
200    // dataset to reclaim prior versions, making time-travel back
201    // to pre-drop state unreachable.
202    let mut hard_cleanup_targets: Vec<(String, String)> = Vec::new();
203    let mut property_renames = HashMap::<String, HashMap<String, String>>::new();
204    let mut changed_edge_tables = false;
205
206    for step in &plan.steps {
207        match step {
208            SchemaMigrationStep::AddType { type_kind, name } => {
209                let table_key = schema_table_key(*type_kind, name);
210                if table_key.starts_with("edge:") {
211                    changed_edge_tables = true;
212                }
213                added_tables.insert(table_key);
214            }
215            SchemaMigrationStep::RenameType {
216                type_kind,
217                from,
218                to,
219            } => {
220                let source_key = schema_table_key(*type_kind, from);
221                let target_key = schema_table_key(*type_kind, to);
222                if source_key.starts_with("edge:") {
223                    changed_edge_tables = true;
224                }
225                renamed_tables.insert(target_key, source_key);
226            }
227            SchemaMigrationStep::AddProperty {
228                type_kind,
229                type_name,
230                ..
231            } => {
232                let table_key = schema_table_key(*type_kind, type_name);
233                if table_key.starts_with("edge:") {
234                    changed_edge_tables = true;
235                }
236                rewritten_tables.insert(table_key);
237            }
238            SchemaMigrationStep::RenameProperty {
239                type_kind,
240                type_name,
241                from,
242                to,
243            } => {
244                let table_key = schema_table_key(*type_kind, type_name);
245                if table_key.starts_with("edge:") {
246                    changed_edge_tables = true;
247                }
248                rewritten_tables.insert(table_key.clone());
249                property_renames
250                    .entry(table_key)
251                    .or_default()
252                    .insert(to.clone(), from.clone());
253            }
254            // AddConstraint is only ever an `@index` addition (every other
255            // added constraint plans as UnsupportedChange). It records intent
256            // in the desired catalog/IR; the physical index is built off the
257            // critical path by ensure_indices/optimize (iss-848), so the apply
258            // does no table work for it — a pure metadata change like the two
259            // metadata steps below.
260            SchemaMigrationStep::AddConstraint { .. }
261            | SchemaMigrationStep::UpdateTypeMetadata { .. }
262            | SchemaMigrationStep::UpdatePropertyMetadata { .. } => {}
263            SchemaMigrationStep::DropProperty {
264                type_kind,
265                type_name,
266                mode,
267                ..
268            } => {
269                // Both Soft and Hard route through the existing
270                // stage_overwrite rewrite path. batch_for_schema_apply_rewrite
271                // iterates the *target* schema fields, so a property
272                // absent from desired_catalog is naturally projected
273                // away in the rebuilt batch.
274                //
275                // The difference between Soft and Hard is what
276                // happens AFTER the manifest publish:
277                //   * Soft: nothing — the prior dataset version
278                //     retains the dropped column; reads at
279                //     snapshot_at_version(pre_drop) still see it.
280                //   * Hard: run cleanup_old_versions on the dataset
281                //     post-publish, removing the prior version (and
282                //     reclaiming any fragments unique to it). After
283                //     cleanup, time-travel back fails.
284                let table_key = schema_table_key(*type_kind, type_name);
285                if table_key.starts_with("edge:") {
286                    changed_edge_tables = true;
287                }
288                if matches!(mode, DropMode::Hard) {
289                    let entry = snapshot.entry(&table_key).ok_or_else(|| {
290                        OmniError::manifest(format!(
291                            "missing table '{}' for hard property drop",
292                            table_key
293                        ))
294                    })?;
295                    let full_uri = format!("{}/{}", db.root_uri, entry.table_path);
296                    hard_cleanup_targets.push((table_key.clone(), full_uri));
297                }
298                rewritten_tables.insert(table_key);
299            }
300            SchemaMigrationStep::DropType {
301                type_kind,
302                name,
303                mode,
304            } => {
305                // Both Soft and Hard tombstone the table's entry in
306                // the current __manifest version (no per-table write).
307                //
308                // The difference is what happens after publish:
309                //   * Soft: dataset files retained; prior __manifest
310                //     versions still reference them; Lance time
311                //     travel + branch-from-snapshot can read the
312                //     dropped table.
313                //   * Hard: run cleanup_old_versions on the orphan
314                //     dataset post-publish. Prior dataset versions
315                //     (and their fragments) are reclaimed. The dataset
316                //     directory itself persists until a future
317                //     orphan-cleanup pass — operators who need the
318                //     directory gone too should run `omnigraph cleanup`
319                //     and (for now) remove the directory out-of-band.
320                let table_key = schema_table_key(*type_kind, name);
321                if table_key.starts_with("edge:") {
322                    changed_edge_tables = true;
323                }
324                if matches!(mode, DropMode::Hard) {
325                    let entry = snapshot.entry(&table_key).ok_or_else(|| {
326                        OmniError::manifest(format!(
327                            "missing table '{}' for hard type drop",
328                            table_key
329                        ))
330                    })?;
331                    let full_uri = format!("{}/{}", db.root_uri, entry.table_path);
332                    hard_cleanup_targets.push((table_key.clone(), full_uri));
333                }
334                dropped_tables.insert(table_key);
335            }
336            step @ SchemaMigrationStep::UnsupportedChange { .. } => {
337                return Err(OmniError::manifest(
338                    step.unsupported_error_message()
339                        .unwrap_or_else(|| "unsupported schema migration step".to_string()),
340                ));
341            }
342        }
343    }
344
345    let mut table_registrations = HashMap::<String, String>::new();
346    let mut table_updates = HashMap::<String, crate::db::SubTableUpdate>::new();
347    let mut table_tombstones = HashMap::<String, u64>::new();
348
349    // Recovery sidecar: protect the per-table `stage_overwrite` +
350    // `commit_staged` in rewritten_tables — the only tables that advance Lance
351    // HEAD inline now that index building is deferred to the reconciler
352    // (iss-848). Each rewritten table is exactly one commit, so
353    // `post_commit_pin = expected + 1` is now exact (it was a loose lower bound
354    // when index builds added extra commits); the classifier's loose-match for
355    // SidecarKind::SchemaApply still accepts it.
356    let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = rewritten_tables
357        .iter()
358        .filter_map(|table_key| {
359            let entry = snapshot.entry(table_key)?;
360            Some(crate::db::manifest::SidecarTablePin {
361                table_key: table_key.clone(),
362                table_path: db.storage().dataset_uri(&entry.table_path),
363                expected_version: entry.table_version,
364                post_commit_pin: entry.table_version + 1,
365                // SchemaApply uses the loose match, not BranchMerge's Phase-B
366                // confirmation — left None.
367                confirmed_version: None,
368                table_branch: entry.table_branch.clone(),
369            })
370        })
371        .collect();
372    // Capture additional registrations + tombstones for the sidecar so
373    // recovery can publish them alongside the per-table updates. Without
374    // this, an added type's dataset is created in Phase B but the
375    // manifest never gains an entry for it after roll-forward — the
376    // live `_schema.pg` declares a type the manifest doesn't know about
377    // and reads through the engine fail with "no manifest entry for X".
378    let mut sidecar_registrations: Vec<crate::db::manifest::SidecarTableRegistration> = Vec::new();
379    for table_key in &added_tables {
380        sidecar_registrations.push(crate::db::manifest::SidecarTableRegistration {
381            table_key: table_key.clone(),
382            table_path: table_path_for_table_key(table_key)?,
383            table_branch: None,
384        });
385    }
386    for target_table_key in renamed_tables.keys() {
387        sidecar_registrations.push(crate::db::manifest::SidecarTableRegistration {
388            table_key: target_table_key.clone(),
389            table_path: table_path_for_table_key(target_table_key)?,
390            table_branch: None,
391        });
392    }
393    let mut sidecar_tombstones: Vec<crate::db::manifest::SidecarTombstone> = Vec::new();
394    for source_table_key in renamed_tables.values() {
395        let source_entry = snapshot.entry(source_table_key).ok_or_else(|| {
396            OmniError::manifest(format!(
397                "missing source table '{}' for schema rename when building recovery sidecar",
398                source_table_key
399            ))
400        })?;
401        sidecar_tombstones.push(crate::db::manifest::SidecarTombstone {
402            table_key: source_table_key.clone(),
403            tombstone_version: source_entry.table_version.saturating_add(1),
404        });
405    }
406    // Soft DropType: mark each dropped table for tombstoning in the
407    // recovery sidecar AND in the live table_tombstones map. The
408    // mechanism mirrors rename's source-table tombstone — manifest
409    // entry removed at version+1, dataset files retained, time-travel
410    // reachable until cleanup. No Phase B write happens for these
411    // tables; the recovery sidecar is purely the manifest delta.
412    for dropped_table_key in &dropped_tables {
413        let entry = snapshot.entry(dropped_table_key).ok_or_else(|| {
414            OmniError::manifest(format!(
415                "missing table '{}' for soft drop when building recovery sidecar",
416                dropped_table_key
417            ))
418        })?;
419        let tombstone_version = entry.table_version.saturating_add(1);
420        sidecar_tombstones.push(crate::db::manifest::SidecarTombstone {
421            table_key: dropped_table_key.clone(),
422            tombstone_version,
423        });
424        table_tombstones.insert(dropped_table_key.clone(), tombstone_version);
425    }
426
427    // Acquire per-(table_key, branch) queues for every existing table
428    // that schema_apply will rewrite or re-index. New tables (added or
429    // renamed targets) aren't acquired — they have no existing dataset
430    // to race against. Held across the per-table commit loop and the
431    // manifest publish via `commit_changes_with_actor` below.
432    //
433    // Schema-apply already holds the graph-wide `__schema_apply_lock__`
434    // sentinel branch, so these per-table acquisitions are uncontended in
435    // practice. They exist for symmetry with the recovery reconciler, which
436    // acquires the same queues before any `Dataset::restore` it issues for
437    // SchemaApply sidecars.
438    let mut schema_apply_queue_keys: Vec<(String, Option<String>)> = recovery_pins
439        .iter()
440        .map(|pin| (pin.table_key.clone(), pin.table_branch.clone()))
441        .collect();
442    // The serialization key the write-entry heal acquires before touching
443    // schema staging or a SchemaApply sidecar. Per-table keys alone don't
444    // cover a registration-only migration (no pins, but a sidecar and
445    // staging files on disk) — without this, a concurrent write's heal can
446    // promote this apply's staging files and publish its registrations out
447    // from under it. Acquired whenever a sidecar will be written, held
448    // through Phase D (the guards live to the end of this function).
449    let writes_sidecar = !(recovery_pins.is_empty()
450        && sidecar_registrations.is_empty()
451        && sidecar_tombstones.is_empty());
452    if writes_sidecar {
453        schema_apply_queue_keys.push(crate::db::manifest::schema_apply_serial_queue_key());
454    }
455    let _schema_apply_queue_guards = db
456        .write_queue()
457        .acquire_many(&schema_apply_queue_keys)
458        .await;
459
460    let recovery_handle = if !writes_sidecar {
461        None
462    } else {
463        // `branch=None` because schema_apply publishes against main —
464        // the `__schema_apply_lock__` branch is purely a serialization
465        // sentinel (acquire_schema_apply_lock creates it; the manifest
466        // publish via coordinator.commit_changes_with_actor below targets
467        // the coordinator's active branch, which is the pre-lock branch).
468        // If the lock release fires before recovery, the lock branch is
469        // gone — the sidecar must not reference it.
470        let mut sidecar = crate::db::manifest::new_sidecar(
471            crate::db::manifest::SidecarKind::SchemaApply,
472            None,
473            // `apply_schema` doesn't currently take an actor (no `apply_schema_as`
474            // public API). The HTTP server's /schema/apply handler can pass actor
475            // context through a follow-up addition. For now, system-attributed.
476            None,
477            recovery_pins,
478        );
479        sidecar.additional_registrations = sidecar_registrations;
480        sidecar.tombstones = sidecar_tombstones;
481        Some(
482            crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar)
483                .await?,
484        )
485    };
486
487    for table_key in &added_tables {
488        let table_path = table_path_for_table_key(table_key)?;
489        let dataset_uri = db.storage().dataset_uri(&table_path);
490        let schema = schema_for_table_key(&desired_catalog, table_key)?;
491        let ds =
492            SnapshotHandle::new(TableStore::create_empty_dataset(&dataset_uri, &schema).await?);
493        // Indexes for the new table are materialized off the critical path by
494        // ensure_indices/optimize (iss-848); a 0-row table is never trainable
495        // anyway. The @index intent is recorded in the persisted catalog/IR.
496        let state = db.storage().table_state(&dataset_uri, &ds).await?;
497        table_registrations.insert(table_key.clone(), table_path);
498        table_updates.insert(
499            table_key.clone(),
500            crate::db::SubTableUpdate {
501                table_key: table_key.clone(),
502                table_version: state.version,
503                table_branch: None,
504                row_count: state.row_count,
505                version_metadata: state.version_metadata,
506            },
507        );
508    }
509
510    for (target_table_key, source_table_key) in &renamed_tables {
511        let source_entry = snapshot.entry(source_table_key).ok_or_else(|| {
512            OmniError::manifest(format!(
513                "missing source table '{}' for schema rename",
514                source_table_key
515            ))
516        })?;
517        ensure_snapshot_entry_head_matches(db, source_entry).await?;
518        let source_ds = db
519            .storage()
520            .open_snapshot_at_table(&snapshot, source_table_key)
521            .await?;
522        let current_catalog = db.catalog();
523        let batch = batch_for_schema_apply_rewrite(
524            db,
525            &source_ds,
526            source_table_key,
527            &current_catalog,
528            target_table_key,
529            &desired_catalog,
530            property_renames.get(target_table_key),
531        )
532        .await?;
533        let table_path = table_path_for_table_key(target_table_key)?;
534        let dataset_uri = db.storage().dataset_uri(&table_path);
535        let target_ds = SnapshotHandle::new(TableStore::write_dataset(&dataset_uri, batch).await?);
536        // Indexes on the renamed table are reconciled later (iss-848).
537        let state = db.storage().table_state(&dataset_uri, &target_ds).await?;
538        table_registrations.insert(target_table_key.clone(), table_path);
539        table_updates.insert(
540            target_table_key.clone(),
541            crate::db::SubTableUpdate {
542                table_key: target_table_key.clone(),
543                table_version: state.version,
544                table_branch: None,
545                row_count: state.row_count,
546                version_metadata: state.version_metadata,
547            },
548        );
549        table_tombstones.insert(
550            source_table_key.clone(),
551            source_entry.table_version.saturating_add(1),
552        );
553    }
554
555    for table_key in &rewritten_tables {
556        if added_tables.contains(table_key) || renamed_tables.contains_key(table_key) {
557            continue;
558        }
559        let entry = snapshot.entry(table_key).ok_or_else(|| {
560            OmniError::manifest(format!(
561                "missing source table '{}' for schema apply",
562                table_key
563            ))
564        })?;
565        ensure_snapshot_entry_head_matches(db, entry).await?;
566        let source_ds = db
567            .storage()
568            .open_snapshot_at_table(&snapshot, table_key)
569            .await?;
570        let current_catalog = db.catalog();
571        let batch = batch_for_schema_apply_rewrite(
572            db,
573            &source_ds,
574            table_key,
575            &current_catalog,
576            table_key,
577            &desired_catalog,
578            property_renames.get(table_key),
579        )
580        .await?;
581        let dataset_uri = db.storage().dataset_uri(&entry.table_path);
582        // Pass `entry.table_branch.as_deref()` (not `None`) for
583        // consistency with the indexed_tables block below. Schema
584        // apply runs under `__schema_apply_lock__` which today rejects
585        // non-main branches, so `entry.table_branch` is expected to be
586        // `None`. But the defensive passthrough means a future relaxation
587        // of the lock-check can't quietly open the wrong HEAD here.
588        let existing = db
589            .storage()
590            .open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref())
591            .await?;
592        let staged = db.storage().stage_overwrite(&existing, batch).await?;
593        let target_ds = db.storage().commit_staged(existing, staged).await?;
594        // The rewrite drops the table's existing index coverage; it is
595        // restored off the critical path by optimize's optimize_indices /
596        // ensure_indices (iss-848). Reads scan uncovered fragments meanwhile.
597        let state = db.storage().table_state(&dataset_uri, &target_ds).await?;
598        table_updates.insert(
599            table_key.clone(),
600            crate::db::SubTableUpdate {
601                table_key: table_key.clone(),
602                table_version: state.version,
603                table_branch: None,
604                row_count: state.row_count,
605                version_metadata: state.version_metadata,
606            },
607        );
608    }
609
610    // Index-only changes (AddConstraint, i.e. adding an `@index`) are pure
611    // metadata: the new `@index` intent is recorded in the desired catalog/IR
612    // persisted below, and the physical index is materialized off the critical
613    // path by `ensure_indices`/`optimize` (iss-848). Schema apply touches no
614    // table data for them, so there is no per-table loop here and no recovery
615    // pin (no Lance HEAD advances). Reads stay correct meanwhile via a scan.
616
617    let mut manifest_changes = Vec::new();
618    for (table_key, table_path) in table_registrations {
619        manifest_changes.push(ManifestChange::RegisterTable(TableRegistration {
620            table_key,
621            table_path,
622        }));
623    }
624    for update in table_updates.into_values() {
625        manifest_changes.push(ManifestChange::Update(update));
626    }
627    for (table_key, tombstone_version) in table_tombstones {
628        manifest_changes.push(ManifestChange::Tombstone(TableTombstone {
629            table_key,
630            tombstone_version,
631        }));
632    }
633
634    db.refresh_coordinator_only().await?;
635    let current_manifest_version = db.version().await;
636    if current_manifest_version != base_manifest_version {
637        return Err(OmniError::manifest_conflict(format!(
638            "schema apply lost its write lease: main advanced from v{} to v{} while schema apply was in progress",
639            base_manifest_version, current_manifest_version,
640        )));
641    }
642
643    // Atomic schema apply.
644    //
645    // Write the new schema source + IR contract to staging filenames first,
646    // then commit the manifest, then rename staging → final. A crash
647    // between these stages is recoverable on next open via
648    // `recover_schema_state_files`:
649    //   - crash before commit  → manifest unchanged; staging deleted on open
650    //   - crash after commit   → manifest advanced; staging renamed on open
651    crate::failpoints::maybe_fail(crate::failpoints::names::SCHEMA_APPLY_BEFORE_STAGING_WRITE)?;
652
653    let staging_pg_uri = schema_source_staging_uri(&db.root_uri);
654    db.storage
655        .write_text(&staging_pg_uri, desired_schema_source)
656        .await?;
657    write_schema_contract_staging(&db.root_uri, db.storage.as_ref(), &desired_ir).await?;
658
659    crate::failpoints::maybe_fail(crate::failpoints::names::SCHEMA_APPLY_AFTER_STAGING_WRITE)?;
660
661    // `apply_schema` doesn't currently take an actor; system-attributed.
662    let PublishedSnapshot {
663        manifest_version,
664        _snapshot_id: _,
665    } = db
666        .coordinator
667        .write()
668        .await
669        .commit_changes_with_actor(&manifest_changes, None)
670        .await?;
671
672    crate::failpoints::maybe_fail(crate::failpoints::names::SCHEMA_APPLY_AFTER_MANIFEST_COMMIT)?;
673
674    db.storage
675        .rename_text(&staging_pg_uri, &schema_source_uri(&db.root_uri))
676        .await?;
677    db.storage
678        .rename_text(
679            &schema_ir_staging_uri(&db.root_uri),
680            &schema_ir_uri(&db.root_uri),
681        )
682        .await?;
683    db.storage
684        .rename_text(
685            &schema_state_staging_uri(&db.root_uri),
686            &schema_state_uri(&db.root_uri),
687        )
688        .await?;
689
690    db.store_catalog(desired_catalog);
691    db.store_schema_source(desired_schema_source.to_string());
692    db.coordinator.write().await.refresh().await?;
693    db.runtime_cache.invalidate_all().await;
694    if changed_edge_tables {
695        db.invalidate_graph_index().await;
696    }
697
698    // Recovery sidecar lifecycle: delete after the manifest commit
699    // succeeded. Best-effort: if this delete fails, the sidecar persists
700    // and on next open the sweep sees every table at the post-publish
701    // manifest pin (NoMovement) and the sidecar is treated as a stale
702    // artifact (recovery is a no-op and the sidecar is cleaned up).
703    // Failing the schema_apply call would report failure for a migration
704    // that already succeeded.
705    if let Some(handle) = recovery_handle {
706        if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
707            tracing::warn!(
708                error = %err,
709                operation_id = handle.operation_id.as_str(),
710                "recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
711            );
712        }
713    }
714
715    // Hard-drop cleanup: run cleanup_old_versions on each dataset
716    // that had a Hard mode drop step. Best-effort — the schema apply
717    // is already durable. If cleanup fails, the prior data fragments
718    // remain on disk as orphans (reclaimable via `omnigraph cleanup`).
719    // We do NOT fail the apply on cleanup error; the manifest change
720    // is the load-bearing operation.
721    for (table_key, full_uri) in &hard_cleanup_targets {
722        match cleanup_dataset_old_versions(db, full_uri).await {
723            Ok(()) => {}
724            Err(err) => {
725                tracing::warn!(
726                    error = %err,
727                    table_key = table_key.as_str(),
728                    "hard-drop cleanup_old_versions failed; rerun `omnigraph cleanup` to reclaim",
729                );
730            }
731        }
732    }
733
734    Ok(SchemaApplyResult {
735        supported: true,
736        applied: true,
737        manifest_version,
738        steps: plan.steps,
739    })
740}
741
742/// Run `cleanup_old_versions` on a dataset URI with `before_timestamp = now`.
743/// Removes every version older than the current, making time-travel back
744/// to those versions unreachable. Used by Hard mode drops to enforce
745/// "data is gone" semantics post-apply.
746///
747/// The dataset itself isn't deleted — for DropType { Hard }, the
748/// dataset directory persists with only its current version (or, if
749/// no current version was written, its pre-drop version). A future
750/// orphan-cleanup pass should remove the directory entirely.
751async fn cleanup_dataset_old_versions(db: &Omnigraph, full_uri: &str) -> Result<()> {
752    use chrono::Utc;
753    use lance::dataset::cleanup::CleanupPolicy;
754    // forbidden-api-allow: maintenance (Hard-drop version GC) opens the dataset to run cleanup_old_versions.
755    let ds = lance::Dataset::open(full_uri)
756        .await
757        .map_err(|e| OmniError::Lance(e.to_string()))?;
758    let policy = CleanupPolicy {
759        before_timestamp: Some(Utc::now()),
760        before_version: None,
761        delete_unverified: false,
762        error_if_tagged_old_versions: false,
763        clean_referenced_branches: false,
764        delete_rate_limit: None,
765    };
766    let _removed = lance::dataset::cleanup::cleanup_old_versions(&ds, policy)
767        .await
768        .map_err(|e| OmniError::Lance(e.to_string()))?;
769    let _ = db;
770    Ok(())
771}
772
773pub(super) async fn ensure_schema_apply_idle(db: &Omnigraph, operation: &str) -> Result<()> {
774    db.refresh_coordinator_only().await?;
775    ensure_schema_apply_not_locked(db, operation).await
776}
777
778pub(super) async fn acquire_schema_apply_lock(db: &Omnigraph) -> Result<()> {
779    db.ensure_schema_state_valid().await?;
780    db.refresh_coordinator_only().await?;
781    let branches = db.coordinator.read().await.all_branches().await?;
782    if branches
783        .iter()
784        .any(|branch| is_schema_apply_lock_branch(branch))
785    {
786        return Err(OmniError::manifest_conflict(
787            "schema apply is already in progress".to_string(),
788        ));
789    }
790
791    db.coordinator
792        .write()
793        .await
794        .branch_create(SCHEMA_APPLY_LOCK_BRANCH)
795        .await?;
796    db.refresh_coordinator_only().await?;
797
798    let blocking_branches = db
799        .coordinator
800        .read()
801        .await
802        .all_branches()
803        .await?
804        .into_iter()
805        .filter(|branch| branch != "main" && !is_internal_system_branch(branch))
806        .collect::<Vec<_>>();
807    if !blocking_branches.is_empty() {
808        let _ = release_schema_apply_lock(db).await;
809        return Err(OmniError::manifest_conflict(format!(
810            "schema apply requires a graph with only main; found non-main branches: {}",
811            blocking_branches.join(", ")
812        )));
813    }
814
815    Ok(())
816}
817
818pub(super) async fn release_schema_apply_lock(db: &Omnigraph) -> Result<()> {
819    db.coordinator
820        .write()
821        .await
822        .branch_delete(SCHEMA_APPLY_LOCK_BRANCH)
823        .await?;
824    // Use refresh_coordinator_only — the full Omnigraph::refresh would
825    // run roll-forward-only recovery, and on the failure path the
826    // in-flight schema_apply sidecar is still on disk; recovery would
827    // race the caller's own publish (or roll forward an aborted apply
828    // we want to leave for next-open).
829    db.refresh_coordinator_only().await
830}
831
832pub(super) async fn ensure_schema_apply_not_locked(db: &Omnigraph, operation: &str) -> Result<()> {
833    if db
834        .coordinator
835        .read()
836        .await
837        .all_branches()
838        .await?
839        .iter()
840        .any(|branch| is_schema_apply_lock_branch(branch))
841    {
842        return Err(OmniError::manifest_conflict(format!(
843            "{} is unavailable while schema apply is in progress",
844            operation
845        )));
846    }
847    Ok(())
848}
849
850pub(super) async fn ensure_snapshot_entry_head_matches(
851    db: &Omnigraph,
852    entry: &SubTableEntry,
853) -> Result<()> {
854    let dataset_uri = db.storage().dataset_uri(&entry.table_path);
855    let ds = db
856        .storage()
857        .open_dataset_head_for_write(
858            &entry.table_key,
859            &dataset_uri,
860            entry.table_branch.as_deref(),
861        )
862        .await?;
863    db.storage()
864        .ensure_expected_version(&ds, &entry.table_key, entry.table_version)
865}
866
867pub(super) async fn batch_for_schema_apply_rewrite(
868    db: &Omnigraph,
869    source_ds: &SnapshotHandle,
870    source_table_key: &str,
871    source_catalog: &Catalog,
872    target_table_key: &str,
873    target_catalog: &Catalog,
874    property_renames: Option<&HashMap<String, String>>,
875) -> Result<RecordBatch> {
876    let target_schema = schema_for_table_key(target_catalog, target_table_key)?;
877    let source_blob_properties = blob_properties_for_table_key(source_catalog, source_table_key)?;
878    let target_blob_properties = blob_properties_for_table_key(target_catalog, target_table_key)?;
879    let needs_row_ids = !source_blob_properties.is_empty() || !target_blob_properties.is_empty();
880    let batches = if needs_row_ids {
881        db.storage()
882            .scan_with_row_id(source_ds, None, None, None, true)
883            .await?
884    } else {
885        db.storage().scan_batches(source_ds).await?
886    };
887    if batches.is_empty() {
888        return Ok(RecordBatch::new_empty(target_schema));
889    }
890    let source_schema = batches[0].schema();
891    let batch = concat_or_empty_batches(source_schema, batches)?;
892
893    let row_ids = if needs_row_ids {
894        Some(
895            batch
896                .column_by_name("_rowid")
897                .and_then(|col| col.as_any().downcast_ref::<UInt64Array>())
898                .ok_or_else(|| {
899                    OmniError::Lance(format!(
900                        "expected _rowid column when rewriting '{}'",
901                        source_table_key
902                    ))
903                })?
904                .values()
905                .iter()
906                .copied()
907                .collect::<Vec<_>>(),
908        )
909    } else {
910        None
911    };
912
913    let mut columns = Vec::with_capacity(target_schema.fields().len());
914    for field in target_schema.fields() {
915        let source_name = property_renames
916            .and_then(|renames| renames.get(field.name()))
917            .map(String::as_str)
918            .unwrap_or_else(|| field.name().as_str());
919        if let Some(column) = batch.column_by_name(source_name) {
920            if target_blob_properties.contains(field.name())
921                && source_blob_properties.contains(source_name)
922            {
923                let descriptions =
924                    column
925                        .as_any()
926                        .downcast_ref::<StructArray>()
927                        .ok_or_else(|| {
928                            OmniError::Lance(format!(
929                                "expected blob descriptions for '{}.{}'",
930                                source_table_key, source_name
931                            ))
932                        })?;
933                let rebuilt = rebuild_blob_column(
934                    db,
935                    source_ds,
936                    source_name,
937                    descriptions,
938                    row_ids.as_deref().unwrap_or(&[]),
939                )
940                .await?;
941                columns.push(rebuilt);
942            } else {
943                columns.push(column.clone());
944            }
945        } else {
946            columns.push(new_null_array(field.data_type(), batch.num_rows()));
947        }
948    }
949
950    RecordBatch::try_new(target_schema, columns).map_err(|e| OmniError::Lance(e.to_string()))
951}
952
953async fn rebuild_blob_column(
954    _db: &Omnigraph,
955    source_ds: &SnapshotHandle,
956    column_name: &str,
957    descriptions: &StructArray,
958    row_ids: &[u64],
959) -> Result<Arc<dyn Array>> {
960    let mut builder = BlobArrayBuilder::new(row_ids.len());
961    let mut non_null_row_ids = Vec::new();
962    let mut row_has_blob = Vec::with_capacity(row_ids.len());
963
964    for row in 0..row_ids.len() {
965        let is_null = blob_description_is_null(descriptions, row)?;
966        row_has_blob.push(!is_null);
967        if !is_null {
968            non_null_row_ids.push(row_ids[row]);
969        }
970    }
971
972    let blob_files = if non_null_row_ids.is_empty() {
973        Vec::new()
974    } else {
975        Arc::new(source_ds.dataset().clone())
976            .take_blobs(&non_null_row_ids, column_name)
977            .await
978            .map_err(|e| OmniError::Lance(e.to_string()))?
979    };
980
981    let mut files = blob_files.into_iter();
982    for has_blob in row_has_blob {
983        if !has_blob {
984            builder
985                .push_null()
986                .map_err(|e| OmniError::Lance(e.to_string()))?;
987            continue;
988        }
989
990        let blob = files.next().ok_or_else(|| {
991            OmniError::Lance(format!(
992                "blob rewrite for '{}' lost alignment with source rows",
993                column_name
994            ))
995        })?;
996        if let Some(uri) = blob.uri() {
997            builder
998                .push_uri(uri)
999                .map_err(|e| OmniError::Lance(e.to_string()))?;
1000        } else {
1001            builder
1002                .push_bytes(
1003                    blob.read()
1004                        .await
1005                        .map_err(|e| OmniError::Lance(e.to_string()))?,
1006                )
1007                .map_err(|e| OmniError::Lance(e.to_string()))?;
1008        }
1009    }
1010
1011    if files.next().is_some() {
1012        return Err(OmniError::Lance(format!(
1013            "blob rewrite for '{}' produced extra source blobs",
1014            column_name
1015        )));
1016    }
1017
1018    builder
1019        .finish()
1020        .map_err(|e| OmniError::Lance(e.to_string()))
1021}